MACE
1.0.0
|
00001 #ifndef _MACE_RPC_HTTP_DETAIL_CONNECTION_HPP 00002 #define _MACE_RPC_HTTP_DETAIL_CONNECTION_HPP 00003 #include <mace/rpc/tcp/detail/connection.hpp> 00004 #include <mace/rpc/raw/message.hpp> 00005 #include <mace/rpc/raw/raw_io.hpp> 00006 #include <mace/rpc/http/request.hpp> 00007 #include <mace/rpc/http/request_parser.hpp> 00008 #include <mace/rpc/http/reply.hpp> 00009 #include <mace/rpc/error.hpp> 00010 #include <boost/algorithm/string.hpp> 00011 00012 #include <mace/cmt/asio/tcp/socket.hpp> 00013 #include <mace/cmt/asio.hpp> 00014 00015 namespace mace { namespace rpc { namespace http { namespace detail { 00027 class connection : public mace::rpc::detail::connection_base { 00028 public: 00029 connection(){} 00030 00031 template<typename String> 00032 connection( String&& host, uint16_t port = 80 ) 00033 :m_host( host + ":" + boost::lexical_cast<std::string>(port) ) { 00034 m_eps = mace::cmt::asio::tcp::resolve( host, port ); 00035 if( !m_eps.size() ) { 00036 MACE_RPC_THROW( "Unable to resolve host '%1%'", %m_host ); 00037 } 00038 } 00039 00040 connection( const boost::asio::ip::tcp::endpoint& ep ) { 00041 m_host = ep.address().to_string() + ":" + boost::lexical_cast<std::string>(ep.port()); 00042 m_eps.push_back(ep); 00043 } 00044 00045 00046 connection( const mace::cmt::asio::tcp::socket::ptr& sock ) 00047 :m_sock(sock){ 00048 if( m_sock ) { 00049 m_read_done = 00050 mace::cmt::async( boost::bind( &connection::read_request_loop, this ) ); 00051 } 00052 } 00053 00054 virtual ~connection() { 00055 try { close(); } catch ( ... ) { } 00056 } 00057 00058 void close() { 00059 if( m_sock ) { 00060 m_sock->close(); 00061 if( m_read_done.valid() ) { 00062 m_read_done.wait(); 00063 } 00064 m_sock.reset(); 00065 } 00066 } 00067 00068 virtual http::request pack_request( rpc::message&& m ) { return http::request(); } 00069 virtual rpc::message unpack_reply( http::reply&& r ) { return rpc::message(); } 00070 00075 virtual http::reply handle_request( http::request&& r ) { 00076 return http::reply(); 00077 } 00078 00085 http::reply send( http::request&& r ) { 00086 r.headers.push_back( header("Host", m_host) ); 00087 if( !m_sock ) { 00088 m_sock.reset( new mace::cmt::asio::tcp::socket() ); 00089 for( uint32_t i = 0; i < m_eps.size(); ++i ) { 00090 if( m_sock->connect( m_eps[i] ).wait() ) { 00091 std::swap( m_eps[0],m_eps[i]); 00092 } 00093 } 00094 MACE_RPC_THROW( "Unable to connect to host." ); 00095 } 00096 *m_sock << r; 00097 return read_reply(); 00098 } 00099 00100 00101 typedef mace::cmt::asio::tcp::socket::iterator itr_t; 00102 http::request read_request() { 00103 // read http request 00104 http::request r; 00105 http::request_parser p; 00106 boost::tribool result; 00107 itr_t end_parse; 00108 boost::tie(result,end_parse) = p.parse( r, itr_t(m_sock.get()), itr_t() ); 00109 return r; 00110 } 00111 00112 std::string read_line() { 00113 std::string l; 00114 itr_t i(m_sock.get()); 00115 while( *i != '\n' ) { 00116 if( *i != '\r' ) 00117 l+=*i; 00118 ++i; 00119 } 00120 return l; 00121 } 00122 00128 http::reply read_reply() { 00129 http::reply r; 00130 std::string line = read_line(); 00131 std::string ver = line.substr(0,line.find(' ')); 00132 std::stringstream ss(line.substr(ver.size()+1,line.size()) ); 00133 int code; 00134 ss >> code; 00135 r.status = (http::reply::status_type)code; 00136 line = read_line(); 00137 std::string name, value; 00138 00139 int content_len = 0; 00140 std::string content_length("content-length"); 00141 while( line.size() ){ 00142 if( line[0] != ' ' ) { 00143 name = line.substr(0,line.find(':') ); 00144 boost::to_lower(name); 00145 value = line.substr(name.size()+1); 00146 boost::trim(value); 00147 r.headers.push_back(header(std::move(name),std::move(value))); 00148 if( name == content_length ) { 00149 content_len = boost::lexical_cast<int>(value); 00150 } else if( name == "connection" ) { 00151 boost::to_lower(value); 00152 r.keep_alive = (value == "keep-alive"); 00153 } 00154 } else { 00155 if( r.headers.size() ) { 00156 r.headers.back().value += line; 00157 } 00158 } 00159 line = read_line(); 00160 } 00161 if( content_len ) { 00162 r.content.resize(content_len); 00163 m_sock->read( &r.content.front(), r.content.size() ); 00164 } 00165 00166 if( !r.keep_alive ) { 00167 m_sock->close(); 00168 m_sock.reset(); 00169 } 00170 return r; 00171 } 00172 00176 void read_rpc_reply() { 00177 // unpack it and send it to the handle method 00178 handle( unpack_reply( read_reply() ) ); 00179 } 00180 00181 virtual void send( rpc::message&& m ) { 00182 // assert message is a request.. 00183 // delegate packing the message content to 00184 // a derived class 00185 send( pack_request( std::move(m) ) ); 00186 00187 // read reply 00188 m_read_done = cmt::async( boost::bind(&connection::read_rpc_reply, this ) ); 00189 } 00190 00191 void read_request_loop( ) { 00192 try { 00193 while ( true ) { 00194 handle_request( read_request() ); 00195 } 00196 } catch ( ... ) { 00197 elog( "connection closed: %1%", 00198 boost::current_exception_diagnostic_information() ); 00199 } 00200 break_promises(); 00201 m_sock.reset(); 00202 } 00203 00204 void handle_error( message::error_type e, const std::string& msg ) { 00205 elog( "%1%: %2%", int(e), msg ); 00206 } 00207 00208 std::string m_host; 00209 std::vector<boost::asio::ip::tcp::endpoint> m_eps; 00210 cmt::future<void> m_read_done; 00211 mace::cmt::asio::tcp::socket::ptr m_sock; 00212 }; 00213 00214 } } } } // mace::rpc::http::detail 00215 00216 #endif // _MACE_RPC_RAW_TCP_DETAIL_CONNECTION_HPP