MACE  1.0.0
CMT (Cooperative Multi-Tasking) Library
MACE - Massively Asynchronous Coding Environment

The MACE CMT library allows you to efficiently manage large numbers of parallel and asynchronous tasks and inter-thread communication using a lock-free scheduler.

In a cooperative implementation of multitasking, each task must explicitly yield control to the central scheduler to allow the next task to run. Assuming tasks play fairly, this can result in much more efficient code when dealing with a large number of small tasks, or with many parallel tasks that are normally 'idle' waiting for data.
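The idea of explicit yielding can be sketched without any fiber support at all. The following is a minimal illustration, not MACE.CMT's scheduler: each task is modeled as a step function that runs to its next yield point, and the names Task, make_counter, run_all and demo are hypothetical.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// A task is a resumable step function: each call runs until the task's next
// voluntary yield point and returns true once the task has finished.
using Task = std::function<bool()>;

// Hypothetical helper: a task that appends name + step index to `log` once per
// step and yields to the scheduler after every step, `steps` steps in total.
Task make_counter(std::string name, int steps, std::vector<std::string>& log) {
    assert(steps > 0);
    int done = 0;
    return [name, steps, done, &log]() mutable {
        log.push_back(name + std::to_string(done));
        ++done;
        return done == steps;   // true: finished, false: yield and resume later
    };
}

// Round-robin scheduler: run the task at the front until it yields, then move
// it to the back of the queue unless it reported completion.
void run_all(std::deque<Task> ready) {
    while (!ready.empty()) {
        Task t = std::move(ready.front());
        ready.pop_front();
        if (!t()) ready.push_back(std::move(t));
    }
}

// Runs two tasks and returns the order in which their steps executed.
std::vector<std::string> demo() {
    std::vector<std::string> log;
    std::deque<Task> ready;
    ready.push_back(make_counter("a", 2, log));
    ready.push_back(make_counter("b", 3, log));
    run_all(std::move(ready));
    return log;
}
```

Calling demo() yields the steps in the order a0, b0, a1, b1, b2. A task that never returns from its step function would starve every other task, which is why the approach depends on tasks playing fairly.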

Like traditional multi-threading, each task has its own 'thread' of control, which I will call a fiber. Unlike traditional multi-threading, you can be 100% sure that there can be no race conditions between two fibers running in the same thread; therefore, you can consider all operations that do not result in a yield to the scheduler 'atomic'.

The ability to treat most of your code as 'atomic' while still performing asynchronous operations that appear 'blocking' and 'synchronous' can greatly improve the readability of your code.

Because you only block a fiber when you are waiting on the result of an asynchronous operation, say posting a task to another thread, you greatly increase the amount of parallelism without increasing the number of 'real threads' and incurring additional synchronization and unnecessary context switching associated with large numbers of pre-emptive threads.

Lock-Free Scheduler

Multi-threading is hard; lock-free multi-threading is even harder and, if not done right, a source of many hard-to-debug race conditions. With MACE CMT you can develop heavily multi-threaded applications without ever having to block on a boost::mutex (unless a thread is idle waiting for something to do).

Because traditional mutexes such as boost::mutex stall an entire thread and result in a switch between user space and kernel space, they can become a significant bottleneck in cases with high contention. They are also 'deadly' to a cooperative environment, where one task grabbing a boost::mutex will stop all other tasks from running in that thread.

The preferred method of inter-thread communication and synchronization is to ensure that only one thread is responsible for manipulating the 'shared' data. If another thread needs to query or set that data, it does so by delegating the operation to the appropriate thread.

    future<R> r = other_thread->async( operation ); // async
    int r = other_thread->async( operation );       // sync

The act of posting an operation is a single atomic compare_and_exchange, and the act of 'waiting' on the result is almost free, assuming your thread has other tasks to run while it waits. If it has nothing else to do, then it becomes an effective boost::wait_condition.
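The delegation pattern itself can be illustrated with the standard library alone. This is a sketch under loud assumptions: OwnerThread and demo are hypothetical names, and a mutex-protected queue stands in for the lock-free posting that MACE.CMT describes, so it shows the ownership idea rather than the performance characteristics.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// A thread that owns its data and runs operations posted by other threads.
// Assumption: a mutex-protected queue stands in for a lock-free one.
class OwnerThread {
public:
    OwnerThread() : worker_([this] { run(); }) {}

    ~OwnerThread() {
        post([this] { done_ = true; });   // executed on the worker thread
        worker_.join();
    }

    // Post `op` to the owning thread and return a future for its result.
    template <typename F>
    auto async(F op) -> std::future<decltype(op())> {
        using R = decltype(op());
        auto task = std::make_shared<std::packaged_task<R()>>(std::move(op));
        std::future<R> result = task->get_future();
        post([task] { (*task)(); });
        return result;
    }

private:
    void post(std::function<void()> job) {
        assert(job);                      // an empty job cannot be run
        {
            std::lock_guard<std::mutex> lock(m_);
            jobs_.push(std::move(job));
        }
        cv_.notify_one();
    }

    void run() {
        while (!done_) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return !jobs_.empty(); });
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();                        // jobs run one at a time, in order
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> jobs_;
    bool done_ = false;                   // only touched by the worker thread
    std::thread worker_;                  // declared last: starts after the rest
};

// Only the owner thread ever touches `counter`, so the data needs no lock.
int demo() {
    int counter = 0;
    OwnerThread owner;
    std::future<int> r = owner.async([&counter] { return counter += 41; }); // async
    int s = owner.async([&counter] { return counter += 1; }).get();         // sync
    return r.get() + s;
}
```

Here demo() returns 83: the first operation sees 41 and the second, queued behind it, sees 42. Unlike a cmt::future, std::future::get() blocks the calling thread outright instead of running other tasks while it waits.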

Consider the following example:

    async(implicit_other_task); // implicit
    future<R> r = other_thread->async( operation );
    explicit_other_task();      // explicit other tasks, no need for context switching
    r.wait();                   // perform implicit_other_task if r is not ready

The other tasks can be 'explicit', such as explicit_other_task(), or 'implicit', such as implicit_other_task(), which was posted earlier and runs only if the thread has to wait.

Most of the time, a program with many things going on has plenty of asynchronous implicit tasks to keep it busy while waiting, so you incur minimal 'locking overhead'. It is even better if you can call async(operation) and not even care about the return value.

Note:
While no locks are used directly, there is no practical way to avoid memory allocation as a result of posting an asynchronous operation. The default memory allocator on most systems requires a memory lock. If this becomes a performance issue, you may want to consider using a thread-friendly allocator such as jemalloc.

Single Threaded Benchmark

Here is a simple benchmark program that performs an asynchronous operation and waits on the result. This example is effectively synchronous because only one async operation is in flight at a time and there is only one real thread.

#include <mace/cmt/thread.hpp>
#include <boost/chrono.hpp>
#include <cstdint>
#include <functional>

using boost::chrono::system_clock;
namespace cmt = mace::cmt;

int hello( int in ){ return in; }

int main( int argc, char** argv ) {
    system_clock::time_point start = system_clock::now();
    uint64_t sum = 0; // 64-bit: the total does not fit in an int
    for( uint32_t i = 0; i < 100000000; ++i ) {
        sum += cmt::async( std::bind(hello, i) ).wait();
        if( i % 1000000 == 0 ) {
            // elapsed seconds, independent of the clock's tick period
            boost::chrono::duration<double> elapsed = system_clock::now() - start;
            slog( "%1% calls/sec", i / elapsed.count() );
        }
    }
}

Multi-Threading with MACE.CMT

In this example, all tasks will occur in parallel in three different threads (t1, t2, and main).

While main waits for results from threads t1 and t2 it will switch contexts and execute other operations that are scheduled in the main thread such as calculating the result of f3.

#include <mace/cmt/thread.hpp>
#include <functional>
#include <iostream>
#include <string>

namespace cmt = mace::cmt;

int hello( const std::string& s ) { return s.size(); }

int main( int argc, char** argv ) {
  try {
    cmt::thread::current().set_name("main");
    cmt::thread* t1 = cmt::thread::create( "t1" );
    cmt::thread* t2 = cmt::thread::create( "t2" );

    cmt::future<int> f1 = t1->async( std::bind(hello,"world1") );
    auto f2 = t2->async( std::bind(hello,"world2") );
    auto f3 = cmt::async( std::bind(hello,"world3") );

    // world3 is processed in the current thread while waiting on f1 and f2
    std::cerr << ( f1.wait() + f2.wait() + f3.wait() ) << std::endl;

    t1->quit(); // cancel any tasks and join thread
    t2->quit(); // cancel any tasks and join thread

    cmt::thread::current().quit();
  } catch ( ... ) {
    elog( "Caught exception" );
    return 1;
  }
  return 0;
}

Exception Handling

Any exception thrown during an asynchronous operation is propagated to the thread that calls cmt::future::wait(), where it can be caught.
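The standard library's futures propagate exceptions in a comparable way, which makes for a compact illustration. This sketch uses std::async and std::future::get() as stand-ins for cmt::async and cmt::future::wait(); failing_operation and demo are hypothetical names.

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical operation that always fails.
int failing_operation() {
    throw std::runtime_error("operation failed");
}

// Returns the message of the exception observed by the waiting thread.
std::string demo() {
    std::future<int> f = std::async(std::launch::async, failing_operation);
    try {
        f.get();   // rethrows the exception stored by the worker thread
        return "no exception";
    } catch (const std::runtime_error& e) {
        return e.what();
    }
}
```

The exception is thrown on the worker thread but handled by the ordinary try/catch block around the wait, so error handling stays next to the code that needs the result.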

Yielding and Sleeping

The current task can either yield and allow other tasks to run before returning, or it can sleep for a specific amount of time while allowing other tasks to run.

      cmt::usleep(100000/*us*/);
      cmt::yield();

      // or the more verbose...
      cmt::thread::current().usleep(100000/*us*/);
      cmt::thread::current().yield();

You can only yield or sleep for the current thread. If there are no other tasks ready to run then yield() returns immediately.

Note:
Sleep and yield times are dependent upon other tasks yielding in a timely manner.

Boost.Signals Integration

This example shows how a task can wait on an event triggered by a boost::signal and return the parameters emitted by the signal. The output of the following program is 'hello world!' after waiting for 2 seconds.

Everything in this example runs in the main thread.

#include <mace/cmt/thread.hpp>
#include <mace/cmt/signals.hpp>
#include <iostream>
#include <string>

namespace cmt = mace::cmt;

boost::signal<void(std::string)> test_signal;

// Sleeps cooperatively for 2 seconds, then emits the signal.
void delay() {
    cmt::usleep(2000000);
    test_signal("hello world!");
}

int main( int argc, char** argv ) {
    std::cerr << "Delay for 2 seconds...\n";
    cmt::async( delay );
    // Yields to delay() until the signal fires, then prints its argument.
    std::cerr << cmt::wait(test_signal) << std::endl;
    return 0;
}
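One way to picture what waiting on a signal involves is to connect a one-shot slot that fulfills a promise. The sketch below is only an illustration with the standard library: Signal is a hypothetical stand-in for boost::signal<void(std::string)>, next_emission and demo are made-up names, and the wait blocks the thread where cmt::wait would yield to other tasks.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for boost::signal<void(std::string)>.
struct Signal {
    std::vector<std::function<void(std::string)>> slots;
    void connect(std::function<void(std::string)> s) { slots.push_back(std::move(s)); }
    void operator()(const std::string& v) const { for (const auto& s : slots) s(v); }
};

// Connect a one-shot slot that fulfills a promise with the emitted value and
// return the matching future.
std::future<std::string> next_emission(Signal& sig) {
    auto p = std::make_shared<std::promise<std::string>>();
    std::future<std::string> f = p->get_future();
    sig.connect([p](std::string v) { p->set_value(std::move(v)); });
    return f;
}

std::string demo() {
    Signal test_signal;
    std::future<std::string> f = next_emission(test_signal);  // connect first
    std::thread emitter([&test_signal] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        test_signal("hello world!");
    });
    std::string value = f.get();   // blocks this thread until the signal fires
    emitter.join();
    return value;
}
```

The slot must be connected before the emitter starts, otherwise the emission could be missed. A second emission would try to set the promise twice and throw, so a reusable version would need to disconnect the slot after the first call.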

Cooperative Mutexes

Cooperatively multi-tasked code must still worry about reentrancy. Suppose you have a thread sending a message across a socket. The socket members are thread safe, but the write_message() operation is not reentrant because the context could yield while waiting for a partial write to complete.

If, while it has yielded, another task in the same thread attempts to write a second message, then you will get garbage out as both fibers take turns writing parts of their messages to the socket.

Example problem:

    void write_message() {
      sock->write(part1); // may yield
      sock->write(part2); // may yield
      sock->write(part3); // may yield
    }

    async(write_message); // first message
    async(write_message); // second message, may interleave with the first

The output could look something like:

    part1
    part2
    part1
    part3
    part2
    part3

What you want to happen is this:

    void write_message() {
      boost::unique_lock<cmt::mutex> lock(sock->write_lock);
      sock->write(part1); // may yield
      sock->write(part2); // may yield
      sock->write(part3); // may yield
    }

Now if someone attempts to write a second message while the first is still being written, the second write will 'queue' behind the first by 'blocking' on the mutex.

The difference between boost::mutex and mace::cmt::mutex is that mace::cmt::mutex will not 'block' the calling thread while waiting for a lock; it will simply yield to other tasks until the lock has been acquired. There are no system calls and, in a single-threaded multi-tasked environment like the example above, no spin-lock contention at all. Multi-threaded environments use a spin-lock while updating the lock state.
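The interleaving problem and its fix can be reproduced in a small single-threaded simulation. This is a sketch, not how mace::cmt::mutex is implemented: tasks are modeled as step functions that return at each yield point, CoopMutex is a hypothetical flag-based lock, and a waiting task simply retries on its next turn rather than being queued.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Task = std::function<bool()>;   // returns true once the task has finished

// Hypothetical cooperative lock. A plain flag is enough here because only one
// task runs at a time and tasks are never interrupted between yield points.
struct CoopMutex { bool locked = false; };

// Builds a task that writes the three parts of message `id`, yielding after
// each part. If `lock` is non-null the task holds it for the whole message.
Task write_message(std::string id, std::vector<std::string>& out, CoopMutex* lock) {
    int part = 1;
    bool holding = false;
    return [=, &out]() mutable {
        if (lock && !holding) {
            if (lock->locked) return false;       // held by another task: yield
            lock->locked = holding = true;
        }
        out.push_back(id + std::to_string(part)); // stands in for sock->write()
        if (part++ < 3) return false;             // yield after a partial write
        if (lock) lock->locked = false;           // whole message written
        return true;
    };
}

// Round-robin scheduler: resume each task in turn until all have finished.
std::vector<std::string> run(bool use_lock) {
    std::vector<std::string> out;
    CoopMutex write_lock;
    CoopMutex* lock = use_lock ? &write_lock : nullptr;
    std::deque<Task> ready;
    ready.push_back(write_message("a", out, lock));
    ready.push_back(write_message("b", out, lock));
    while (!ready.empty()) {
        Task t = std::move(ready.front());
        ready.pop_front();
        if (!t()) ready.push_back(std::move(t));
    }
    return out;
}
```

run(false) produces a1, b1, a2, b2, a3, b3, the garbled interleaving, while run(true) produces a1, a2, a3, b1, b2, b3. No atomic operations are needed in the single-threaded case because a task can only lose control at a yield point.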

Move-Aware Bind (passing non-copyable parameters)

One of the major drawbacks of passing parameters through to asynchronous operations via boost::bind() is that boost::bind must make an internal copy of the parameters. The only way to avoid the internal copy is to use boost::ref(), but now you must make sure that the reference does not go out of scope before the bound functor.

A side effect of this lack of move support is that it becomes impossible to pass non-copyable, but movable, objects between threads without doing heap allocation and using smart pointers.

Often even the simple case of calling an asynchronous method and passing a string as a parameter results in two unnecessary copies.

With that background, mace::cmt::bind(...) is designed to serve the needs of the CMT library and returns a functor that takes no arguments, the exact type of functor required for passing to mace::cmt::async(...).

  int test( std::string&& s );
  
  int main() {
    return async( mace::cmt::bind( &test, std::string("Hello World") ) ).wait();
  }

Because test() takes its argument by rvalue reference, you cannot bind it using boost::bind or std::bind.

A side effect of mace::cmt::bind's default assumption of 'move semantics' is that you cannot 'bind' non-rvalues. To pass an lvalue, use one of the following techniques:

   int cr_test( const std::string& s );
   int main() {
      std::string lvalue;
      int r  = cmt::async( cmt::bind( &cr_test, boost::ref(lvalue) ) );
      int r2 = cmt::async( cmt::bind( &cr_test, cmt::copy(lvalue) ) );
   }

The benefit of always using mace::cmt::bind is that, in addition to potential performance gains and support for move-only objects, your code is also much more explicit about where copies are being made.
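For comparison, the same goal of handing a move-only argument to another thread without copying can be reached in standard C++14 with a lambda init-capture. This sketch does not use mace::cmt::bind; consume and demo are hypothetical names and std::async stands in for cmt::async.

```cpp
#include <cstddef>
#include <future>
#include <memory>
#include <string>
#include <utility>

// Takes a move-only argument by rvalue reference.
std::size_t consume(std::unique_ptr<std::string>&& s) {
    std::unique_ptr<std::string> owned = std::move(s);
    return owned->size();
}

std::size_t demo() {
    std::unique_ptr<std::string> msg(new std::string("Hello World"));
    // Move the argument into the closure, then move it out again on the call.
    auto task = [m = std::move(msg)]() mutable { return consume(std::move(m)); };
    // The closure itself is move-only, so it is handed over with std::move.
    std::future<std::size_t> f = std::async(std::launch::async, std::move(task));
    return f.get();
}
```

demo() returns 11, the length of the string, and the std::unique_ptr is never copied. The result is a functor that takes no arguments, the same shape the text describes for the functor returned by mace::cmt::bind.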

Using Promises

Sometimes you need to perform an asynchronous operation of your own, such as a network call or waiting on user input. Here is an example from MACE.CMT's ASIO wrapper. Using cmt::asio::read you can perform a 'synchronous' read in the current thread without blocking the current thread. Instead the current thread will switch to running other tasks while waiting for the read to complete and then switch back once the read is ready. If an error occurs then read() will throw the exception passed to ASIO's completion handler.

    namespace detail {
        // Completion handler: fulfills the promise with the byte count or the error.
        void read_write_handler( const promise<size_t>::ptr& p, const boost::system::error_code& ec, size_t bytes_transferred ) {
            if( !ec ) p->set_value(bytes_transferred);
            else p->set_exception( boost::copy_exception( boost::system::system_error(ec) ) );
        }
    }

    template<typename AsyncReadStream, typename MutableBufferSequence>
    size_t read( AsyncReadStream& s, const MutableBufferSequence& buf, const microseconds& timeout_us = microseconds::max() ) {
        promise<size_t>::ptr p(new promise<size_t>());
        boost::asio::async_read( s, buf, boost::bind( detail::read_write_handler, p, _1, _2 ) );
        return p->wait(timeout_us); // yields to other tasks until the handler runs
    }
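The same wrapping pattern can be tried out with std::promise. This is a sketch under stated assumptions: fake_async_read is a hypothetical callback-style operation standing in for boost::asio::async_read, blocking_read and read_fails are made-up names, and the wait blocks the thread where a cmt promise would yield.

```cpp
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <system_error>

using Handler = std::function<void(const std::error_code&, std::size_t)>;

// Hypothetical callback-style operation: completes on another thread and
// reports either `bytes` transferred or an error through the handler.
std::future<void> fake_async_read(bool fail, std::size_t bytes, Handler h) {
    return std::async(std::launch::async, [fail, bytes, h] {
        if (fail) h(std::make_error_code(std::errc::connection_reset), 0);
        else      h(std::error_code(), bytes);
    });
}

// Completion handler: fulfill the promise with a value or an exception.
void read_handler(const std::shared_ptr<std::promise<std::size_t>>& p,
                  const std::error_code& ec, std::size_t n) {
    if (!ec) p->set_value(n);
    else     p->set_exception(std::make_exception_ptr(std::system_error(ec)));
}

// 'Synchronous' read built on the callback API; throws std::system_error
// if the operation reports an error.
std::size_t blocking_read(bool fail, std::size_t bytes) {
    auto p = std::make_shared<std::promise<std::size_t>>();
    std::future<std::size_t> result = p->get_future();
    std::future<void> op = fake_async_read(
        fail, bytes,
        [p](const std::error_code& ec, std::size_t n) { read_handler(p, ec, n); });
    return result.get();   // blocks here until the handler has run
}

// Returns true if a failing read surfaces as an exception in the caller.
bool read_fails() {
    try { blocking_read(true, 0); return false; }
    catch (const std::system_error&) { return true; }
}
```

The shared pointer keeps the promise alive until the handler has run, even if the caller gives up early, which is also why the original passes a promise<size_t>::ptr rather than a reference.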

Boost.ASIO Integration

MACE.CMT vs Boost.ASIO

Boost.ASIO is currently how many projects perform asynchronous operations and multiplex tasks on a pool of threads running asio::io_service::run. Synchronization occurs via the use of asio::io_service::strand, which ensures that no two handlers within the strand can be run at the same time. Unfortunately, this is not useful for handlers that may want to block.

The Boost.Thread library includes a boost::future<T> type that on the surface looks very similar to mace::cmt::future<T>, but several problems emerge in practice.

Consider the following case study:

Boost.ASIO:

    void async_opp(function<void(int,bool)> callback) {
      callback( 42, /*error=*/false );
    }
    void another_async_opp(function<void(int,bool)> callback) {
      callback( 53, /*error=*/false );
    }

    // Requires shared or global state
    mutex            m;
    int              shared_state = 0;
    int              results[2];
    asio::io_service io_service;

    // could be called by any thread running io_service!
    void handle_result(int r, bool error) {
       unique_lock<mutex> lock(m); // must protect shared_state
       if( error && shared_state != -1 ) {
          shared_state = -1;
          std::cerr<<"An error occurred!\n";
       } else if( shared_state != -1 ) {
          results[shared_state++] = r;
          if( shared_state == 2 )
              std::cout<<results[0]+results[1]<<std::endl;
       }
    }
    void test( ) {
      io_service.post( bind( async_opp, handle_result ) );
      io_service.post( bind( another_async_opp, handle_result ) );
      other_work(); // do other work
    }

MACE.CMT:

  int async_opp() { return 42; }
  int another_async_opp() { return 53; }

  // given
  cmt::thread* t1;
  cmt::thread* t2;

  void test() {
    cmt::future<int> f1 = t1->async( async_opp );
    cmt::future<int> f2 = t2->async( another_async_opp );
    async( other_work ); // in current thread

    try {
      // while waiting other_work can run in this thread.
      std::cerr<< f1.wait() + f2.wait() << std::endl;
    } catch ( ... ) {
      std::cerr<<"An error occurred!\n";
    }
  }

  • If this were implemented using boost::future<T>, then other_work could not occur in the same thread while waiting for other asynchronous operations to complete.
  • In order to avoid blocking an entire boost::asio::strand, you would have to revert to fine-grained locking with mutex.

Boost.ASIO, calling test() from another function:

  void test2( ) {
    test(); // behaves asynchronously, which is unexpected: when is it done?
    // must we propagate another callback?
    test( test2_complete_callback );
  }

  • Now what about exceptions?
  • Should we block this thread, or add a global wait condition?
  • Perhaps an event queue to help structure the problem?
  • Adopt a data-flow architecture?
  • Avoid async designs altogether?

MACE.CMT, calling test() from another function:

  void test2() {
    test(); // behaves synchronously as expected
    async(test); // or call async and forget!
  }

Cooperative Threading vs Qt-like Event Loops

The cooperative multi-tasking implementation is far superior to the QApplication/QThread event loop when it comes to waiting for asynchronous tasks. If you want to implement a method in Qt that synchronously invokes a remote procedure call, then it must block the thread while it waits for the return value. If you want to keep the user interface responsive, then you may optionally "recursively" process events.

There are many problems with recursive event loop invocations that lead to deadlocks, because the tasks must complete in the order in which they were called or the stack can never unwind.

Typically the solution to this problem is to use callbacks, signals, and other notification techniques. The problem with this approach is that you lose the "localization of code" and variables / algorithms end up spread across multiple methods. Local variables then need to be "maintained" outside of function scope as class member variables, often allocated on the heap. This greatly increases the complexity of the code.

This complexity becomes obvious when you have many asynchronous operations that must be performed synchronously or have some non-trivial dependency. Suppose you need to invoke three remote procedure calls on three different servers, that you need the return value from one of the calls before you can invoke the other two, and that you need all three values before you can do your final calculations. This task creates a mess of spaghetti code with callbacks, state machine variables, and so on, unless you are willing to accept the performance hit of blocking an entire "heavyweight", preemptively multi-tasked operating system thread.

This same problem becomes trivial with the use of the MACE.CMT library. Simply asynchronously invoke each method which will return a future object. Then pass the futures into the other methods which will automatically run when the data is available. A complex asynchronous mess turns into what looks like synchronous code.
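The three-call dependency described above can be sketched with standard futures to show how the code stays linear. The functions fetch_user_id, fetch_balance and fetch_limit are hypothetical remote calls simulated locally, and std::async with std::shared_future stands in for MACE.CMT's async and future, so each wait here blocks an operating system thread instead of yielding.

```cpp
#include <future>

// Hypothetical remote calls, simulated locally. The dependent calls receive a
// future and wait for the first result themselves.
int fetch_user_id() { return 7; }
int fetch_balance(std::shared_future<int> user_id) { return user_id.get() * 100; }
int fetch_limit(std::shared_future<int> user_id)   { return user_id.get() + 1; }

int demo() {
    // Start the first call; its result is shared by the two dependent calls.
    std::shared_future<int> id =
        std::async(std::launch::async, fetch_user_id).share();
    // The dependent calls start immediately and proceed once `id` is ready.
    std::future<int> balance = std::async(std::launch::async, fetch_balance, id);
    std::future<int> limit   = std::async(std::launch::async, fetch_limit, id);
    // All three values are needed for the final calculation.
    return id.get() + balance.get() + limit.get();
}
```

demo() returns 715 (7 + 700 + 8). All intermediate values live in local variables of one function, with no callbacks or state machine variables, which is the "localization of code" the section argues for.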