// MACE 1.0.0
00001 #ifndef _MACE_CMT_FUTURE_HPP 00002 #define _MACE_CMT_FUTURE_HPP 00003 #include <mace/cmt/retainable.hpp> 00004 #include <mace/cmt/error.hpp> 00005 #include <mace/cmt/log/log.hpp> 00006 #include <boost/thread/mutex.hpp> 00007 #include <mace/cmt/spin_yield_lock.hpp> 00008 #include <boost/optional.hpp> 00009 #include <boost/chrono.hpp> 00010 #include <boost/thread/condition_variable.hpp> 00011 00012 namespace mace { namespace cmt { 00013 using boost::chrono::microseconds; 00014 using boost::chrono::system_clock; 00015 boost::system_time to_system_time( const boost::chrono::system_clock::time_point& t ); 00016 00017 class abstract_thread; 00018 class task; 00019 class promise_base : public retainable { 00020 public: 00021 typedef retainable_ptr<promise_base> ptr; 00022 promise_base():m_task(0),m_blocked_thread(0),m_timeout(microseconds::max()){} 00023 virtual ~promise_base(){} 00024 00025 void set_task( task* t ); 00026 void cancel(); 00027 virtual bool ready()const = 0; 00028 protected: 00029 void enqueue_thread(); 00030 void wait( const microseconds& timeout_us ); 00031 void wait_until( const system_clock::time_point& timeout_us ); 00032 void notify(); 00033 virtual void set_timeout()=0; 00034 virtual void set_exception( const boost::exception_ptr& e )=0; 00035 00036 private: 00037 friend class thread; 00038 friend class thread_private; 00039 00040 task* m_task; 00041 abstract_thread* m_blocked_thread; 00042 microseconds m_timeout; 00043 }; 00044 00045 struct void_t {}; 00046 00053 template<typename T = void_t> 00054 class promise : public promise_base { 00055 public: 00056 typedef retainable_ptr<promise> ptr; 00057 static ptr make() { return ptr(new promise()); } 00058 00059 promise(){} 00060 promise( const T& v ):m_value(v){} 00061 00062 bool error()const { return m_error; } 00063 virtual bool ready()const { 00064 boost::unique_lock<spin_yield_lock> lock( m_spin_yield ); 00065 return ( m_error || m_value ); 00066 } 00067 00068 virtual const T& wait(const 
microseconds& timeout = microseconds::max() ){ 00069 { // lock while we check values 00070 boost::unique_lock<spin_yield_lock> lock( m_spin_yield ); 00071 if( m_error ) boost::rethrow_exception(m_error); 00072 if( m_value ) return *m_value; 00073 enqueue_thread(); 00074 } // unlock before yielding, but after enqueing 00075 promise_base::wait(timeout); 00076 if( m_error ) { 00077 boost::exception_ptr er = m_error; 00078 m_error = boost::exception_ptr(); 00079 boost::rethrow_exception(er); 00080 } 00081 if( m_value ) return *m_value; 00082 BOOST_THROW_EXCEPTION( error::future_value_not_ready() ); 00083 return *m_value; 00084 } 00085 00086 virtual const T& wait_until(const system_clock::time_point& timeout ){ 00087 { // lock while we check values 00088 boost::unique_lock<spin_yield_lock> lock( m_spin_yield ); 00089 if( m_error ) boost::rethrow_exception(m_error); 00090 if( m_value ) return *m_value; 00091 enqueue_thread(); 00092 } // unlock before yielding, but after enqueing 00093 promise_base::wait_until(timeout); 00094 if( m_error ) { 00095 boost::exception_ptr er = m_error; 00096 m_error = boost::exception_ptr(); 00097 boost::rethrow_exception(er); 00098 } 00099 if( m_value ) return *m_value; 00100 BOOST_THROW_EXCEPTION( error::future_value_not_ready() ); 00101 return *m_value; 00102 } 00103 00104 virtual void set_exception( const boost::exception_ptr& e ) { 00105 { 00106 boost::unique_lock<spin_yield_lock> lock( m_spin_yield ); 00107 m_error = e; 00108 } 00109 notify(); 00110 } 00111 virtual void set_value( T&& v ) { 00112 { 00113 boost::unique_lock<spin_yield_lock> lock( m_spin_yield ); 00114 if( m_error ) 00115 return; 00116 m_value = std::move(v); 00117 } 00118 notify(); 00119 } 00120 virtual void set_value( const T& v ) { 00121 { 00122 boost::unique_lock<spin_yield_lock> lock( m_spin_yield ); 00123 if( m_error ) 00124 return; 00125 m_value = v; 00126 } 00127 notify(); 00128 } 00129 00130 protected: 00131 virtual void set_timeout() { 00132 { 00133 
boost::unique_lock<spin_yield_lock> lock( m_spin_yield ); 00134 if( m_value ) 00135 return; 00136 m_error = boost::copy_exception( error::future_wait_timeout() ); 00137 } 00138 notify(); 00139 } 00140 00141 mutable cmt::spin_yield_lock m_spin_yield; 00142 mutable boost::exception_ptr m_error; 00143 mutable boost::optional<T> m_value; 00144 }; 00145 00146 00147 template<> 00148 class promise<void> : public promise<void_t> {}; 00149 00150 00173 template<typename T = void_t> 00174 class future { 00175 public: 00176 typedef typename promise<T>::ptr promise_ptr; 00177 typedef T value_type; 00178 00179 future( const promise_ptr& p = promise_ptr() ) 00180 :m_prom(p){} 00181 future( const T& v ):m_prom( new promise<T>(v) ){} 00182 00183 void cancel() { if( m_prom && !ready() ) m_prom->cancel(); } 00184 00185 bool valid()const { return !!m_prom; } 00186 bool ready()const { return m_prom->ready();} 00187 bool error()const { return valid() ? m_prom->error() : false; } 00188 operator const T&()const { 00189 if( !m_prom ) BOOST_THROW_EXCEPTION( error::null_future() ); 00190 return m_prom->wait(); 00191 } 00192 const T& wait(const microseconds& timeout = microseconds::max() )const { 00193 if( !m_prom ) BOOST_THROW_EXCEPTION( error::null_future() ); 00194 return m_prom->wait(timeout); 00195 } 00196 const T& wait_until(const system_clock::time_point& timeout )const { 00197 if( !m_prom ) BOOST_THROW_EXCEPTION( error::null_future() ); 00198 return m_prom->wait_until(timeout); 00199 } 00200 00201 00202 private: 00203 mutable promise_ptr m_prom; 00204 }; 00205 00206 template<> 00207 class future<void> : public future<void_t> { 00208 public: 00209 future( const promise<void_t>::ptr& p = promise<void_t>::ptr() ) 00210 :future<void_t>(p){} 00211 future( const void_t& v ):future<void_t>(v){} 00212 }; 00213 00214 00215 } } // mace::cmt 00216 00217 00218 #endif