MACE  1.0.0
 All Classes Namespaces Files Functions Variables Enumerations Defines
stub/examples/rpc.hpp

See RPC Client Example for an explanation of this example.

#ifndef _MACE_STUB_RPC_HPP_
#define _MACE_STUB_RPC_HPP_
#include <mace/stub/mirror_interface.hpp>
#include <mace/stub/ptr.hpp>

#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio.hpp>
#include "vector_serialize.hpp"


namespace mace { namespace stub {

template<typename InterfaceType,typename InterfaceDelegate=mirror_interface>
class rpc_client : public stub::ptr<InterfaceType,InterfaceDelegate> {
  public:
    rpc_client()
    :m_ios(),m_sock(m_ios) {
      vtable_reflector<InterfaceType,InterfaceDelegate>::visit( set_visitor( *this ) );
    }

   bool connect_to( const std::string& host, uint16_t port ) {
       m_sock.open(boost::asio::ip::udp::v4());
       m_ep = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string(host), port );
           return true;
   }

   std::string invoke( const char* name, const std::string& params ) {
     std::ostringstream os;
     boost::archive::binary_oarchive oa(os);
     std::string n(name);
     oa << n;
     oa << params;

     m_sock.send_to( boost::asio::buffer( os.str() ), m_ep );

     boost::asio::ip::udp::endpoint rep;
     std::vector<char> recv_buf(2048);
     size_t len = m_sock.receive_from( boost::asio::buffer(recv_buf), rep );
     return std::string(&recv_buf.front(),len);
   }
   private:
     struct set_visitor {
       set_visitor( rpc_client& ci )
       :c(ci){}

       template<typename MemberPtr, MemberPtr m>
       void operator()( const char* name )const {
         typedef typename boost::function_types::result_type<MemberPtr>::type member_ref;
         typedef typename boost::remove_reference<member_ref>::type member;
         ((*c).*m) = rpc_functor<typename member::fused_params, 
                                 typename member::result_type>( c, name );
       }
       rpc_client& c;
     };

     template<typename Seq, typename ResultType>
     struct rpc_functor {
       rpc_functor( rpc_client& c, const char* name )
       :m_client(c),m_name(name){}

       ResultType operator()( const Seq& params )const {
          // serialize the parameters
          std::ostringstream os; 
          boost::archive::binary_oarchive oa(os);
          serialize_fusion_vector(oa, params);
          // make a call and store the result into the input stream; 
          std::istringstream is(m_client.invoke( m_name, os.str() ) );

          // unpack the result type
          ResultType  ret_val;
          boost::archive::binary_iarchive ia(is);
          ia >> ret_val;
          return ret_val;
       }
       const char* m_name;
       rpc_client& m_client;
     };
       boost::asio::ip::udp::endpoint m_ep;
       boost::asio::io_service        m_ios;
       boost::asio::ip::udp::socket   m_sock;
};


class rpc_server {
    public:
       template<typename T>
       friend struct get_visitor;

       template<typename InterfaceType>
       rpc_server( const mace::stub::ptr<InterfaceType>& v )
       :stub_ptr(v) {
           mace::stub::ptr<InterfaceType>& i = 
              boost::any_cast<mace::stub::ptr<InterfaceType>&>(stub_ptr);

           mace::stub::vtable_reflector<InterfaceType>::visit( get_visitor<InterfaceType>(*i, *this ) );
       }

       void listen( uint16_t port ) {
            using namespace boost::asio::ip;
            boost::asio::io_service io_service;
            udp::socket  socket( io_service, udp::endpoint(udp::v4(), port ) );
            std::vector<char>  recv_buf(2048);
            for( ;; )
            {
                udp::endpoint remote_ep;
                boost::system::error_code err;
                size_t bytes_recv = socket.receive_from( boost::asio::buffer(recv_buf),
                                     remote_ep, 0, err );
                if( err && err != boost::asio::error::message_size )
                    throw boost::system::system_error(err);

                std::string         buf(&recv_buf.front(),bytes_recv );
                std::string         method;
                std::string         params;
                {
                    std::istringstream iss( buf );
                    boost::archive::binary_iarchive ia(iss);
                    ia >> method;
                    ia >> params;
                }
                boost::system::error_code ignored_error;
                socket.send_to( boost::asio::buffer( methods[method](params) ),
                                remote_ep, 0, ignored_error );
            }
       }

       boost::function<std::string(const std::string)>& operator[]( const std::string& name ) 
       { return methods[name]; }

    private:

      // rpc_server
       template<typename Seq, typename Functor>
       struct rpc_functor {
           rpc_functor( Functor f )
           :m_func(f){}

           std::string operator()( const std::string& params )const {
                Seq paramv;
                std::istringstream is(params);
                boost::archive::binary_iarchive ia(is);
                deserialize_fusion_vector(ia,paramv);                    

                std::ostringstream os;
                boost::archive::binary_oarchive oa(os);
                typename boost::remove_reference<Functor>::type::result_type r = m_func(paramv);
                oa << r;
                return os.str();
           }
           Functor m_func;
       };


       template<typename T>
       struct get_visitor {
          get_visitor( vtable<T>& vt, rpc_server& si )
          :v(vt),s(si){}

          template<typename MemberPtr, MemberPtr m>
          void operator()( const char* name )const {
            typedef typename boost::function_types::result_type<MemberPtr>::type member_ref;
            typedef typename boost::remove_reference<member_ref>::type member;
            s.methods[name] = rpc_functor<typename member::fused_params, 
                                BOOST_TYPEOF( v.*m )&>(v.*m);
          }
          vtable<T>& v;
          rpc_server& s;
       };

       std::map<std::string, boost::function<std::string(const std::string)> > methods;
       boost::any stub_ptr;
};

} } // namespace mace::stub

#endif