MACE  1.0.0
 All Classes Namespaces Files Functions Variables Enumerations Defines
libs/rpc/include/mace/rpc/tcp/server.hpp
00001 #ifndef _MACE_RPC_TCP_SERVER_HPP_
00002 #define _MACE_RPC_TCP_SERVER_HPP_
00003 #include <mace/rpc/server.hpp>
00004 #include <mace/cmt/asio/tcp/socket.hpp>
00005 #include <mace/cmt/asio.hpp>
00006 
00007 namespace mace { namespace rpc { namespace tcp {
00008 
        /// TCP front-end for mace::rpc::server.
        ///
        /// Construction forwards the session argument to the base class and then
        /// schedules listen() through cmt::async; the returned future is kept in
        /// listen_complete so the destructor can wait for the accept loop to end.
        ///
        /// @tparam InterfaceType   forwarded unchanged to mace::rpc::server.
        /// @tparam ConnectionType  connection class; this header requires it to
        ///                         expose a nested `ptr` type, to be constructible
        ///                         from a socket_t::ptr, and to have a `closed`
        ///                         member offering `connect(...)`.
00009   template<typename InterfaceType, typename ConnectionType>
00010   class server : public mace::rpc::server<InterfaceType,ConnectionType> {
00011     public:
00012       typedef ConnectionType            connection_type;
00013 
            /// Builds a server from a session generator and starts listening.
            ///
            /// @param sg    callable returning a boost::shared_ptr<SessionType>;
            ///              passed straight to the base-class constructor.
            /// @param port  TCP port handed to listen().
00014       template<typename SessionType>
00015       server( const boost::function<boost::shared_ptr<SessionType>()>& sg, uint16_t port )
00016       :mace::rpc::server<InterfaceType,ConnectionType>( sg ) {
              // listen() is not called inline; it is handed to cmt::async and the
              // resulting future is stored for the destructor.
00017         listen_complete = cmt::async( boost::bind(&server::listen,this,port) ); 
00018       }
00019 
            /// Builds a server from an existing session object and starts listening.
            ///
            /// @param shared_session  session pointer passed straight to the
            ///                        base-class constructor.
            /// @param port            TCP port handed to listen().
00020       template<typename SessionType>
00021       server( const boost::shared_ptr<SessionType>& shared_session, uint16_t port )
00022       :mace::rpc::server<InterfaceType,ConnectionType>( shared_session ) {
00023         listen_complete = cmt::async( boost::bind(&server::listen,this,port) ); 
00024       }
00025 
            /// Closes the acceptor (if one has been created) and waits for the
            /// listen() task to finish. Anything thrown while doing so is logged
            /// with elog and not propagated out of the destructor.
00026       ~server() {
00027         try {
                // NOTE(review): acc is only assigned inside listen(). If that task
                // has not run yet, close() is skipped here and wait() would then be
                // waiting on an accept loop nobody closes - confirm how cmt::async
                // schedules the task relative to this destructor.
00028           if( acc ) acc->close();
00029           listen_complete.wait();
00030         }catch(...){
00031           elog( "%1%", boost::current_exception_diagnostic_information() );
00032         }
00033       }
00034 
00035     private:
00036             typedef cmt::asio::tcp::socket socket_t;
00037 
            /// Registers a freshly accepted connection.
            ///
            /// Hooks on_disconnect() to the connection's `closed` member and stores
            /// the value returned by `this->sc->init_connection(c)` in
            /// `connections`, keyed by the connection pointer.
            /// `sc` is not declared in this header; presumably it is inherited from
            /// mace::rpc::server - verify against the base class.
00038       void on_connection( const typename ConnectionType::ptr& c ) {
              // NOTE(review): the bound slot stores its own copy of `c` inside
              // c->closed. If ConnectionType::ptr is an owning pointer this may
              // keep the connection alive for as long as the slot stays connected
              // - confirm the lifetime rules of `closed`.
00039         c->closed.connect( boost::bind( &server::on_disconnect, this, c ) );
00040         connections[c] = this->sc->init_connection(c);
00041       }
            /// Drops the entry for `c` from `connections`.
00042       void on_disconnect( const typename ConnectionType::ptr& c ) {
00043         connections.erase( c );
00044       }
00045 
            /// Accept loop; started via cmt::async from the constructors.
            ///
            /// Creates the acceptor on cmt::asio::default_io_service() for an IPv4
            /// endpoint on port `p`, then repeatedly accepts sockets until
            /// cmt::asio::tcp::accept reports an error code. Every exception is
            /// caught and logged with elog; none leave this function.
            ///
            /// @param p  TCP port to accept connections on.
00046       void listen( uint16_t p ) {
00047         try {
00048           acc = boost::make_shared<boost::asio::ip::tcp::acceptor>( 
00049                     boost::ref(cmt::asio::default_io_service()), 
00050                     boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(),p) );
00051 
00052           boost::system::error_code ec;
00053           do {
                    // A new socket object is allocated for every accept attempt.
00054               socket_t::ptr iosp(new socket_t());
00055               ec = cmt::asio::tcp::accept( *acc, *iosp);
00056               if(!ec) {
                        // Registration is scheduled through cmt::async rather than
                        // being called directly from the loop body.
                        // NOTE(review): the bound call captures a raw `this`;
                        // confirm such tasks cannot run after ~server().
00057                   cmt::async( boost::bind(&server::on_connection, this,
00058                               typename ConnectionType::ptr( new ConnectionType(iosp) ) )); 
00059               } else { 
                        // The first failing accept is logged and, because of the
                        // loop condition below, also ends the loop.
                        // NOTE(review): presumably acc->close() in ~server() is
                        // what makes accept fail on shutdown, in which case a
                        // deliberate shutdown is logged here as well - confirm.
00060                   elog( "%1%", boost::system::system_error(ec).what() );
00061               }
00062           }while( !ec );
00063           
00064         } catch (...) {
00065           elog( "%1%", boost::current_exception_diagnostic_information() );
00066         }
00067       }
00068 
            // acc:             listening acceptor; empty until listen() creates it.
            // listen_complete: future of the listen() task launched by the constructors.
            // connections:     per-connection value returned by init_connection(),
            //                  keyed by connection pointer; entries are removed in
            //                  on_disconnect().
00069       boost::shared_ptr<boost::asio::ip::tcp::acceptor>  acc;
00070       mace::cmt::future<void>                            listen_complete;
00071       std::map<typename ConnectionType::ptr,boost::any>  connections;
00072   };
00073 
00074 } } } // mace::rpc::tcp
00075 
00076 #endif