MACE  1.0.0
 All Classes Namespaces Files Functions Variables Enumerations Defines
libs/rpc/include/mace/rpc/tcp/detail/connection.hpp
00001 #ifndef _MACE_RPC_TCP_DETAIL_CONNECTION_HPP_
00002 #define _MACE_RPC_TCP_DETAIL_CONNECTION_HPP_
00003 #include <boost/make_shared.hpp>
00004 
00005 #include <mace/rpc/detail/connection_base.hpp>
00006 #include <mace/rpc/raw/message.hpp>
00007 #include <mace/rpc/raw/raw_io.hpp>
00008 #include <mace/rpc/error.hpp>
00009 
00010 #include <mace/cmt/asio/tcp/socket.hpp>
00011 #include <mace/cmt/thread.hpp>
00012 
00013 namespace mace { namespace rpc { namespace tcp { namespace detail { 
00014 
00015   namespace raw = mace::rpc::raw;
00016 
00017   class connection : public mace::rpc::detail::connection_base {
00018     public:
00019       connection(){}
00020 
00021       connection( const boost::asio::ip::tcp::endpoint& ep ) {
00022         connect( ep );
00023       }
00024       connection( const mace::cmt::asio::tcp::socket::ptr& sock )
00025       :m_sock(sock){
00026         if( m_sock ) {
00027           m_read_done = mace::cmt::async( boost::bind( &connection::read_loop, this ) );
00028         }
00029       }
00030       void close() {
00031         if( m_sock ) {
00032           m_sock->close();
00033           m_read_done.wait();
00034           m_sock.reset();
00035         }
00036       }
00037       virtual ~connection() {
00038         try { close(); } catch ( ... ) { }
00039       }
00040       virtual void         send_message( rpc::message&& m ) = 0;
00041       virtual rpc::message read_message()                   = 0;
00042 
00043       void send( message&& m ) {
00044         if( m_sock ) {
00045             send_message( std::move(m) );
00046         } else {
00047           MACE_RPC_THROW( "No Connection" );
00048         }
00049       }
00050 
00051       void read_loop( ) {
00052         try {
00053           while ( true ) {
00054             handle( read_message() );
00055           }
00056         } catch ( ... ) {
00057           elog( "connection closed: %1%", 
00058                 boost::current_exception_diagnostic_information() );
00059         }
00060         break_promises();
00061         m_sock.reset();
00062       }
00063       void connect( const boost::asio::ip::tcp::endpoint& ep ) {
00064         close(); 
00065         try {
00066             m_sock = boost::make_shared<mace::cmt::asio::tcp::socket>();
00067             m_sock->connect(ep).wait();
00068             m_read_done = mace::cmt::async( boost::bind( &connection::read_loop, this ) );
00069         } catch ( ... ) {
00070           m_sock.reset();
00071           throw;
00072         }
00073       }
00074       void handle_error( message::error_type e, const std::string& msg ) {
00075         elog( "%1%: %2%", int(e), msg );
00076       }
00077       cmt::future<void>                  m_read_done;
00078       mace::cmt::asio::tcp::socket::ptr m_sock; 
00079   };
00080 
00081 
00082 } } } } // mace::rpc::tcp::detail
00083 #endif // _MACE_RPC_TCP_DETAIL_CONNECTION_HPP_