MACE
1.0.0
|
00001 #ifndef _MACE_RPC_TCP_SERVER_HPP_ 00002 #define _MACE_RPC_TCP_SERVER_HPP_ 00003 #include <mace/rpc/server.hpp> 00004 #include <mace/cmt/asio/tcp/socket.hpp> 00005 #include <mace/cmt/asio.hpp> 00006 00007 namespace mace { namespace rpc { namespace tcp { 00008 00009 template<typename InterfaceType, typename ConnectionType> 00010 class server : public mace::rpc::server<InterfaceType,ConnectionType> { 00011 public: 00012 typedef ConnectionType connection_type; 00013 00014 template<typename SessionType> 00015 server( const boost::function<boost::shared_ptr<SessionType>()>& sg, uint16_t port ) 00016 :mace::rpc::server<InterfaceType,ConnectionType>( sg ) { 00017 listen_complete = cmt::async( boost::bind(&server::listen,this,port) ); 00018 } 00019 00020 template<typename SessionType> 00021 server( const boost::shared_ptr<SessionType>& shared_session, uint16_t port ) 00022 :mace::rpc::server<InterfaceType,ConnectionType>( shared_session ) { 00023 listen_complete = cmt::async( boost::bind(&server::listen,this,port) ); 00024 } 00025 00026 ~server() { 00027 try { 00028 if( acc ) acc->close(); 00029 listen_complete.wait(); 00030 }catch(...){ 00031 elog( "%1%", boost::current_exception_diagnostic_information() ); 00032 } 00033 } 00034 00035 private: 00036 typedef cmt::asio::tcp::socket socket_t; 00037 00038 void on_connection( const typename ConnectionType::ptr& c ) { 00039 c->closed.connect( boost::bind( &server::on_disconnect, this, c ) ); 00040 connections[c] = this->sc->init_connection(c); 00041 } 00042 void on_disconnect( const typename ConnectionType::ptr& c ) { 00043 connections.erase( c ); 00044 } 00045 00046 void listen( uint16_t p ) { 00047 try { 00048 acc = boost::make_shared<boost::asio::ip::tcp::acceptor>( 00049 boost::ref(cmt::asio::default_io_service()), 00050 boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(),p) ); 00051 00052 boost::system::error_code ec; 00053 do { 00054 socket_t::ptr iosp(new socket_t()); 00055 ec = cmt::asio::tcp::accept( *acc, *iosp); 00056 if(!ec) { 00057 cmt::async( boost::bind(&server::on_connection, this, 00058 typename ConnectionType::ptr( new ConnectionType(iosp) ) )); 00059 } else { 00060 elog( "%1%", boost::system::system_error(ec).what() ); 00061 } 00062 }while( !ec ); 00063 00064 } catch (...) { 00065 elog( "%1%", boost::current_exception_diagnostic_information() ); 00066 } 00067 } 00068 00069 boost::shared_ptr<boost::asio::ip::tcp::acceptor> acc; 00070 mace::cmt::future<void> listen_complete; 00071 std::map<typename ConnectionType::ptr,boost::any> connections; 00072 }; 00073 00074 } } } // mace::rpc::tcp 00075 00076 #endif