00001 #ifndef _BOOST_RPC_JSON_CONNECTION_HPP_
00002 #define _BOOST_RPC_JSON_CONNECTION_HPP_
00003 #include <boost/rpc/json.hpp>
00004 #include <boost/cmt/retainable.hpp>
00005 #include <boost/function.hpp>
00006 #include <boost/signals.hpp>
00007 #include <boost/fusion/container/vector.hpp>
00008 #include <boost/fusion/include/make_unfused.hpp>
00009 #include <boost/bind.hpp>
00010 #include <boost/rpc/message.hpp>
00011
00012 namespace boost { namespace rpc { namespace json {
00020 class connection : public boost::cmt::retainable {
00021 public:
00022 typedef boost::cmt::retainable_ptr<connection> ptr;
00023 typedef boost::function<void(const boost::json::Value&, boost::json::Value&)> handler;
00024
00025 connection();
00026 virtual ~connection();
00027
00028 void add_signal_connection( const std::string& name,
00029 const boost::signals::connection& c );
00030 void add_method_handler( const std::string& name, const handler& h );
00031 void invoke( boost::json::Value& msg, boost::json::Value& rtn_msg, uint64_t timeout_us = -1 );
00032
00033 virtual void send( const boost::json::Value& v )=0;
00034 virtual void start()=0;
00035 virtual bool is_connected()const = 0;
00036
00037 boost::signal<void()> disconnected;
00038 protected:
00039 void on_receive( const boost::json::Value& v );
00040
00041 private:
00042 class connection_private* my;
00043 };
00044
00045 namespace detail {
00046
00047 template<typename Seq,typename ResultType>
00048 struct rpc_send_functor {
00049 typedef ResultType result_type;
00050
00051 rpc_send_functor( connection& c, const char* name )
00052 :m_con(c),m_msg(boost::json::Object())
00053 {
00054 boost::json::Object& m_obj = m_msg.get_obj();
00055 m_obj.push_back( boost::json::Pair( "id", 0 ) );
00056 m_obj.push_back( boost::json::Pair( "method", std::string(name) ) );
00057 m_obj.push_back( boost::json::Pair( "params", boost::json::Array() ) );
00058 }
00059
00060 ResultType operator()( const Seq& params ) const {
00061 boost::json::Object& m_obj = m_msg.get_obj();
00062 pack( m_obj.back().value_, params );
00063 boost::json::Value rtn_msg;
00064 m_con.invoke( m_msg, rtn_msg );
00065 ResultType ret_val;
00066 if(rtn_msg.contains("error") ) {
00067 error_object e;
00068 unpack( rtn_msg["error"], e );
00069 BOOST_THROW_EXCEPTION( e );
00070 }
00071 if( rtn_msg.contains( "result" ) ) {
00072 unpack( rtn_msg["result"], ret_val );
00073 return ret_val;
00074 }
00075 error_object e;
00076 e.message = "invalid json RPC message, missing result or error";
00077 BOOST_THROW_EXCEPTION( e );
00078 }
00079 connection& m_con;
00080 mutable boost::json::Value m_msg;
00081 };
00082
00083 template<typename Seq, typename Functor, bool is_signal = false>
00084 struct rpc_recv_functor {
00085 rpc_recv_functor( Functor f, connection&, const char* )
00086 :m_func(f){}
00087 void operator()( const boost::json::Value& params, boost::json::Value& rtn ) {
00088 Seq paramv;
00089 unpack( params, paramv );
00090 pack( rtn, m_func(paramv) );
00091 }
00092 Functor m_func;
00093 };
00094
00095
00096
00097
00098
00099
00100 struct scoped_block_signal {
00101 scoped_block_signal( boost::signals::connection& _c )
00102 :c(_c),unblock(false){
00103 if( c != boost::signals::connection() && !c.blocked() ) {
00104 unblock = true;
00105 c.block();
00106 }
00107 }
00108 ~scoped_block_signal() {
00109 if( unblock && c != boost::signals::connection() )
00110 c.unblock();
00111 }
00112 private:
00113 bool unblock;
00114 boost::signals::connection& c;
00115 };
00116
00117 template<typename Seq, typename Functor>
00118 struct rpc_recv_functor<Seq,Functor,true> {
00119 typedef typename boost::remove_reference<Functor>::type functor_type;
00120 rpc_recv_functor( Functor f, connection& c, const char* name )
00121 :m_name(name),m_con(c),m_func(f){
00122 m_sig_con = m_func.connect( rpc_send_functor<Seq,
00123 typename functor_type::result_type>( m_con, m_name ) );
00124 m_sig_con.block();
00125 c.add_signal_connection( name, m_sig_con );
00126 }
00127
00128 void operator()( const boost::json::Value& params, boost::json::Value& rtn ) {
00129 scoped_block_signal block_reverse(m_sig_con);
00130 Seq paramv;
00131 unpack( params, paramv );
00132 pack( rtn, m_func(paramv) );
00133 }
00134
00135 boost::signals::connection m_sig_con;
00136 const char* m_name;
00137 connection& m_con;
00138 Functor m_func;
00139 };
00140
00141 using boost::reflect::void_t;
00142 typedef boost::fusion::vector<std::string,int> connect_signal_params;
00143 typedef rpc_send_functor<connect_signal_params,void_t> rpc_connect_signal_base;
00144
00145 struct rpc_connect_delegate : public rpc_connect_signal_base {
00146 public:
00147 rpc_connect_delegate( connection& c, const char* name )
00148 :rpc_connect_signal_base(c,"rpc_connect_signal"),m_name(name){}
00149
00150 void operator()( int count )const {
00151 connect_signal_params v(m_name,count);
00152 rpc_connect_signal_base::operator()(v);
00153 }
00154 private:
00155 std::string m_name;
00156 };
00157
00158 template<bool IsSignal = false>
00159 struct if_signal {
00160 template<typename M>
00161 static void set_delegate( connection& c, M& m, const char* name ){
00162 m = detail::rpc_send_functor<typename M::fused_params,
00163 typename M::result_type>(c, name);
00164 }
00165 };
00166
00167 template<>
00168 struct if_signal<true> {
00169 template<typename M>
00170 static void set_delegate( connection& c, M& m, const char* name ){
00171 m = detail::rpc_send_functor<typename M::fused_params,
00172 typename M::result_type>(c, name);
00173
00174
00175 m.set_connect_delegate(rpc_connect_delegate(c,name));
00176
00177
00178
00179 c.add_method_handler( name,
00180 rpc_recv_functor<typename M::fused_params,
00181 boost::function<typename M::result_type(typename M::fused_params)> >(
00182 boost::bind(&M::emit, &m, _1), c, name) );
00183 }
00184 };
00185
00186 }
00187
00188 } } }
00189
00190 #endif// _BOOST_RPC_JSON_CONNECTION_HPP_