MACE
1.0.0
|
00001 #ifndef _MACE_RPC_JSON_CONNECTION_HPP_ 00002 #define _MACE_RPC_JSON_CONNECTION_HPP_ 00003 #include <mace/rpc/json/value_io.hpp> 00004 #include <mace/rpc/error.hpp> 00005 #include <boost/function.hpp> 00006 #include <vector> 00007 #include <mace/cmt/thread.hpp> 00008 #include <boost/signals.hpp> 00009 #include <boost/fusion/include/size.hpp> 00010 #include <boost/fusion/include/front.hpp> 00011 #include <boost/fusion/support/deduce.hpp> 00012 #include <boost/fusion/include/make_fused_function_object.hpp> 00013 #include <boost/fusion/support/deduce_sequence.hpp> 00014 #include <boost/fusion/include/make_unfused.hpp> 00015 #include <mace/rpc/json/named_parameters.hpp> 00016 #include <boost/function_types/result_type.hpp> 00017 00018 00019 #include <mace/stub/ptr.hpp> 00020 00021 namespace mace { namespace rpc { 00026 namespace json { 00027 00028 typedef boost::error_info<struct json_rpc_error_,json::value> error; 00029 typedef boost::error_info<struct json_err_data_,std::string> err_data; 00030 typedef boost::error_info<struct json_err_code_,int64_t> err_code; 00031 00032 struct error_object : public virtual mace::rpc::exception { 00033 const char* what()const throw() { return message().c_str(); } 00034 virtual void rethrow()const { BOOST_THROW_EXCEPTION(*this); } 00035 00036 const std::string& data()const 00037 { return *boost::get_error_info<mace::rpc::json::err_data>(*this); } 00038 int64_t code()const 00039 { return *boost::get_error_info<mace::rpc::json::err_code>(*this); } 00040 }; 00041 00042 class connection; 00043 typedef boost::function<json::value( connection&, const json::value& param )> rpc_method; 00044 00053 class connection : public boost::enable_shared_from_this<connection> { 00054 public: 00059 struct function_filter { 00060 function_filter( connection& c ):m_con(c){} 00061 00062 template<typename T> 00063 const T& operator()( const T& v ) { return v; } 00064 00065 template<typename T> 00066 inline void operator()( const json::value& j, T& v ) { 00067 json::io::unpack( *this, j, v ); 00068 } 00069 template<typename Signature> 00070 inline void operator()( const json::value& j, boost::function<Signature> & v ) { 00071 typedef typename boost::function_types::parameter_types<Signature>::type mpl_param_types; 00072 typedef typename boost::fusion::result_of::as_vector<mpl_param_types>::type param_types; 00073 typedef typename boost::function_types::result_type<Signature>::type R; 00074 00075 mace::cmt::future<R> (connection::*cf)(const std::string&, const param_types&) = &connection::call_fused; 00076 v = boost::fusion::make_unfused(boost::bind( cf, &m_con, (const std::string&)j, _1 ) ); 00077 //return unpack( *this, j, v ); 00078 } 00079 00080 template<typename Signature> 00081 std::string operator()( const boost::function<Signature>& f ); 00082 00083 private: 00084 connection& m_con; 00085 }; 00086 00087 typedef boost::shared_ptr<connection> ptr; 00088 typedef boost::weak_ptr<connection> wptr; 00089 00093 connection( mace::cmt::thread* t = &mace::cmt::thread::current() ); 00094 ~connection(); 00095 00096 mace::cmt::thread* get_thread()const; 00097 00098 void add_method( const std::string& mid, const rpc_method& m ); 00099 00101 std::string add_method( const rpc_method& m ); 00102 00103 00104 #include <mace/rpc/json/detail/call_methods.hpp> 00105 00106 00107 template<typename ParamSeq> 00108 mace::cmt::future<json::value> call_fused( const std::string& method_name, const ParamSeq& param ) { 00109 json::value msg; 00110 create_call( method_name, param, msg ); 00111 msg["id"] = next_method_id(); 00112 typename pending_result_impl<json::value>::ptr pr = boost::make_shared<pending_result_impl<json::value> >(); 00113 send( msg, boost::static_pointer_cast<pending_result>(pr) ); 00114 return pr->prom; 00115 } 00116 00117 template<typename ParamSeq> 00118 void create_call( const std::string& method_name, const ParamSeq& param, json::value& msg ) { 00119 msg["method"] = method_name; 00120 00121 if( boost::fusion::size(param ) ) 00122 pack_params( msg["params"], param, typename has_named_params<ParamSeq>::type() ); 00123 00124 msg["jsonrpc"] = "2.0"; 00125 } 00126 00127 template<typename R, typename ParamSeq> 00128 mace::cmt::future<R> call_fused( const std::string& method_name, const ParamSeq& param ) { 00129 json::value msg; 00130 msg["method"] = method_name; 00131 msg["id"] = next_method_id(); 00132 00133 if( boost::fusion::size(param ) ) 00134 pack_params( msg["params"], param, typename has_named_params<ParamSeq>::type() ); 00135 00136 typename pending_result_impl<R>::ptr pr = boost::make_shared<pending_result_impl<R> >(); 00137 00138 msg["jsonrpc"] = "2.0"; 00139 send( msg, boost::static_pointer_cast<pending_result>(pr) ); 00140 return pr->prom; 00141 } 00142 template<typename ParamSeq> 00143 void notice_fused( const std::string& method_name, const ParamSeq& param ) { 00144 json::value msg; 00145 msg["method"] = method_name; 00146 // TODO: JSON RCP 1.0 sets this to 'null' instead of being empty 00147 //msg["id"] = next_method_id(); 00148 00149 // TODO: JSON RPC 1.0 does not allow empty param 00150 if( boost::fusion::size(param ) ) 00151 pack_params( msg["params"], param, typename has_named_params<ParamSeq>::type() ); 00152 00153 msg["jsonrpc"] = "2.0"; 00154 send( msg ); 00155 } 00156 00157 boost::signal<void()> closed; 00158 00159 void handle_call( const json::value& c, json::value& result ); 00160 void handle_notice( const json::value& m ); 00161 00162 00163 class pending_result { 00164 public: 00165 typedef boost::shared_ptr<pending_result> ptr; 00166 virtual ~pending_result(){} 00167 virtual void handle_result( connection& c, const json::value& data ) = 0; 00168 virtual void handle_error( const boost::exception_ptr& e ) = 0; 00169 }; 00170 00171 protected: 00172 // change how params are packed based upon whether or not they are named params 00173 template<typename Seq> 00174 void pack_params( json::value& v, const Seq& s, const boost::true_type& is_named ) { 00175 function_filter f(*this); 00176 json::io::pack(f, v, boost::fusion::at_c<0>(s)); 00177 } 00178 template<typename Seq> 00179 void pack_params( json::value& v, const Seq& s, const boost::false_type& is_named ) { 00180 function_filter f(*this); 00181 json::io::pack(f, v,s); 00182 } 00183 00184 00185 void break_promises(); 00186 void handle_call( const json::value& m ); 00187 void handle_result( const json::value& m ); 00188 void handle_error( const json::value& m ); 00189 00190 00191 virtual void send( const json::value& msg ); 00192 virtual void send( const json::value& msg, const connection::pending_result::ptr& pr ); 00193 virtual uint64_t next_method_id(); 00194 00195 private: 00196 friend class connection_private; 00197 00198 00199 template<typename R> 00200 class pending_result_impl : public pending_result { 00201 public: 00202 pending_result_impl():prom(new mace::cmt::promise<R>()){} 00203 ~pending_result_impl() { 00204 if( !prom->ready() ) { 00205 prom->set_exception( boost::copy_exception( mace::cmt::error::broken_promise() )); 00206 } 00207 } 00208 typedef boost::shared_ptr<pending_result_impl> ptr; 00209 virtual void handle_result( connection& c, const json::value& data ) { 00210 R value; 00211 function_filter f(c); 00212 json::io::unpack( f, data, value ); 00213 prom->set_value( value ); 00214 } 00215 virtual void handle_error( const boost::exception_ptr& e ) { 00216 prom->set_exception(e); 00217 } 00218 typename mace::cmt::promise<R>::ptr prom; 00219 }; 00220 class connection_private* my; 00221 }; 00222 00223 namespace detail { 00224 00225 template<typename Seq, typename Functor,bool NamedParams> 00226 struct rpc_recv_functor { 00227 rpc_recv_functor( Functor f ) 00228 :m_func(f){ } 00229 00230 json::value operator()( json::connection& c, const json::value& param ) { 00231 Seq paramv; 00232 if( boost::fusion::size(paramv) ) { 00233 if( !param.is_array() ) { 00234 MACE_RPC_THROW( "param value is not an array" ); 00235 } 00236 connection::function_filter f(c); 00237 json::io::unpack( f, param, paramv ); 00238 } 00239 json::value rtn; 00240 connection::function_filter f(c); 00241 json::io::pack( f, rtn, m_func(paramv) ); 00242 return rtn; 00243 } 00244 00245 Functor m_func; 00246 }; 00247 00251 template<typename Seq, typename Functor> 00252 struct rpc_recv_functor<Seq,Functor,true> { 00253 rpc_recv_functor( Functor f ) 00254 :m_func(f){ } 00255 00256 json::value operator()( json::connection& c, const json::value& param ) { 00257 Seq paramv; 00258 if( param.is_array() ) { 00259 json::io::unpack( param, paramv ); 00260 } 00261 else if( param.is_object() ) { 00262 connection::function_filter f(c); 00263 json::io::unpack( f, param, boost::fusion::at_c<0>(paramv) ); 00264 } else { 00265 MACE_RPC_THROW( "param value is not an object or array" ); 00266 } 00267 json::value rtn; 00268 connection::function_filter f(c); 00269 json::io::pack( f, rtn, m_func(paramv) ); 00270 return rtn; 00271 } 00272 00273 Functor m_func; 00274 }; 00275 } 00276 00277 00278 template<typename Signature> 00279 std::string connection::function_filter::operator()( const boost::function<Signature>& f ) { 00280 typedef typename boost::function_types::parameter_types<Signature>::type mpl_param_types; 00281 typedef typename boost::fusion::result_of::as_vector<mpl_param_types>::type param_types; 00282 typedef typename boost::function_types::result_type<Signature>::type R; 00283 return m_con.add_method( detail::rpc_recv_functor<param_types,boost::function<R(param_types)>,false>( boost::fusion::make_fused_function_object(f) ) ); 00284 } 00285 00286 00290 template<typename InterfaceType> 00291 struct add_interface_visitor { 00292 add_interface_visitor( rpc::json::connection& c, mace::stub::ptr<InterfaceType>& s ) 00293 :m_con(c),m_aptr(s){} 00294 00295 template<typename MemberPtr, MemberPtr m> 00296 void operator()(const char* name )const { 00297 typedef typename boost::function_types::result_type<MemberPtr>::type MemberRef; 00298 typedef typename boost::remove_reference<MemberRef>::type Member; 00299 typedef typename boost::fusion::traits::deduce_sequence<typename Member::fused_params>::type param_type; 00300 m_con.add_method( name, detail::rpc_recv_functor<param_type, Member&, 00301 has_named_params<typename Member::fused_params>::value >( (*m_aptr).*m ) ); 00302 } 00303 rpc::json::connection& m_con; 00304 mace::stub::ptr<InterfaceType>& m_aptr; 00305 }; 00306 00307 00308 } } } // mace::rpc::json 00309 00310 #endif