MACE
1.0.0
|
00001 #ifndef _MACE_RPC_TCP_DETAIL_CONNECTION_HPP_ 00002 #define _MACE_RPC_TCP_DETAIL_CONNECTION_HPP_ 00003 #include <boost/make_shared.hpp> 00004 00005 #include <mace/rpc/detail/connection_base.hpp> 00006 #include <mace/rpc/raw/message.hpp> 00007 #include <mace/rpc/raw/raw_io.hpp> 00008 #include <mace/rpc/error.hpp> 00009 00010 #include <mace/cmt/asio/tcp/socket.hpp> 00011 #include <mace/cmt/thread.hpp> 00012 00013 namespace mace { namespace rpc { namespace tcp { namespace detail { 00014 00015 namespace raw = mace::rpc::raw; 00016 00017 class connection : public mace::rpc::detail::connection_base { 00018 public: 00019 connection(){} 00020 00021 connection( const boost::asio::ip::tcp::endpoint& ep ) { 00022 connect( ep ); 00023 } 00024 connection( const mace::cmt::asio::tcp::socket::ptr& sock ) 00025 :m_sock(sock){ 00026 if( m_sock ) { 00027 m_read_done = mace::cmt::async( boost::bind( &connection::read_loop, this ) ); 00028 } 00029 } 00030 void close() { 00031 if( m_sock ) { 00032 m_sock->close(); 00033 m_read_done.wait(); 00034 m_sock.reset(); 00035 } 00036 } 00037 virtual ~connection() { 00038 try { close(); } catch ( ... ) { } 00039 } 00040 virtual void send_message( rpc::message&& m ) = 0; 00041 virtual rpc::message read_message() = 0; 00042 00043 void send( message&& m ) { 00044 if( m_sock ) { 00045 send_message( std::move(m) ); 00046 } else { 00047 MACE_RPC_THROW( "No Connection" ); 00048 } 00049 } 00050 00051 void read_loop( ) { 00052 try { 00053 while ( true ) { 00054 handle( read_message() ); 00055 } 00056 } catch ( ... ) { 00057 elog( "connection closed: %1%", 00058 boost::current_exception_diagnostic_information() ); 00059 } 00060 break_promises(); 00061 m_sock.reset(); 00062 } 00063 void connect( const boost::asio::ip::tcp::endpoint& ep ) { 00064 close(); 00065 try { 00066 m_sock = boost::make_shared<mace::cmt::asio::tcp::socket>(); 00067 m_sock->connect(ep).wait(); 00068 m_read_done = mace::cmt::async( boost::bind( &connection::read_loop, this ) ); 00069 } catch ( ... ) { 00070 m_sock.reset(); 00071 throw; 00072 } 00073 } 00074 void handle_error( message::error_type e, const std::string& msg ) { 00075 elog( "%1%: %2%", int(e), msg ); 00076 } 00077 cmt::future<void> m_read_done; 00078 mace::cmt::asio::tcp::socket::ptr m_sock; 00079 }; 00080 00081 00082 } } } } // mace::rpc::tcp::detail 00083 #endif // _MACE_RPC_TCP_CONNECTION_HPP_