MACE  1.0.0
 All Classes Namespaces Files Functions Variables Enumerations Defines
libs/rpc/include/mace/rpc/http/detail/connection.hpp
00001 #ifndef _MACE_RPC_HTTP_DETAIL_CONNECTION_HPP
00002 #define _MACE_RPC_HTTP_DETAIL_CONNECTION_HPP
00003 #include <mace/rpc/tcp/detail/connection.hpp>
00004 #include <mace/rpc/raw/message.hpp>
00005 #include <mace/rpc/raw/raw_io.hpp>
00006 #include <mace/rpc/http/request.hpp>
00007 #include <mace/rpc/http/request_parser.hpp>
00008 #include <mace/rpc/http/reply.hpp>
00009 #include <mace/rpc/error.hpp>
00010 #include <boost/algorithm/string.hpp>
00011 
00012 #include <mace/cmt/asio/tcp/socket.hpp>
00013 #include <mace/cmt/asio.hpp>
00014 
00015 namespace mace { namespace rpc { namespace http {   namespace detail {
00027   class connection : public mace::rpc::detail::connection_base  {
00028     public:
00029       connection(){}
00030 
00031       template<typename String>
00032       connection( String&& host, uint16_t port = 80 )
00033       :m_host( host + ":" + boost::lexical_cast<std::string>(port) ) {
00034         m_eps = mace::cmt::asio::tcp::resolve( host, port );
00035         if( !m_eps.size() ) {
00036           MACE_RPC_THROW( "Unable to resolve host '%1%'", %m_host );
00037         }
00038       }
00039 
00040       connection( const boost::asio::ip::tcp::endpoint& ep ) {
00041         m_host = ep.address().to_string() + ":" + boost::lexical_cast<std::string>(ep.port());
00042         m_eps.push_back(ep);
00043       }
00044 
00045 
00046       connection( const mace::cmt::asio::tcp::socket::ptr& sock )
00047       :m_sock(sock){
00048         if( m_sock ) {
00049           m_read_done = 
00050             mace::cmt::async( boost::bind( &connection::read_request_loop, this ) );
00051         }
00052       }
00053 
00054       virtual ~connection() {
00055         try { close(); } catch ( ... ) { }
00056       }
00057 
00058       void close() {
00059         if( m_sock ) {
00060           m_sock->close();
00061           if( m_read_done.valid() ) {
00062             m_read_done.wait();
00063           }
00064           m_sock.reset();
00065         }
00066       }
00067 
00068       virtual http::request pack_request( rpc::message&& m ) { return http::request(); }
00069       virtual rpc::message  unpack_reply( http::reply&& r )  { return rpc::message();  }
00070 
00075       virtual http::reply handle_request( http::request&& r ) {
00076         return http::reply();
00077       }
00078 
00085       http::reply send( http::request&& r ) {
00086         r.headers.push_back( header("Host", m_host) );
00087         if( !m_sock ) {
00088           m_sock.reset( new mace::cmt::asio::tcp::socket() );
00089           for( uint32_t i = 0; i < m_eps.size(); ++i ) {
00090             if( m_sock->connect( m_eps[i] ).wait() ) {
00091               std::swap( m_eps[0],m_eps[i]);
00092             }
00093           }
00094           MACE_RPC_THROW( "Unable to connect to host." );
00095         }
00096         *m_sock << r;
00097         return read_reply();
00098       }
00099 
00100 
00101       typedef mace::cmt::asio::tcp::socket::iterator itr_t;
00102       http::request read_request() {
00103         // read http request
00104         http::request r;
00105         http::request_parser p;
00106         boost::tribool result;
00107         itr_t end_parse;
00108         boost::tie(result,end_parse) = p.parse( r, itr_t(m_sock.get()), itr_t() );
00109         return r;
00110       }
00111 
00112       std::string read_line() {
00113          std::string l;
00114          itr_t i(m_sock.get());
00115          while( *i != '\n' ) {
00116           if( *i != '\r' )
00117               l+=*i;
00118           ++i;
00119          }
00120          return l;
00121       }
00122 
00128       http::reply read_reply() {
00129         http::reply r;
00130         std::string line = read_line();
00131         std::string ver  = line.substr(0,line.find(' '));
00132         std::stringstream ss(line.substr(ver.size()+1,line.size()) );
00133         int code; 
00134         ss >> code;
00135         r.status = (http::reply::status_type)code;
00136         line = read_line();
00137         std::string name, value;
00138 
00139         int content_len = 0;
00140         std::string content_length("content-length");
00141         while( line.size() ){
00142           if( line[0] != ' ' ) {
00143               name = line.substr(0,line.find(':') );
00144               boost::to_lower(name);
00145               value = line.substr(name.size()+1);
00146               boost::trim(value);
00147               r.headers.push_back(header(std::move(name),std::move(value)));
00148               if( name == content_length ) {
00149                 content_len = boost::lexical_cast<int>(value);
00150               } else if( name == "connection" ) {
00151                 boost::to_lower(value);
00152                 r.keep_alive = (value == "keep-alive");
00153               }
00154           } else {
00155             if( r.headers.size() ) {
00156               r.headers.back().value += line;
00157             }
00158           }
00159           line = read_line();
00160         }
00161         if( content_len ) {
00162           r.content.resize(content_len);
00163           m_sock->read( &r.content.front(), r.content.size() );
00164         }
00165 
00166         if( !r.keep_alive ) {
00167           m_sock->close();
00168           m_sock.reset();
00169         }
00170         return r;
00171       }
00172 
00176       void read_rpc_reply() {
00177         // unpack it and send it to the handle method
00178         handle( unpack_reply( read_reply() ) );
00179       }
00180 
00181       virtual void send( rpc::message&& m ) {
00182         // assert message is a request..
00183         // delegate packing the message content to
00184         // a derived class
00185         send( pack_request( std::move(m) ) );
00186 
00187         // read reply
00188         m_read_done = cmt::async( boost::bind(&connection::read_rpc_reply, this ) );
00189       }
00190 
00191       void read_request_loop( ) {
00192         try {
00193           while ( true ) {
00194             handle_request( read_request() );
00195           }
00196         } catch ( ... ) {
00197           elog( "connection closed: %1%", 
00198           boost::current_exception_diagnostic_information() );
00199         }
00200         break_promises();
00201         m_sock.reset();
00202       }
00203       
00204       void handle_error( message::error_type e, const std::string& msg ) {
00205         elog( "%1%: %2%", int(e), msg );
00206       }
00207 
00208       std::string                                 m_host;
00209       std::vector<boost::asio::ip::tcp::endpoint> m_eps;
00210       cmt::future<void>                           m_read_done;
00211       mace::cmt::asio::tcp::socket::ptr           m_sock; 
00212   };
00213 
00214 } } } } // mace::rpc::http::detail
00215 
00216 #endif // _MACE_RPC_RAW_TCP_DETAIL_CONNECTION_HPP