Bytemaster's Boost Libraries
/Users/dlarimer/dev/libs/rpc/include/boost/rpc/json/connection.hpp
00001 #ifndef _BOOST_RPC_JSON_CONNECTION_HPP_
00002 #define _BOOST_RPC_JSON_CONNECTION_HPP_
00003 #include <boost/cmt/future.hpp>
00004 #include <boost/chrono/chrono.hpp>
00005 #include <boost/rpc/json.hpp>
00006 #include <boost/cmt/retainable.hpp>
00007 #include <boost/function.hpp>
00008 #include <boost/signals.hpp>
00009 #include <boost/fusion/container/vector.hpp>
00010 #include <boost/fusion/functional/generation/make_unfused.hpp>
00011 #include <boost/bind.hpp>
00012 #include <boost/rpc/message.hpp>
00013 
00014 namespace boost { namespace rpc { namespace json {
00022   class connection : public boost::cmt::retainable, boost::noncopyable {
00023     public:
00024       typedef boost::cmt::retainable_ptr<connection>              ptr;
00025       typedef boost::function<void(const boost::json::Value&, boost::json::Value&)> handler;
00026   
00027       connection();
00028       virtual ~connection();
00029 
00030       void add_signal_connection( const std::string& name, 
00031                                   const boost::signals::connection& c );
00032       void add_method_handler( const std::string& name, const handler& h ); 
00033       void invoke( boost::json::Value& msg, boost::json::Value& rtn_msg, 
00034                    const boost::chrono::microseconds& timeout_us = boost::chrono::microseconds::max() );
00035 
00036       virtual void send( const boost::json::Value& v )=0;
00037       virtual void start()=0;
00038       virtual bool is_connected()const = 0;
00039 
00040       boost::signal<void()> disconnected;
00041     protected:
00042       void on_receive( const boost::json::Value& v );
00043 
00044     private:
00045       class connection_private* my;
00046   };
00047 
00048   namespace detail {
00049 
00050     template<typename Seq,typename ResultType>
00051     struct rpc_send_functor {
00052       typedef ResultType result_type;
00053 
00054       rpc_send_functor( connection& c, const char* name )
00055       :m_con(c),m_msg(boost::json::Object()) 
00056       {
00057         boost::json::Object&  m_obj = m_msg.get_obj(); // obj stored in m_msg
00058         m_obj.push_back( boost::json::Pair( "id", 0 ) );
00059         m_obj.push_back( boost::json::Pair( "method", std::string(name) ) );
00060         m_obj.push_back( boost::json::Pair( "params", boost::json::Array() ) );
00061       }
00062 
00063       ResultType operator()( const Seq& params ) const  {
00064         boost::json::Object&  m_obj = m_msg.get_obj(); // obj stored in m_msg
00065         pack( m_obj.back().value_, params );
00066         boost::json::Value  rtn_msg;
00067         m_con.invoke( m_msg, rtn_msg );
00068         ResultType  ret_val;
00069         if(rtn_msg.contains("error") ) {
00070           error_object e;
00071           unpack( rtn_msg["error"], e );
00072           BOOST_THROW_EXCEPTION( e );
00073         }
00074         if( rtn_msg.contains( "result" ) )  {
00075           unpack( rtn_msg["result"], ret_val );
00076           return ret_val;
00077         }
00078         error_object e;
00079         e.message = "invalid json RPC message, missing result or error";
00080         BOOST_THROW_EXCEPTION( e );
00081       }
00082       connection&          m_con;
00083       mutable boost::json::Value    m_msg;
00084     }; // rpc_send_functor
00085     template<typename Seq,typename ResultType>
00086     struct rpc_send_functor<Seq, boost::cmt::future<ResultType> > {
00087       typedef ResultType result_type;
00088 
00089       rpc_send_functor( connection& c, const char* name )
00090       :m_con(c),m_msg(boost::json::Object()) 
00091       {
00092         boost::json::Object&  m_obj = m_msg.get_obj(); // obj stored in m_msg
00093         m_obj.push_back( boost::json::Pair( "id", 0 ) );
00094         m_obj.push_back( boost::json::Pair( "method", std::string(name) ) );
00095         m_obj.push_back( boost::json::Pair( "params", boost::json::Array() ) );
00096       }
00097 
00098       ResultType operator()( const Seq& params ) const  {
00099         boost::json::Object&  m_obj = m_msg.get_obj(); // obj stored in m_msg
00100         pack( m_obj.back().value_, params );
00101         boost::json::Value  rtn_msg;
00102         m_con.invoke( m_msg, rtn_msg );
00103         ResultType  ret_val;
00104         if(rtn_msg.contains("error") ) {
00105           error_object e;
00106           unpack( rtn_msg["error"], e );
00107           BOOST_THROW_EXCEPTION( e );
00108         }
00109         if( rtn_msg.contains( "result" ) )  {
00110           unpack( rtn_msg["result"], ret_val );
00111           return ret_val;
00112         }
00113         error_object e;
00114         e.message = "invalid json RPC message, missing result or error";
00115         BOOST_THROW_EXCEPTION( e );
00116       }
00117       connection&          m_con;
00118       mutable boost::json::Value    m_msg;
00119     }; // rpc_send_functor
00120 
00121 
00122 
00123     template<typename Seq, typename Functor, bool is_signal = false>
00124     struct rpc_recv_functor {
00125       rpc_recv_functor( Functor f, connection&, const char* )
00126       :m_func(f){}
00127       void operator()( const boost::json::Value& params, boost::json::Value& rtn ) {
00128         Seq paramv;
00129         unpack( params, paramv );
00130         pack( rtn, m_func(paramv) );
00131       }
00132       Functor m_func;
00133     };
00134 
00135     /*
00136      *  Blocks a signal if it is currently unblocked and 
00137      *  unblocks it when it goes out of scope if it was blocked
00138      *  when constructed. 
00139      */
00140     struct scoped_block_signal {
00141       scoped_block_signal( boost::signals::connection& _c )
00142       :c(_c),unblock(false){ 
00143         if( c != boost::signals::connection() && !c.blocked() )  {
00144           unblock = true;
00145           c.block();
00146         }
00147       }
00148       ~scoped_block_signal() { 
00149         if( unblock && c != boost::signals::connection() ) 
00150             c.unblock(); 
00151       }
00152       private:
00153         bool                        unblock;
00154         boost::signals::connection& c; 
00155     };
00156     template<typename Seq, typename Functor>
00157     struct rpc_recv_functor<Seq,Functor,true> {
00158       typedef typename boost::remove_reference<Functor>::type functor_type;
00159       rpc_recv_functor( Functor f, connection& c, const char* name )
00160       :m_name(name),m_con(c),m_func(f){
00161         wlog( "rpc_recv_functor %1% %2%", this, name );
00162         m_sig_con = m_func.connect( rpc_send_functor<Seq,
00163                                     typename functor_type::result_type>( m_con, m_name ) );
00164         m_sig_con.block(); 
00165         c.add_signal_connection( name, m_sig_con );
00166       }
00167 
00168 
00169       void operator()( const boost::json::Value& params, boost::json::Value& rtn ) {
00170         wlog( "rpc_recv_functor %1%", this );
00171         scoped_block_signal block_reverse(m_sig_con);
00172         Seq paramv;
00173         unpack( params, paramv );
00174         pack( rtn, m_func(paramv) );
00175       }
00176 
00177       boost::signals::connection m_sig_con;
00178       const char*                m_name;
00179       connection&                m_con;
00180       Functor                    m_func;
00181     };
00182 
00183     template<typename Seq, bool Sig, typename Functor>
00184     inline rpc_recv_functor<Seq,Functor,Sig> make_rpc_recv_functor( Functor f, connection& c, const char* n ) {
00185         return rpc_recv_functor<Seq,Functor,Sig>(f,c,n);
00186     }
00187     using boost::reflect::void_t;
00188     typedef boost::fusion::vector<std::string,int>         connect_signal_params;
00189     typedef rpc_send_functor<connect_signal_params,void_t> rpc_connect_signal_base;
00190 
00191     struct rpc_connect_delegate : public rpc_connect_signal_base {
00192       public:
00193         rpc_connect_delegate( connection& c, const char* name )
00194         :rpc_connect_signal_base(c,"rpc_connect_signal"),m_name(name){}
00195 
00196         void operator()( int count )const {
00197           connect_signal_params v(m_name,count);
00198           rpc_connect_signal_base::operator()(v);
00199         }
00200       private:
00201         std::string          m_name;
00202     };
00203 
00204     template<bool IsSignal = false> 
00205     struct if_signal {
00206       template<typename M>
00207       static void set_delegate( connection& c, M& m, const char* name ){
00208         m = detail::rpc_send_functor<typename M::fused_params, 
00209                                      typename M::result_type>(c, name);
00210       }
00211     };
00212 
00213     template<> 
00214     struct if_signal<true> {
00215       template<typename M>
00216       static void set_delegate( connection& c, M& m, const char* name ){
00217         m = detail::rpc_send_functor<typename M::fused_params, 
00218                                      typename M::result_type>(c, name);
00219 
00220         // when client connects/disconnects local signals, notify server of change
00221         m.set_connect_delegate(rpc_connect_delegate(c,name));
00222 
00223         // when server calls the client, emit locally, but do not 'echo'
00224         // back to the server
00225         c.add_method_handler( name, 
00226              rpc_recv_functor<typename M::fused_params, 
00227                 boost::function<typename M::result_type(typename M::fused_params)> >(
00228                 boost::bind(&M::emit, &m, _1), c, name) );
00229       }
00230     };
00231   
00232   } // namespace detail
00233 
00234 } } } // boost::rpc::json
00235 
00236 #endif// _BOOST_RPC_JSON_CONNECTION_HPP_
 All Classes Namespaces Files Functions Variables Typedefs Defines