Bytemaster's Boost Libraries
|
00001 #ifndef _BOOST_RPC_JSON_CONNECTION_HPP_ 00002 #define _BOOST_RPC_JSON_CONNECTION_HPP_ 00003 #include <boost/cmt/future.hpp> 00004 #include <boost/chrono/chrono.hpp> 00005 #include <boost/rpc/json.hpp> 00006 #include <boost/cmt/retainable.hpp> 00007 #include <boost/function.hpp> 00008 #include <boost/signals.hpp> 00009 #include <boost/fusion/container/vector.hpp> 00010 #include <boost/fusion/functional/generation/make_unfused.hpp> 00011 #include <boost/bind.hpp> 00012 #include <boost/rpc/message.hpp> 00013 00014 namespace boost { namespace rpc { namespace json { 00022 class connection : public boost::cmt::retainable, boost::noncopyable { 00023 public: 00024 typedef boost::cmt::retainable_ptr<connection> ptr; 00025 typedef boost::function<void(const boost::json::Value&, boost::json::Value&)> handler; 00026 00027 connection(); 00028 virtual ~connection(); 00029 00030 void add_signal_connection( const std::string& name, 00031 const boost::signals::connection& c ); 00032 void add_method_handler( const std::string& name, const handler& h ); 00033 void invoke( boost::json::Value& msg, boost::json::Value& rtn_msg, 00034 const boost::chrono::microseconds& timeout_us = boost::chrono::microseconds::max() ); 00035 00036 virtual void send( const boost::json::Value& v )=0; 00037 virtual void start()=0; 00038 virtual bool is_connected()const = 0; 00039 00040 boost::signal<void()> disconnected; 00041 protected: 00042 void on_receive( const boost::json::Value& v ); 00043 00044 private: 00045 class connection_private* my; 00046 }; 00047 00048 namespace detail { 00049 00050 template<typename Seq,typename ResultType> 00051 struct rpc_send_functor { 00052 typedef ResultType result_type; 00053 00054 rpc_send_functor( connection& c, const char* name ) 00055 :m_con(c),m_msg(boost::json::Object()) 00056 { 00057 boost::json::Object& m_obj = m_msg.get_obj(); // obj stored in m_msg 00058 m_obj.push_back( boost::json::Pair( "id", 0 ) ); 00059 m_obj.push_back( boost::json::Pair( "method", std::string(name) ) ); 00060 m_obj.push_back( boost::json::Pair( "params", boost::json::Array() ) ); 00061 } 00062 00063 ResultType operator()( const Seq& params ) const { 00064 boost::json::Object& m_obj = m_msg.get_obj(); // obj stored in m_msg 00065 pack( m_obj.back().value_, params ); 00066 boost::json::Value rtn_msg; 00067 m_con.invoke( m_msg, rtn_msg ); 00068 ResultType ret_val; 00069 if(rtn_msg.contains("error") ) { 00070 error_object e; 00071 unpack( rtn_msg["error"], e ); 00072 BOOST_THROW_EXCEPTION( e ); 00073 } 00074 if( rtn_msg.contains( "result" ) ) { 00075 unpack( rtn_msg["result"], ret_val ); 00076 return ret_val; 00077 } 00078 error_object e; 00079 e.message = "invalid json RPC message, missing result or error"; 00080 BOOST_THROW_EXCEPTION( e ); 00081 } 00082 connection& m_con; 00083 mutable boost::json::Value m_msg; 00084 }; // rpc_send_functor 00085 template<typename Seq,typename ResultType> 00086 struct rpc_send_functor<Seq, boost::cmt::future<ResultType> > { 00087 typedef ResultType result_type; 00088 00089 rpc_send_functor( connection& c, const char* name ) 00090 :m_con(c),m_msg(boost::json::Object()) 00091 { 00092 boost::json::Object& m_obj = m_msg.get_obj(); // obj stored in m_msg 00093 m_obj.push_back( boost::json::Pair( "id", 0 ) ); 00094 m_obj.push_back( boost::json::Pair( "method", std::string(name) ) ); 00095 m_obj.push_back( boost::json::Pair( "params", boost::json::Array() ) ); 00096 } 00097 00098 ResultType operator()( const Seq& params ) const { 00099 boost::json::Object& m_obj = m_msg.get_obj(); // obj stored in m_msg 00100 pack( m_obj.back().value_, params ); 00101 boost::json::Value rtn_msg; 00102 m_con.invoke( m_msg, rtn_msg ); 00103 ResultType ret_val; 00104 if(rtn_msg.contains("error") ) { 00105 error_object e; 00106 unpack( rtn_msg["error"], e ); 00107 BOOST_THROW_EXCEPTION( e ); 00108 } 00109 if( rtn_msg.contains( "result" ) ) { 00110 unpack( rtn_msg["result"], ret_val ); 00111 return ret_val; 00112 } 00113 error_object e; 00114 e.message = "invalid json RPC message, missing result or error"; 00115 BOOST_THROW_EXCEPTION( e ); 00116 } 00117 connection& m_con; 00118 mutable boost::json::Value m_msg; 00119 }; // rpc_send_functor 00120 00121 00122 00123 template<typename Seq, typename Functor, bool is_signal = false> 00124 struct rpc_recv_functor { 00125 rpc_recv_functor( Functor f, connection&, const char* ) 00126 :m_func(f){} 00127 void operator()( const boost::json::Value& params, boost::json::Value& rtn ) { 00128 Seq paramv; 00129 unpack( params, paramv ); 00130 pack( rtn, m_func(paramv) ); 00131 } 00132 Functor m_func; 00133 }; 00134 00135 /* 00136 * Blocks a signal if it is currently unblocked and 00137 * unblocks it when it goes out of scope if it was blocked 00138 * when constructed. 00139 */ 00140 struct scoped_block_signal { 00141 scoped_block_signal( boost::signals::connection& _c ) 00142 :c(_c),unblock(false){ 00143 if( c != boost::signals::connection() && !c.blocked() ) { 00144 unblock = true; 00145 c.block(); 00146 } 00147 } 00148 ~scoped_block_signal() { 00149 if( unblock && c != boost::signals::connection() ) 00150 c.unblock(); 00151 } 00152 private: 00153 bool unblock; 00154 boost::signals::connection& c; 00155 }; 00156 template<typename Seq, typename Functor> 00157 struct rpc_recv_functor<Seq,Functor,true> { 00158 typedef typename boost::remove_reference<Functor>::type functor_type; 00159 rpc_recv_functor( Functor f, connection& c, const char* name ) 00160 :m_name(name),m_con(c),m_func(f){ 00161 wlog( "rpc_recv_functor %1% %2%", this, name ); 00162 m_sig_con = m_func.connect( rpc_send_functor<Seq, 00163 typename functor_type::result_type>( m_con, m_name ) ); 00164 m_sig_con.block(); 00165 c.add_signal_connection( name, m_sig_con ); 00166 } 00167 00168 00169 void operator()( const boost::json::Value& params, boost::json::Value& rtn ) { 00170 wlog( "rpc_recv_functor %1%", this ); 00171 scoped_block_signal block_reverse(m_sig_con); 00172 Seq paramv; 00173 unpack( params, paramv ); 00174 pack( rtn, m_func(paramv) ); 00175 } 00176 00177 boost::signals::connection m_sig_con; 00178 const char* m_name; 00179 connection& m_con; 00180 Functor m_func; 00181 }; 00182 00183 template<typename Seq, bool Sig, typename Functor> 00184 inline rpc_recv_functor<Seq,Functor,Sig> make_rpc_recv_functor( Functor f, connection& c, const char* n ) { 00185 return rpc_recv_functor<Seq,Functor,Sig>(f,c,n); 00186 } 00187 using boost::reflect::void_t; 00188 typedef boost::fusion::vector<std::string,int> connect_signal_params; 00189 typedef rpc_send_functor<connect_signal_params,void_t> rpc_connect_signal_base; 00190 00191 struct rpc_connect_delegate : public rpc_connect_signal_base { 00192 public: 00193 rpc_connect_delegate( connection& c, const char* name ) 00194 :rpc_connect_signal_base(c,"rpc_connect_signal"),m_name(name){} 00195 00196 void operator()( int count )const { 00197 connect_signal_params v(m_name,count); 00198 rpc_connect_signal_base::operator()(v); 00199 } 00200 private: 00201 std::string m_name; 00202 }; 00203 00204 template<bool IsSignal = false> 00205 struct if_signal { 00206 template<typename M> 00207 static void set_delegate( connection& c, M& m, const char* name ){ 00208 m = detail::rpc_send_functor<typename M::fused_params, 00209 typename M::result_type>(c, name); 00210 } 00211 }; 00212 00213 template<> 00214 struct if_signal<true> { 00215 template<typename M> 00216 static void set_delegate( connection& c, M& m, const char* name ){ 00217 m = detail::rpc_send_functor<typename M::fused_params, 00218 typename M::result_type>(c, name); 00219 00220 // when client connects/disconnects local signals, notify server of change 00221 m.set_connect_delegate(rpc_connect_delegate(c,name)); 00222 00223 // when server calls the client, emit locally, but do not 'echo' 00224 // back to the server 00225 c.add_method_handler( name, 00226 rpc_recv_functor<typename M::fused_params, 00227 boost::function<typename M::result_type(typename M::fused_params)> >( 00228 boost::bind(&M::emit, &m, _1), c, name) ); 00229 } 00230 }; 00231 00232 } // namespace detail 00233 00234 } } } // boost::rpc::json 00235 00236 #endif// _BOOST_RPC_JSON_CONNECTION_HPP_