This case study is designed to demonstrate the general usefulness of Boost.Reflect by implementing a simple generic RPC system based upon Boost.Serialization. This example shows what is possible, and how Boost.Reflect can serve as the foundation of a more general Boost RPC library.
The goal is to provide type-safe remote procedure calls. One solution to this problem is the RCF - Interprocess Communication for C++ library discussed at CodeProject.
Lets review their approach first:
Server:
RCF_BEGIN(I_Echo, "I_Echo") RCF_METHOD_R1(std::string, echo, const std::string &) RCF_END(I_Echo) struct Echo { std::string echo(const std::string &s) { return s; } }; int main() { Echo echo; RCF::RcfServer server(RCF::TcpEndpoint(50001)); server.bind<I_Echo>(echo); server.startInThisThread(); return 0; }
Client:
RCF_BEGIN(I_Echo, "I_Echo") RCF_METHOD_R1(std::string, echo, const std::string &) RCF_END(I_Echo) int main() { RcfClient<I_Echo> echoClient(RCF::TcpEndpoint("localhost", 50001)); std::string s = echoClient.echo(RCF::Twoway, "what's up"); return 0; }
While this only shows a simple example, it should communicate approximately what the API is.
Now lets compare what the target Boost.Reflect based API would be:
Server:
struct Echo { std::string echo(const std::string &s) { return s; } }; BOOST_Reflect_INTERFACE( Echo, BOOST_Reflect_BASE, (echo) ) int main() { boost::reflect::rpc_server<Echo> server( Echo() ); server.listen( 50001 ); return 0; }
Client:
struct Echo { std::string echo(const std::string &s); // implementation not needed }; BOOST_Reflect_INTERFACE( Echo, BOOST_Reflect_BASE, (echo) ) int main() { boost::reflect::rpc_client<Echo> client; client.connect_to( "localhost", 50001 ); std::string s = client.echo( "what's up" ); return 0; }
Some improvements offered by Boost.Reflect is that the return value and parameter types are only specified once. But the real power of Boost.Reflect is that it allows type erasure to apply to the rpc_client<> and thus we can turn this client in to a fully functional command line interface to a remote server with the following change:
cli m_cli; m_cli.start_visit(client); // erase the details of the rpc_client, by storing // it in a generic Echo interface. boost::reflect::any<Echo> any_echo = client; std::string line; std::string cmd; std::string args; while( true ) { std::cerr << "Enter Method: "; std::getline( std::cin, line ); cmd = line.substr( 0, line.find('(') ); args = line.substr( cmd.size(), line.size() ); std::cerr << m_cli[cmd](args) << std::endl; }
The server is a boost::reflect::any<InterfaceType> that also implements the visitor pattern. It creates a map from method name to a geneirc function that takes a serialized string for arguments and returns a serialized string as return value.
namespace boost { namespace reflect { template<typename InterfaceType> class rpc_server : public boost::reflect::visitor< rpc_server<InterfaceType> >, public reflect::any<InterfaceType> { public: template<typename T> rpc_server( T v ) :reflect::any<InterfaceType>(v) { // initialize ourself start_visit(*this); } // starts an endless loop waiting for commands void listen( uint16_t port ) { using namespace boost::asio::ip; boost::asio::io_service io_service; udp::socket socket( io_service, udp::endpoint(udp::v4(), port ) ); std::vector<char> recv_buf(2048); for( ;; ) { udp::endpoint remote_ep; boost::system::error_code err; size_t bytes_recv = socket.receive_from( boost::asio::buffer(recv_buf), remote_ep, 0, err ); if( err && err != boost::asio::error::message_size ) throw boost::system::system_error(err); // unpack the message which contains two strings, method name and params. std::string buf(&recv_buf.front(),bytes_recv ); std::string method; std::string params; { std::istringstream iss( buf ); boost::archive::binary_iarchive ia(iss); ia >> method; ia >> params; } boost::system::error_code ignored_error; // call methods[method] with params and reply with a message that contains // the serailized return value. socket.send_to( boost::asio::buffer( methods[method](params) ), remote_ep, 0, ignored_error ); } } // for each functor on the InterfaceType, create an rpc_functor // that converts serailized params to their actual values, invokes // the method, and converts the return value to a serailized buffer. template<typename InterfaceName, typename M> bool accept( M& m, const char* name ) { methods[name] = rpc_functor<typename M::fused_params, M&>(m); return true; } boost::function<std::string(const std::string)>& operator[]( const std::string& name ) { return methods[name]; } private: // Seq is a boost::fusion::vector<PARAM TYPES> // Functor is a reference to the member function object. template<typename Seq, typename Functor> struct rpc_functor { rpc_functor( Functor f ) :m_func(f){} // this method does the actual work of converting params to // the proper types, invoking the Functor, and returning the // serialized return value. std::string operator()( const std::string& params ) { Seq paramv; std::istringstream is(params); { boost::archive::binary_iarchive ia(is); deserialize_fusion_vector(ia,paramv); } std::ostringstream os; { boost::archive::binary_oarchive oa(os); typename boost::remove_reference<Functor>::type::result_type r = m_func(paramv); oa << r; } return os.str(); } Functor m_func; }; std::map<std::string, boost::function<std::string(const std::string)> > methods; }; } } // namespace boost::reflect
The RPC Client needs to define a new object that implements the interface in such a way that it sends the method name and serialized parameters over a socket. In this case, the implementation of the interface is fully dynamic.
template<typename InterfaceType> class rpc_client : public boost::reflect::visitor< rpc_client<InterfaceType> >, public reflect::any<InterfaceType> { public: rpc_client() :m_ios(),m_sock(m_ios) { start_visit(*this); } bool connect_to( const std::string& host, uint16_t port ) { m_sock.open(boost::asio::ip::udp::v4()); m_ep = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string(host), port ); } // this method does the actual work of sending a method/params across the socket // it is called by the rpc_client::rpc_functor which serailizes the values. std::string invoke( const char* name, const std::string& params ) { std::ostringstream os; { boost::archive::binary_oarchive oa(os); std::string n(name); oa << n; oa << params; } m_sock.send_to( boost::asio::buffer( os.str() ), m_ep ); boost::asio::ip::udp::endpoint rep; std::vector<char> recv_buf(2048); size_t len = m_sock.receive_from( boost::asio::buffer(recv_buf), rep ); return std::string(&recv_buf.front(),len); } template<typename InterfaceName, typename M> bool accept( M& m, const char* name ) { m.m_delegate = rpc_functor<typename M::fused_params, typename M::result_type>(*this,name); return true; } private: // stores a pointer to the name and rpc_client and is responsible // for serialization/deserialization template<typename Seq, typename ResultType> struct rpc_functor { rpc_functor( rpc_client& c, const char* name ) :m_client(c),m_name(name){} // The input is the parameters passed to the method as a boost::fusion::vector<> // The result is the deserialized response from the remote client. ResultType operator()( const Seq& params ) { std::ostringstream os; { boost::archive::binary_oarchive oa(os); serialize_fusion_vector(oa, params); } ResultType ret_val; std::istringstream is(m_client.invoke( m_name, os.str() ) ); { boost::archive::binary_iarchive ia(is); ia >> ret_val; } return ret_val; } const char* m_name; rpc_client& m_client; }; boost::asio::ip::udp::endpoint m_ep; boost::asio::io_service m_ios; boost::asio::ip::udp::socket m_sock; std::map<std::string, boost::function<std::string(const std::string)> > methods; };
© Daniel Larimer 2010-2011 - Licensed under Boost Software License, Version 1.0 | Boost Reflect Library |