MACE
1.0.0
|
00001 #ifndef _MACE_RPC_CONNECTION_HPP_ 00002 #define _MACE_RPC_CONNECTION_HPP_ 00003 #include <mace/rpc/connection_base.hpp> 00004 #include <boost/fusion/support/is_sequence.hpp> 00005 #include <boost/make_shared.hpp> 00006 #include <mace/rpc/detail/pending_result.hpp> 00007 #include <mace/rpc/filter.hpp> 00008 #include <mace/stub/ptr.hpp> 00009 #include <boost/fusion/support/deduce.hpp> 00010 #include <boost/fusion/support/deduce_sequence.hpp> 00011 00012 #include <boost/signals.hpp> 00013 00014 namespace mace { namespace rpc { 00015 00047 template<typename IODelegate > 00048 class connection : public connection_base { 00049 public: 00050 typedef boost::shared_ptr<connection> ptr; 00051 typedef IODelegate io_delegate_type; 00052 00053 boost::signal<void()> closed; 00054 00055 template<typename R, typename ParamSeq> 00056 cmt::future<R> call_fused( const std::string& id, ParamSeq&& params ) { 00057 return call_fused( std::string(id), std::move(params) ); 00058 } 00059 template<typename ParamSeq> 00060 void notice_fused( const std::string& id, ParamSeq&& params ) { 00061 notice_fused( std::string(id), std::move(params) ); 00062 } 00063 00064 template<typename R, typename ParamSeq> 00065 cmt::future<R> call_fused( std::string&& id, ParamSeq&& params ) { 00066 BOOST_STATIC_ASSERT( boost::fusion::traits::is_sequence<ParamSeq>::value ); 00067 // TODO: filter params for non-const references and add them as additional 'return values' 00068 // then pass the extra references to the pending_result impl. 00069 // References must remain valid until pr->prom->wait() returns. 00070 auto pr = boost::make_shared<detail::pending_result_impl<R,connection,IODelegate> >( boost::ref(*this), mace::cmt::promise<R>::make() ); 00071 function_filter<connection> f(*this); 00072 raw_call( std::move(id), IODelegate::pack(f, params), pr ); 00073 return pr->prom; 00074 } 00075 00076 template<typename ParamSeq> 00077 void notice_fused( std::string&& id, ParamSeq&& params ) { 00078 BOOST_STATIC_ASSERT( boost::fusion::traits::is_sequence<ParamSeq>::value ); 00079 function_filter<connection> f(*this); 00080 raw_call( std::move(id), IODelegate::pack(f, params), detail::pending_result::ptr() ); 00081 } 00082 00083 using connection_base::add_method; 00087 template<typename Signature> 00088 std::string add_method( const boost::function<Signature>& m ) { 00089 // TODO: convert m into a rpc::method and add it. 00090 return std::string(); 00091 } 00092 00096 template<typename Signature> 00097 boost::function<Signature> create_callback( const std::string& name ) { 00098 return boost::function<Signature>(); 00099 } 00100 00105 template<typename Seq, typename Functor> 00106 struct rpc_recv_functor { 00107 rpc_recv_functor( Functor f, connection& c ) 00108 :m_func(f),m_con(c){ } 00109 00110 message operator()( const message& m ) { 00111 message reply; 00112 reply.id = m.id; 00113 try { 00114 Seq paramv; 00115 if( boost::fusion::size(paramv) ) { 00116 function_filter<connection> f(m_con); 00117 if( m.id ) { 00118 slog( "%1%", m_func(IODelegate::template unpack<Seq, function_filter<connection> >( f, m.data )) ); 00119 reply.data = 00120 IODelegate::pack(f, m_func(IODelegate::template unpack<Seq, function_filter<connection> >( f, m.data )) ); 00121 } else { 00122 slog( "no id" ); 00123 m_func(IODelegate::template unpack<Seq,function_filter<connection> >( f, m.data )); 00124 } 00125 } 00126 } catch ( ... ) { 00127 if( m.id ) { 00128 function_filter<connection> f(m_con); 00129 reply.err = rpc::message::exception_thrown; 00130 reply.data = IODelegate::pack( f, boost::current_exception_diagnostic_information() ); 00131 } 00132 } 00133 return reply; 00134 } 00135 00136 Functor m_func; 00137 connection& m_con; 00138 }; 00139 00140 00141 00145 template<typename InterfaceType> 00146 struct add_interface_visitor { 00147 add_interface_visitor( connection& c, mace::stub::ptr<InterfaceType>& s ) 00148 :m_con(c),m_aptr(s){} 00149 00150 template<typename MemberPtr, MemberPtr m> 00151 void operator()(const char* name )const { 00152 typedef typename boost::function_types::result_type<MemberPtr>::type MemberRef; 00153 typedef typename boost::remove_reference<MemberRef>::type Member; 00154 typedef typename boost::fusion::traits::deduce_sequence<typename Member::fused_params>::type param_type; 00155 m_con.add_method( std::string(name), method(rpc_recv_functor<param_type, Member&>( (*m_aptr).*m, m_con )) ); 00156 } 00157 connection& m_con; 00158 mace::stub::ptr<InterfaceType>& m_aptr; 00159 }; 00160 00161 protected: 00162 connection( detail::connection_base* b ):connection_base(b){ slog( "cb: %1%", b ); } 00163 connection(); 00164 }; 00165 00166 } } 00167 00168 #endif