MACE  1.0.0
 All Classes Namespaces Files Functions Variables Enumerations Defines
libs/rpc/include/mace/rpc/connection.hpp
00001 #ifndef _MACE_RPC_CONNECTION_HPP_
00002 #define _MACE_RPC_CONNECTION_HPP_
00003 #include <mace/rpc/connection_base.hpp>
00004 #include <boost/fusion/support/is_sequence.hpp>
00005 #include <boost/make_shared.hpp>
00006 #include <mace/rpc/detail/pending_result.hpp>
00007 #include <mace/rpc/filter.hpp>
00008 #include <mace/stub/ptr.hpp>
00009 #include <boost/fusion/support/deduce.hpp>
00010 #include <boost/fusion/support/deduce_sequence.hpp>
00011 
00012 #include <boost/signals.hpp>
00013 
00014 namespace mace { namespace rpc { 
00015 
00047   template<typename IODelegate >
00048   class connection : public connection_base {
00049     public:
00050       typedef boost::shared_ptr<connection> ptr;
00051       typedef IODelegate io_delegate_type;
00052 
00053       boost::signal<void()> closed;
00054 
00055       template<typename R, typename ParamSeq>
00056       cmt::future<R> call_fused( const std::string& id, ParamSeq&& params ) {
00057         return call_fused( std::string(id), std::move(params) );
00058       }
00059       template<typename ParamSeq>
00060       void notice_fused( const std::string& id, ParamSeq&& params ) {
00061         notice_fused( std::string(id), std::move(params) );
00062       }
00063 
00064       template<typename R, typename ParamSeq>
00065       cmt::future<R> call_fused( std::string&& id, ParamSeq&& params ) {
00066         BOOST_STATIC_ASSERT( boost::fusion::traits::is_sequence<ParamSeq>::value );
00067         // TODO: filter params for non-const references and add them as additional 'return values'
00068         //       then pass the extra references to the pending_result impl.
00069         //       References must remain valid until pr->prom->wait() returns.
00070         auto pr = boost::make_shared<detail::pending_result_impl<R,connection,IODelegate> >( boost::ref(*this), mace::cmt::promise<R>::make() );
00071         function_filter<connection> f(*this);
00072         raw_call( std::move(id), IODelegate::pack(f, params), pr );
00073         return pr->prom;
00074       }
00075 
00076       template<typename ParamSeq>
00077       void notice_fused( std::string&& id, ParamSeq&& params ) {
00078         BOOST_STATIC_ASSERT( boost::fusion::traits::is_sequence<ParamSeq>::value );
00079         function_filter<connection> f(*this);
00080         raw_call( std::move(id), IODelegate::pack(f, params), detail::pending_result::ptr() );
00081       }
00082 
00083       using connection_base::add_method;
00087       template<typename Signature>
00088       std::string add_method( const boost::function<Signature>& m ) {
00089         // TODO:  convert m into a rpc::method and add it.
00090         return std::string();
00091       }
00092 
00096       template<typename Signature>
00097       boost::function<Signature> create_callback( const std::string& name ) {
00098         return boost::function<Signature>();  
00099       }
00100 
00105       template<typename Seq, typename Functor>
00106       struct rpc_recv_functor {
00107         rpc_recv_functor( Functor f, connection& c )
00108         :m_func(f),m_con(c){ }
00109       
00110         message operator()( const message& m ) {
00111           message reply;
00112           reply.id = m.id;
00113           try {
00114             Seq paramv;
00115             if( boost::fusion::size(paramv) ) {
00116                function_filter<connection> f(m_con);
00117                if( m.id ) {
00118                   slog( "%1%", m_func(IODelegate::template unpack<Seq, function_filter<connection> >( f, m.data )) );
00119                   reply.data = 
00120                       IODelegate::pack(f, m_func(IODelegate::template unpack<Seq, function_filter<connection> >( f, m.data )) );
00121                } else {
00122                  slog( "no id" );
00123                  m_func(IODelegate::template unpack<Seq,function_filter<connection> >( f, m.data ));
00124                }
00125             }
00126           } catch ( ... ) {
00127             if( m.id ) {
00128               function_filter<connection> f(m_con);
00129               reply.err  = rpc::message::exception_thrown;
00130               reply.data = IODelegate::pack( f, boost::current_exception_diagnostic_information() );
00131             }
00132           }
00133           return reply;
00134         }
00135       
00136         Functor     m_func;
00137         connection& m_con;
00138       };
00139 
00140 
00141 
00145       template<typename InterfaceType>
00146       struct add_interface_visitor {
00147         add_interface_visitor( connection& c, mace::stub::ptr<InterfaceType>& s )
00148         :m_con(c),m_aptr(s){}
00149       
00150         template<typename MemberPtr,  MemberPtr m>
00151         void operator()(const char* name )const  {
00152             typedef typename boost::function_types::result_type<MemberPtr>::type MemberRef;
00153             typedef typename boost::remove_reference<MemberRef>::type Member;
00154             typedef typename boost::fusion::traits::deduce_sequence<typename Member::fused_params>::type param_type;
00155             m_con.add_method( std::string(name), method(rpc_recv_functor<param_type, Member&>( (*m_aptr).*m, m_con )) );
00156         }
00157         connection&                       m_con;
00158         mace::stub::ptr<InterfaceType>&   m_aptr;
00159       };
00160 
00161     protected:
00162       connection( detail::connection_base* b ):connection_base(b){ slog( "cb: %1%", b ); }
00163       connection();
00164   };
00165 
00166 } }
00167 
00168 #endif