MACE  1.0.0
 All Classes Namespaces Files Functions Variables Enumerations Defines
libs/rpc/include/mace/rpc/json/connection.hpp
00001 #ifndef _MACE_RPC_JSON_CONNECTION_HPP_
00002 #define _MACE_RPC_JSON_CONNECTION_HPP_
00003 #include <mace/rpc/json/value_io.hpp>
00004 #include <mace/rpc/error.hpp>
00005 #include <boost/function.hpp>
00006 #include <vector>
00007 #include <mace/cmt/thread.hpp>
00008 #include <boost/signals.hpp>
00009 #include <boost/fusion/include/size.hpp>
00010 #include <boost/fusion/include/front.hpp>
00011 #include <boost/fusion/support/deduce.hpp>
00012 #include <boost/fusion/include/make_fused_function_object.hpp>
00013 #include <boost/fusion/support/deduce_sequence.hpp>
00014 #include <boost/fusion/include/make_unfused.hpp>
00015 #include <mace/rpc/json/named_parameters.hpp>
00016 #include <boost/function_types/result_type.hpp>
00017 
00018 
00019 #include <mace/stub/ptr.hpp>
00020 
00021 namespace mace { namespace rpc { 
00026 namespace json {
00027 
00028 typedef boost::error_info<struct json_rpc_error_,json::value> error;
00029 typedef boost::error_info<struct json_err_data_,std::string> err_data;
00030 typedef boost::error_info<struct json_err_code_,int64_t> err_code;
00031 
00032   struct error_object : public virtual mace::rpc::exception {
00033       const char* what()const throw()     { return message().c_str();     }
00034       virtual void       rethrow()const   { BOOST_THROW_EXCEPTION(*this); } 
00035 
00036       const std::string& data()const
00037       { return *boost::get_error_info<mace::rpc::json::err_data>(*this); }
00038       int64_t code()const
00039       { return *boost::get_error_info<mace::rpc::json::err_code>(*this); }
00040   };
00041 
00042   class connection;
00043   typedef boost::function<json::value( connection&, const json::value& param )> rpc_method;
00044 
00053   class connection : public boost::enable_shared_from_this<connection> {
00054     public:
00059       struct function_filter {
00060          function_filter( connection& c ):m_con(c){}
00061 
00062          template<typename T>
00063          const T& operator()( const T& v ) { return v; }
00064 
00065          template<typename T>
00066          inline void operator()( const json::value& j, T& v ) { 
00067            json::io::unpack( *this, j, v ); 
00068          }
00069          template<typename Signature>
00070          inline void operator()( const json::value& j, boost::function<Signature> & v ) { 
00071              typedef typename boost::function_types::parameter_types<Signature>::type  mpl_param_types;
00072              typedef typename boost::fusion::result_of::as_vector<mpl_param_types>::type param_types;
00073              typedef typename boost::function_types::result_type<Signature>::type  R;
00074 
00075              mace::cmt::future<R> (connection::*cf)(const std::string&, const param_types&) = &connection::call_fused;
00076              v = boost::fusion::make_unfused(boost::bind( cf, &m_con, (const std::string&)j, _1 ) );
00077            //return unpack( *this, j, v ); 
00078          }
00079 
00080          template<typename Signature>
00081          std::string operator()( const boost::function<Signature>& f ); 
00082 
00083          private:
00084              connection& m_con;
00085       };
00086       
00087       typedef boost::shared_ptr<connection> ptr;
00088       typedef boost::weak_ptr<connection>   wptr;
00089 
00093       connection( mace::cmt::thread* t = &mace::cmt::thread::current()  );
00094       ~connection();
00095 
00096       mace::cmt::thread* get_thread()const;
00097 
00098       void add_method( const std::string& mid, const rpc_method& m );
00099 
00101       std::string add_method( const rpc_method& m );
00102 
00103 
00104       #include <mace/rpc/json/detail/call_methods.hpp>
00105 
00106 
00107       template<typename ParamSeq>
00108       mace::cmt::future<json::value> call_fused( const std::string& method_name, const ParamSeq& param ) {
00109         json::value msg;
00110         create_call( method_name, param, msg );
00111         msg["id"]     = next_method_id();
00112         typename pending_result_impl<json::value>::ptr pr = boost::make_shared<pending_result_impl<json::value> >(); 
00113         send( msg, boost::static_pointer_cast<pending_result>(pr) );
00114         return pr->prom;
00115       }
00116 
00117       template<typename ParamSeq>
00118       void create_call( const std::string& method_name, const ParamSeq& param, json::value& msg ) {
00119         msg["method"] = method_name;
00120       
00121         if( boost::fusion::size(param ) )
00122           pack_params( msg["params"], param, typename has_named_params<ParamSeq>::type() );
00123       
00124         msg["jsonrpc"] = "2.0";
00125       }
00126 
00127       template<typename R, typename ParamSeq>
00128       mace::cmt::future<R> call_fused( const std::string& method_name, const ParamSeq& param ) {
00129         json::value msg;
00130         msg["method"] = method_name;
00131         msg["id"]     = next_method_id();
00132 
00133         if( boost::fusion::size(param ) )
00134           pack_params( msg["params"], param, typename has_named_params<ParamSeq>::type() );
00135 
00136         typename pending_result_impl<R>::ptr pr = boost::make_shared<pending_result_impl<R> >(); 
00137 
00138         msg["jsonrpc"] = "2.0";
00139         send( msg, boost::static_pointer_cast<pending_result>(pr) );
00140         return pr->prom;
00141       }
00142       template<typename ParamSeq>
00143       void notice_fused( const std::string& method_name, const ParamSeq& param ) {
00144         json::value msg;
00145         msg["method"] = method_name;
00146         // TODO: JSON RCP 1.0 sets this to 'null' instead of being empty
00147         //msg["id"]     = next_method_id();
00148 
00149         // TODO: JSON RPC 1.0 does not allow empty param
00150         if( boost::fusion::size(param ) )
00151           pack_params( msg["params"], param, typename has_named_params<ParamSeq>::type() );
00152 
00153         msg["jsonrpc"] = "2.0";
00154         send( msg );
00155       }
00156 
00157       boost::signal<void()> closed;
00158       
00159       void handle_call( const json::value& c, json::value& result );
00160       void handle_notice( const json::value& m );
00161 
00162 
00163       class pending_result {
00164         public:
00165           typedef boost::shared_ptr<pending_result> ptr;
00166           virtual ~pending_result(){}
00167           virtual void handle_result( connection& c, const json::value& data )       = 0;
00168           virtual void handle_error( const boost::exception_ptr& e  ) = 0;
00169       };
00170 
00171     protected:
00172        // change how params are packed based upon whether or not they are named params
00173        template<typename Seq>
00174        void pack_params( json::value& v, const Seq& s, const boost::true_type& is_named ) {
00175           function_filter f(*this);
00176           json::io::pack(f, v, boost::fusion::at_c<0>(s));
00177        }
00178        template<typename Seq>
00179        void pack_params( json::value& v, const Seq& s, const boost::false_type& is_named ) {
00180           function_filter f(*this);
00181           json::io::pack(f, v,s);
00182        }
00183 
00184 
00185       void break_promises();
00186       void handle_call(   const json::value& m );
00187       void handle_result( const json::value& m );
00188       void handle_error(  const json::value& m );
00189 
00190 
00191       virtual void send( const json::value& msg );
00192       virtual void send( const json::value& msg, const connection::pending_result::ptr& pr );
00193       virtual uint64_t next_method_id();
00194 
00195     private:
00196       friend class connection_private;
00197 
00198 
00199       template<typename R> 
00200       class pending_result_impl : public pending_result {
00201         public:
00202           pending_result_impl():prom(new mace::cmt::promise<R>()){}
00203           ~pending_result_impl() {
00204             if( !prom->ready() ) {
00205               prom->set_exception( boost::copy_exception( mace::cmt::error::broken_promise() ));
00206             }
00207           }
00208           typedef boost::shared_ptr<pending_result_impl> ptr;
00209           virtual void handle_result( connection& c, const json::value& data ) {
00210             R value;
00211             function_filter f(c);
00212             json::io::unpack( f, data, value );
00213             prom->set_value( value );
00214           }
00215           virtual void handle_error( const boost::exception_ptr& e  ) {
00216             prom->set_exception(e);
00217           }
00218           typename mace::cmt::promise<R>::ptr prom;
00219       };
00220       class connection_private* my;
00221   };
00222 
00223   namespace detail {
00224 
00225      template<typename Seq, typename Functor,bool NamedParams>
00226      struct rpc_recv_functor {
00227        rpc_recv_functor( Functor f )
00228        :m_func(f){ }
00229 
00230        json::value operator()( json::connection& c, const json::value& param ) {
00231          Seq paramv;
00232          if( boost::fusion::size(paramv) ) {
00233             if( !param.is_array() ) {
00234               MACE_RPC_THROW( "param value is not an array" );
00235             }
00236             connection::function_filter f(c);
00237             json::io::unpack( f, param, paramv );
00238          }
00239          json::value rtn;
00240          connection::function_filter f(c);
00241          json::io::pack( f, rtn, m_func(paramv) );
00242          return rtn;
00243        }
00244 
00245        Functor m_func;
00246      };
00247 
00251      template<typename Seq, typename Functor>
00252      struct rpc_recv_functor<Seq,Functor,true> {
00253        rpc_recv_functor( Functor f )
00254        :m_func(f){  }
00255 
00256        json::value operator()( json::connection& c, const json::value& param ) {
00257          Seq paramv;
00258          if( param.is_array() ) {
00259             json::io::unpack( param, paramv );
00260          }
00261          else if( param.is_object() ) {
00262             connection::function_filter f(c);
00263             json::io::unpack( f, param, boost::fusion::at_c<0>(paramv) );
00264          } else {
00265             MACE_RPC_THROW( "param value is not an object or array" );
00266          }
00267          json::value rtn;
00268          connection::function_filter f(c);
00269          json::io::pack( f, rtn, m_func(paramv) );
00270          return rtn;
00271        }
00272 
00273        Functor m_func;
00274      };
00275   }
00276 
00277 
00278   template<typename Signature>
00279   std::string connection::function_filter::operator()( const boost::function<Signature>& f ) {
00280      typedef typename boost::function_types::parameter_types<Signature>::type  mpl_param_types;
00281      typedef typename boost::fusion::result_of::as_vector<mpl_param_types>::type param_types;
00282      typedef typename boost::function_types::result_type<Signature>::type  R;
00283      return m_con.add_method( detail::rpc_recv_functor<param_types,boost::function<R(param_types)>,false>( boost::fusion::make_fused_function_object(f) ) );
00284   }
00285 
00286 
00290   template<typename InterfaceType>
00291   struct add_interface_visitor {
00292     add_interface_visitor( rpc::json::connection& c, mace::stub::ptr<InterfaceType>& s )
00293     :m_con(c),m_aptr(s){}
00294   
00295     template<typename MemberPtr,  MemberPtr m>
00296     void operator()(const char* name )const  {
00297         typedef typename boost::function_types::result_type<MemberPtr>::type MemberRef;
00298         typedef typename boost::remove_reference<MemberRef>::type Member;
00299         typedef typename boost::fusion::traits::deduce_sequence<typename Member::fused_params>::type param_type;
00300         m_con.add_method( name, detail::rpc_recv_functor<param_type, Member&, 
00301                           has_named_params<typename Member::fused_params>::value >( (*m_aptr).*m ) );
00302     }
00303     rpc::json::connection&                   m_con;
00304     mace::stub::ptr<InterfaceType>& m_aptr;
00305   };
00306 
00307 
00308 } } } // mace::rpc::json
00309 
00310 #endif