From 042f1412c869197973c2ee329742f33bc97d525a Mon Sep 17 00:00:00 2001 From: ruanshudong Date: Sun, 9 Feb 2020 23:03:52 +0800 Subject: [PATCH] http2 compiler succ --- examples/HttpDemo/HttpClient/main.cpp | 99 ++++++------ examples/HttpDemo/HttpServer/HttpServer.cpp | 41 +++-- examples/HttpDemo/HttpServer/config.conf | 18 ++- servant/libservant/AppProtocol.cpp | 35 ++--- servant/libservant/Application.cpp | 144 +----------------- servant/libservant/ServantProxy.cpp | 1 + servant/servant/ServantHelper.h | 2 +- servant/servant/TarsCurrent.h | 66 +------- util/include/util/tc_http2session.h | 99 +++++------- util/include/util/tc_network_buffer.h | 23 ++- util/src/tc_http2session.cpp | 160 +++++++++++++------- util/src/tc_network_buffer.cpp | 1 + 12 files changed, 267 insertions(+), 422 deletions(-) diff --git a/examples/HttpDemo/HttpClient/main.cpp b/examples/HttpDemo/HttpClient/main.cpp index b714784..142661f 100644 --- a/examples/HttpDemo/HttpClient/main.cpp +++ b/examples/HttpDemo/HttpClient/main.cpp @@ -28,45 +28,11 @@ using namespace std; using namespace tars; using namespace tup; -// int main(int argc,char ** argv) -// { -// if(argc != 3) -// { -// cout << "usage: " << argv[0] << " ThreadNum CallTimes" << endl; -// return -1; -// } - -// try -// { -// tars::Int32 threads = TC_Common::strto(string(argv[1])); -// TC_ThreadPool tp; -// tp.init(threads); -// tp.start(); -// cout << "init tp succ" << endl; -// tars::Int32 times = TC_Common::strto(string(argv[2])); - -// for(int i = 0; i& headers, - // const std::string& body, - // HttpCallback* cb); -void asyncRpc(int c) +void syncRpc2(int c) +{ + int64_t t = TC_Common::now2us(); + + std::map header; + header[":authority"] = "domain.com"; + header[":scheme"] = "http"; + + std::map rheader; + //发起远程调用 + for (int i = 0; i < c; ++i) + { + string rbody; + + try + { + + param.servant2Prx->http_call("GET", "/", header, "helloworld", rheader, rbody); + + cout << "rsp:" << rbody << endl; + } + catch(exception& e) + { + cout << "exception:" << e.what() << endl; + } + ++callback_count; + } + + int64_t cost = TC_Common::now2us() - t; + cout << "syncRpc2 total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl; +} + +void asyncRpc2(int c) { int64_t t = TC_Common::now2us(); @@ -196,7 +192,7 @@ void asyncRpc(int c) try { - param.servantPrx->http_call_async(header, "helloworld", p); + param.servant2Prx->http_call_async(header, "helloworld", p); } catch(exception& e) { @@ -205,7 +201,7 @@ void asyncRpc(int c) } int64_t cost = TC_Common::now2us() - t; - cout << "asyncCall send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl; + cout << "asyncRpc2 send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl; } int main(int argc, char *argv[]) @@ -214,7 +210,7 @@ int main(int argc, char *argv[]) { if (argc < 4) { - cout << "Usage:" << argv[0] << "--count=1000 --call=[basehttp|synchttp|asynchttp] --thread=1" << endl; + cout << "Usage:" << argv[0] << "--count=1000 --call=[basehttp|synchttp|synchttp2|asynchttp2] --thread=1" << endl; return 0; } @@ -235,6 +231,7 @@ int main(int argc, char *argv[]) _comm->setProperty("asyncqueuecap", "1000000"); param.servantPrx = _comm->stringToProxy(httpObj); + param.servant2Prx = _comm->stringToProxy(http2Obj); param.servantPrx->tars_connect_timeout(5000); param.servantPrx->tars_async_timeout(60*1000); @@ -242,10 +239,12 @@ int main(int argc, char *argv[]) ProxyProtocol proto; proto.requestFunc = ProxyProtocol::http1Request; proto.responseFunc = ProxyProtocol::http1Response; - // proto.requestFunc = ProxyProtocol::http2Request; - // proto.responseFunc = ProxyProtocol::http2Response; param.servantPrx->tars_set_protocol(proto); + proto.requestFunc = ProxyProtocol::http2Request; + proto.responseFunc = ProxyProtocol::http2Response; + param.servant2Prx->tars_set_protocol(proto); + int64_t start = TC_Common::now2us(); std::function func; @@ -258,9 +257,17 @@ int main(int argc, char *argv[]) { func = syncRpc; } - else if(param.call == "asynchttp") + // else if(param.call == "asynchttp") + // { + // func = asyncRpc; + // } + else if (param.call == "synchttp2") { - func = asyncRpc; + func = syncRpc2; + } + else if(param.call == "asynchttp2") + { + func = asyncRpc2; } else { diff --git a/examples/HttpDemo/HttpServer/HttpServer.cpp b/examples/HttpDemo/HttpServer/HttpServer.cpp index 1847183..e8f55d8 100644 --- a/examples/HttpDemo/HttpServer/HttpServer.cpp +++ b/examples/HttpDemo/HttpServer/HttpServer.cpp @@ -16,38 +16,32 @@ #include "HttpServer.h" #include "HttpImp.h" +#include "Http2Imp.h" +#include "util/tc_http2session.h" using namespace std; HttpServer g_app; - -static TC_NetWorkBuffer::PACKET_TYPE parseHttp(TC_NetWorkBuffer &in, vector &out) +TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector &out) { - vector buffer = in.getBuffers(); - cout << "parseHttp:" << buffer.data() << endl; - try + TC_Http2Session *session = (TC_Http2Session*)(in.getContextData()); + + if(session == NULL) { - bool b = TC_HttpRequest::checkRequest(buffer.data(), buffer.size()); - if(b) - { - out.swap(buffer); - in.clearBuffers(); - return TC_NetWorkBuffer::PACKET_FULL; - } - else - { - return TC_NetWorkBuffer::PACKET_LESS; - } - } - catch(exception &ex) - { - return TC_NetWorkBuffer::PACKET_ERR; + session = new TC_Http2Session(); + in.setContextData(session); + + TC_EpollServer::Connection *connection = (TC_EpollServer::Connection *)in.getConnection(); + Http2Imp::addHttp2Session(connection->getId(), session); } - return TC_NetWorkBuffer::PACKET_LESS; + cout << "parseHttp2:" << in.getBufferLength() << endl; + + return session->parse(in, out); } + void HttpServer::initialize() { @@ -55,8 +49,9 @@ HttpServer::initialize() //... addServant(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj"); - // addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj",&TC_NetWorkBuffer::parseHttp); - addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj",parseHttp); + addServant(ServerConfig::Application + "." + ServerConfig::ServerName + ".Http2Obj"); + addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj",&TC_NetWorkBuffer::parseHttp); + addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".Http2Obj", &parseHttp2); } ///////////////////////////////////////////////////////////////// void diff --git a/examples/HttpDemo/HttpServer/config.conf b/examples/HttpDemo/HttpServer/config.conf index 2ec2d08..61952b2 100755 --- a/examples/HttpDemo/HttpServer/config.conf +++ b/examples/HttpDemo/HttpServer/config.conf @@ -51,20 +51,24 @@ #配置绑定端口 - #ip:port:timeout endpoint = tcp -h 0.0.0.0 -p 8081 -t 10000 - #允许的IP地址 allow = - #最大连接数 maxconns = 4096 - #当前线程个数 threads = 5 - #处理对象 servant = Test.HttpServer.HttpObj - #队列最大包个数 queuecap = 1000000 - protocol = not-tars + protocol = not-tars + + + endpoint = tcp -h 0.0.0.0 -p 8082 -t 10000 + allow = + maxconns = 4096 + threads = 5 + servant = Test.HttpServer.Http2Obj + queuecap = 1000000 + protocol = not-tars + diff --git a/servant/libservant/AppProtocol.cpp b/servant/libservant/AppProtocol.cpp index dca2d2e..8bedcf6 100644 --- a/servant/libservant/AppProtocol.cpp +++ b/servant/libservant/AppProtocol.cpp @@ -126,15 +126,14 @@ struct Http1Context TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in, ResponsePacket& rsp) { - void *contextData = in.getContextData(); + Http1Context *context = (Http1Context*)(in.getContextData()); - if(contextData == NULL) + if(context == NULL) { - contextData = in.setContextData(new Http1Context()); + context = new Http1Context(); + in.setContextData(context, [=]{ delete context; }); } - Http1Context *context = (Http1Context*)contextData; - context->buff.append(in.getBuffersString()); in.clearBuffers(); @@ -226,14 +225,11 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in, // ENCODE function, called by network thread vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans) { - cout << "http2Request" << endl; // TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId); TC_NgHttp2* session = trans->getHttp2Session(); assert(session != NULL); - cout << "http2Request:" << session << endl; - if (session->getState() == TC_NgHttp2::None) { session->Init(); @@ -243,8 +239,6 @@ vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr assert (session->getState() == TC_NgHttp2::Http2); // return encodeHttp2(request, session); - cout << "http2Request1" << endl; - std::vector nva; const std::string method(":method"); @@ -266,7 +260,6 @@ vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr nva.push_back(nv); } - cout << "http2Request2" << endl; nghttp2_data_provider* pData = NULL; nghttp2_data_provider data; if (!request.sBuffer.empty()) @@ -288,10 +281,9 @@ vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr return vector(); } - cout << "http2Request3" << endl; request.iRequestId = sid; nghttp2_session_send(session->session()); - + // 交给tars发送 // std::string out; // out.swap(session->sendBuffer()); @@ -301,23 +293,24 @@ vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr out.assign(session->sendBuffer().begin(), session->sendBuffer().end()); - cout << "http2Request4:" << out.data() << endl; + cout << "iRequestId:" << request.iRequestId << ", size:" << out.size() << endl; + return out; } // TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, list& done, void* userptr) TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp) { + cout << "http2Response1:" << in.getBufferLength() << endl; TC_NgHttp2* session = ((Transceiver*)(in.getConnection()))->getHttp2Session(); - assert (session->getState() == TC_NgHttp2::Http2); + // assert (session->getState() == TC_NgHttp2::Http2); auto it = session->doneResponses().begin(); - if(it == session->doneResponses().end()) + if(it == session->doneResponses().end() && !in.empty()) { vector buffer = in.getBuffers(); - in.clearBuffers(); // Transceiver* userptr = ((Transceiver*))in->getConnection(); // int sessionId = userptr->getAdapterProxy()->getId(); @@ -330,20 +323,24 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, // throw std::runtime_error("nghttp2_session_mem_recv return error"); return TC_NetWorkBuffer::PACKET_ERR; } + + in.moveHeader(readlen); } - it = session->doneResponses().begin(); - if(it == session->doneResponses().end()) + if(session->doneResponses().empty()) { return TC_NetWorkBuffer::PACKET_LESS; } + it = session->doneResponses().begin(); rsp.iRequestId = it->second.streamId; rsp.status = it->second.headers; rsp.sBuffer.assign(it->second.body.begin(), it->second.body.end()); session->doneResponses().erase(it); + cout << "http2Response2 size:" << session->doneResponses().size() << ", iRequestId:" << rsp.iRequestId << endl; + // std::map::const_iterator it(session->_doneResponses.begin()); // for (; it != session->_doneResponses.end(); ++ it) // { diff --git a/servant/libservant/Application.cpp b/servant/libservant/Application.cpp index ef0cfbc..4d0a7f8 100644 --- a/servant/libservant/Application.cpp +++ b/servant/libservant/Application.cpp @@ -137,59 +137,6 @@ CommunicatorPtr& Application::getCommunicator() return _communicator; } -// void Application::waitForQuit() -// { -// int64_t iLastCheckTime = TNOW; -// int64_t iNow = iLastCheckTime; - -// unsigned int iNetThreadNum = _epollServer->getNetThreadNum(); -// vector vNetThread = _epollServer->getNetThread(); - -// for (size_t i = 0; i < iNetThreadNum; ++i) -// { -// vNetThread[i]->start(); -// } - -// _epollServer->debug("server netthread num : " + TC_Common::tostr(iNetThreadNum)); - -// while(!_epollServer->isTerminate()) -// { -// { -// TC_ThreadLock::Lock sync(*_epollServer); -// _epollServer->timedWait(5000); -// } - -// iNow = TNOW; - -// if(iNow - iLastCheckTime > REPORT_SEND_QUEUE_INTERVAL) -// { -// iLastCheckTime = iNow; - -// size_t n = 0; -// for(size_t i = 0;i < iNetThreadNum; ++i) -// { -// n = n + vNetThread[i]->getSendRspSize(); -// } - -// if(_epollServer->_pReportRspQueue) -// { -// _epollServer->_pReportRspQueue->report(n); -// } -// } -// } - -// if(_epollServer->isTerminate()) -// { -// for(size_t i = 0; i < iNetThreadNum; ++i) -// { -// vNetThread[i]->terminate(); -// vNetThread[i]->getThreadControl().join(); -// } - -// _epollServer->stopThread(); -// } -// } - void reportRspQueue(TC_EpollServer *epollServer) { // TLOGDEBUG("Application::reportRspQueue" << endl); @@ -271,6 +218,7 @@ bool Application::cmdViewStatus(const string& command, const string& params, str return true; } + bool Application::cmdCloseCoreDump(const string& command, const string& params, string& result) { #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS @@ -437,43 +385,6 @@ bool Application::cmdLoadConfig(const string& command, const string& params, str return true; } -// bool Application::cmdConnections(const string& command, const string& params, string& result) -// { -// TLOGTARS("Application::cmdConnections:" << command << " " << params << endl); - -// ostringstream os; - -// os << OUT_LINE_LONG << endl; - -// map m = _epollServer->getListenSocketInfo(); - -// for(map::const_iterator it = m.begin(); it != m.end(); ++it) -// { -// vector v = it->second->getConnStatus(); - -// os << OUT_LINE << "\n" << outfill("[adater:" + it->second->getName() + "] [connections:" + TC_Common::tostr(v.size())+ "]") << endl; - -// os << outfill("conn-uid", ' ', 15) -// << outfill("ip:port", ' ', 25) -// << outfill("last-time", ' ', 25) -// << outfill("timeout", ' ', 10) << endl; - -// for(size_t i = 0; i < v.size(); i++) -// { -// os << outfill(TC_Common::tostr(v[i].uid), ' ', 15) -// << outfill(v[i].ip + ":" + TC_Common::tostr(v[i].port), ' ', 25) -// << outfill(TC_Common::tm2str(v[i].iLastRefreshTime,"%Y-%m-%d %H:%M:%S"), ' ', 25) -// << outfill(TC_Common::tostr(v[i].timeout), ' ', 10) << endl; -// } -// } -// os << OUT_LINE_LONG << endl; - -// result = os.str(); - -// return true; -// } - - bool Application::cmdConnections(const string& command, const string& params, string& result) { TLOGTARS("Application::cmdConnections:" << command << " " << params << endl); @@ -648,18 +559,6 @@ bool Application::cmdReloadLocator(const string& command, const string& params, return bSucc; } -// void Application::outAllAdapter(ostream &os) -// { -// map m = _epollServer->getListenSocketInfo(); - -// for(map::const_iterator it = m.begin(); it != m.end(); ++it) -// { -// outAdapter(os, ServantHelperManager::getInstance()->getAdapterServant(it->second->getName()),it->second); - -// os << OUT_LINE << endl; -// } -// } - void Application::outAllAdapter(ostream &os) { auto m = _epollServer->getListenSocketInfo(); @@ -705,12 +604,6 @@ bool Application::addAppConfig(const string &filename) return true; } -// void Application::setHandle(TC_EpollServer::BindAdapterPtr& adapter) -// { -// adapter->setHandle(); -// } - - void Application::main(int argc, char *argv[]) { TC_Option op; @@ -719,7 +612,6 @@ void Application::main(int argc, char *argv[]) } void Application::main(const TC_Option &option) -// void Application::main(int argc, char *argv[]) { try { @@ -730,8 +622,8 @@ void Application::main(const TC_Option &option) TC_Common::ignorePipe(); #endif //解析配置文件 - // parseConfig(argc, argv); parseConfig(option); + //初始化Proxy部分 initializeClient(); @@ -788,34 +680,6 @@ void Application::main(const TC_Option &option) } } - // //设置HandleGroup分组,启动线程 - // for (size_t i = 0; i < adapters.size(); ++i) - // { - // string name = adapters[i]->getName(); - - // string groupName = adapters[i]->getHandleGroupName(); - - // if(name != groupName) - // { - // TC_EpollServer::BindAdapterPtr ptr = _epollServer->getBindAdapter(groupName); - - // if (!ptr) - // { - // throw runtime_error("[TARS][adater `" + name + "` setHandle to group `" + groupName + "` fail!"); - // } - - // } - // setHandle(adapters[i]); - // } - - //启动业务处理线程 - // _epollServer->startHandle(); - // _epollServer->createEpoll(); - - // cout << "\n" << outfill("[initialize server] ", '.') << " [Done]" << endl; - - // cout << OUT_LINE_LONG << endl; - //动态加载配置文件 TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_LOAD_CONFIG, Application::cmdLoadConfig); @@ -888,9 +752,9 @@ void Application::main(const TC_Option &option) } catch (exception &ex) { - TarsRemoteNotify::getInstance()->report("exit: " + string(ex.what())); - cout << "[main exception]:" << ex.what() << endl; + + TarsRemoteNotify::getInstance()->report("exit: " + string(ex.what())); exit(-1); } diff --git a/servant/libservant/ServantProxy.cpp b/servant/libservant/ServantProxy.cpp index d171def..0e3913b 100644 --- a/servant/libservant/ServantProxy.cpp +++ b/servant/libservant/ServantProxy.cpp @@ -903,6 +903,7 @@ void ServantProxy::http_call(const std::string& method, msg->request.sFuncName = method; // 使用下面两个字段保存头部和包体 msg->request.context = headers; + msg->request.sBuffer.assign(body.begin(), body.end()); invoke(msg); diff --git a/servant/servant/ServantHelper.h b/servant/servant/ServantHelper.h index 5c8200e..f81ef86 100644 --- a/servant/servant/ServantHelper.h +++ b/servant/servant/ServantHelper.h @@ -182,7 +182,7 @@ protected: /** * 锁 */ - TC_SpinLock _mutex; + TC_SpinLock _mutex; /** * 是否染色 diff --git a/servant/servant/TarsCurrent.h b/servant/servant/TarsCurrent.h index 5061efc..b59e20e 100644 --- a/servant/servant/TarsCurrent.h +++ b/servant/servant/TarsCurrent.h @@ -223,23 +223,6 @@ public: */ void sendResponse(const char* buff, uint32_t len); - // /** - // * tars协议的发送响应数据(仅TARS协议有效) - // * @param iRet - // * @param status - // * @param buffer - // */ - // void sendResponse(int iRet, const vector& buffer = TARS_BUFFER(), - // const map& status = TARS_STATUS(), - // const string & sResultDesc = ""); - - // /** - // * 普通协议的发送响应数据(非TARS协议有效) - // * @param buff - // * @param len - // */ - // void sendResponse(const char* buff, uint32_t len); - protected: friend class ServantHandle; @@ -248,30 +231,20 @@ protected: /** * 初始化 - * @param stRecvData + * @param data */ - // void initialize(const TC_EpollServer::tagRecvData &stRecvData); void initialize(const shared_ptr &data); /** * 初始化 - * @param stRecvData - * @param beginTime + * @param data */ - // void initialize(const TC_EpollServer::tagRecvData &stRecvData, int64_t beginTime); void initializeClose(const shared_ptr &data); - /** - * 初始化 - * @param stRecvData - */ - // void initializeClose(const TC_EpollServer::tagRecvData &stRecvData); - /** * 初始化 * @param sRecvBuffer */ - // void initialize(const string &sRecvBuffer); void initialize(const vector &sRecvBuffer); /** @@ -300,31 +273,6 @@ protected: */ shared_ptr _data; - // /** - // * 消息_bindAdapter - // */ - // TC_EpollServer::BindAdapter* _bindAdapter; - - // /** - // * 唯一标识 - // */ - // uint32_t _uid; - - // /** - // * ip地址 - // */ - // string _ip; - - // /** - // * 端口 - // */ - // int _port; - - // /** - // * 用于回包时选择网络线程 - // */ - // int _fd; - /** * 客户端请求包 */ @@ -335,11 +283,6 @@ protected: */ bool _response; - // /** - // * 收到请求时间 - // */ - // int64_t _begintime; - /** * 接口处理的返回值 */ @@ -350,11 +293,6 @@ protected: */ bool _reportStat; - /** - * 连接关闭的类型,初始值是-1,非法值 - */ - // int _closeType; - /** * 设置额外返回的内容 */ diff --git a/util/include/util/tc_http2session.h b/util/include/util/tc_http2session.h index 480cd10..fb7a938 100644 --- a/util/include/util/tc_http2session.h +++ b/util/include/util/tc_http2session.h @@ -6,6 +6,8 @@ #include "util/tc_thread.h" #include "util/tc_autoptr.h" #include "util/tc_http.h" +#include "util/tc_network_buffer.h" +#include "util/tc_spin_lock.h" #include "nghttp2/nghttp2.h" namespace tars @@ -35,6 +37,10 @@ class TC_Http2Session: public TC_HandleBase { public: + TC_Http2Session(); + + ~TC_Http2Session(); + struct Http2Response { int status; @@ -43,66 +49,35 @@ public: string body; }; - int parse(string &in, string &out); + /** + * get all http2 request id + * @param in + * @param out + * @return + */ + static int doRequest(const vector &request, vector& vtReqid); - int getRequest(const vector &request, vector& vtReqid); + /** + * http2 + * @param in + * @param out + * @return + */ + TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer&in, vector &out); - int doResopnse(int32_t reqid, const Http2Response &response, vector& out); - - int getMethod(int32_t reqid, Req_Type &method) - { - TC_ThreadLock::Lock lock(reqLock_); - map::iterator it = mReq_.find(reqid); - if (it != mReq_.end()) - method = it->second.method; - else - return -1; - - return 0; - } - - int getUri(int32_t reqid, string &uri) - { - TC_ThreadLock::Lock lock(reqLock_); - map::iterator it = mReq_.find(reqid); - if (it != mReq_.end()) - uri = it->second.uri; - else - return -1; - - return 0; - } - - int getHeader(int32_t reqid, TC_Http::http_header_type &header) - { - TC_ThreadLock::Lock lock(reqLock_); - map::iterator it = mReq_.find(reqid); - if (it != mReq_.end()) - header = it->second.header; - else - return -1; - - return 0; - } - - int getBody(int32_t reqid, string &body) - { - TC_ThreadLock::Lock lock(reqLock_); - map::iterator it = mReq_.find(reqid); - if (it != mReq_.end()) - body = it->second.body; - else - return -1; - - return 0; - } + int doResponse(int32_t reqid, const Http2Response &response, vector& out); int doRequest(const vector &request, vector& response); - void setResponseFunc(ResponseFunc func) - { - responseFunc_ = func; - } + void setResponseFunc(ResponseFunc func) { responseFunc_ = func; } + + int getMethod(int32_t reqid, Req_Type &method); + + int getUri(int32_t reqid, string &uri); + + int getHeader(int32_t reqid, TC_Http::http_header_type &header); + + int getBody(int32_t reqid, string &body); struct RequestPack { @@ -126,17 +101,17 @@ public: unsigned int readPos; }; - TC_Http2Session(); - ~TC_Http2Session(); - - TC_ThreadLock responseBufLock_; + TC_SpinLock responseBufLock_; string responseBuf_; - TC_ThreadLock reqLock_; + TC_SpinLock reqLock_; map mReq_; - string reqout_; + vector reqout_; + +protected: + private: @@ -153,7 +128,7 @@ private: bool bNewCon_; - TC_ThreadLock nghttpLock; + TC_SpinLock nghttpLock; // bool bOldVersion_; // bool bUpgrade_; diff --git a/util/include/util/tc_network_buffer.h b/util/include/util/tc_network_buffer.h index 53a93af..eab7525 100755 --- a/util/include/util/tc_network_buffer.h +++ b/util/include/util/tc_network_buffer.h @@ -128,6 +128,18 @@ public: */ TC_NetWorkBuffer(void *connection) { _connection = connection; } + /** + * deconstruct + * @param buff + */ + ~TC_NetWorkBuffer() + { + if(_deconstruct) + { + _deconstruct(); + } + } + /** * 获取connection, 不同服务模型中获取的对象不一样, 需要自己强制转换 * @param buff @@ -138,7 +150,7 @@ public: * 设置上下文数据, 可以业务存放数据 * @param buff */ - void* setContextData(void *contextData) { _contextData = contextData; return _contextData; } + void setContextData(void *contextData, std::function deconstruct = std::function() ) { _contextData = contextData; _deconstruct = deconstruct; } /** * 获取上下文数据, 给业务存放数据 @@ -311,7 +323,7 @@ public: } /** - * http协议判读 + * http1 * @param in * @param out * @return @@ -319,7 +331,7 @@ public: static TC_NetWorkBuffer::PACKET_TYPE parseHttp(TC_NetWorkBuffer&in, vector &out); /** - * echo协议, 一般用于调试 + * echo * @param in * @param out * @return @@ -394,6 +406,11 @@ protected: */ void* _contextData = NULL; + /** + * deconstruct contextData + */ + std::function _deconstruct; + /** * buffer list */ diff --git a/util/src/tc_http2session.cpp b/util/src/tc_http2session.cpp index 336e97b..33270c6 100644 --- a/util/src/tc_http2session.cpp +++ b/util/src/tc_http2session.cpp @@ -46,7 +46,7 @@ static ssize_t send_callback(nghttp2_session *session, const uint8_t *data, { TC_Http2Session *ptr = (TC_Http2Session*)user_data; { - TC_ThreadLock::Lock lock(ptr->responseBufLock_); + TC_LockT lock(ptr->responseBufLock_); ptr->responseBuf_.append((char*)data, length); } //TLOGDEBUG("[send_callback] length:" << length << endl); @@ -62,7 +62,7 @@ static int on_header_callback(nghttp2_session *session, TC_Http2Session *ptr = (TC_Http2Session*)user_data; { - TC_ThreadLock::Lock lock(ptr->reqLock_); + TC_LockTlock(ptr->reqLock_); map::iterator it = ptr->mReq_.find(frame->hd.stream_id); if (it != ptr->mReq_.end()) { @@ -109,17 +109,17 @@ static int on_begin_headers_callback(nghttp2_session *session, } { - TC_ThreadLock::Lock lock(ptr->reqLock_); + TC_LockT lock(ptr->reqLock_); ptr->mReq_[frame->hd.stream_id].streamId = frame->hd.stream_id; } return 0; } -static int on_frame_recv_callback(nghttp2_session *session, - const nghttp2_frame *frame, void *user_data) +static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { //TLOGDEBUG("[on_frame_recv_callback] id:" << frame->hd.stream_id << " type:" << int(frame->hd.type) << endl); + cout << "[on_frame_recv_callback] id:" << frame->hd.stream_id << " type:" << int(frame->hd.type) << endl; TC_Http2Session *ptr = (TC_Http2Session*)user_data; @@ -133,19 +133,19 @@ static int on_frame_recv_callback(nghttp2_session *session, //TLOGDEBUG("[on_frame_recv_callback] NGHTTP2_FLAG_END_STREAM" << endl); { - TC_ThreadLock::Lock lock(ptr->reqLock_); + TC_LockT lock(ptr->reqLock_); map::iterator it = ptr->mReq_.find(frame->hd.stream_id); if (it != ptr->mReq_.end()) { it->second.bFinish = true; - //stream�Ѿ������ˣ��ж�ͷ���Ƿ���ȷ if(it->second.header.find(":method") != it->second.header.end() || it->second.header.find(":path") != it->second.header.end() || it->second.header.find(":scheme") != it->second.header.end()) { - TC_Http2Session::RequestPack *tmpptr = &(it->second); - ptr->reqout_.append((char*)&tmpptr, sizeof(TC_Http2Session::RequestPack *)); + cout << "insert reqout_" << endl; + char *tmpptr = (char*)&(it->second); + ptr->reqout_.insert(ptr->reqout_.end(), (char*)&tmpptr, (char*)&tmpptr + sizeof(TC_Http2Session::RequestPack *)); } } } @@ -165,7 +165,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, //TLOGDEBUG("[on_data_chunk_recv_callback] stream_id:" << stream_id << endl); TC_Http2Session *ptr = (TC_Http2Session*)user_data; { - TC_ThreadLock::Lock lock(ptr->reqLock_); + TC_LockT lock(ptr->reqLock_); map::iterator it = ptr->mReq_.find(stream_id); if (it != ptr->mReq_.end()) { @@ -175,15 +175,14 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, return 0; } -static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, - uint32_t error_code, void *user_data) +static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { //TLOGDEBUG("[on_stream_close_callback] streamid:" << stream_id << endl); TC_Http2Session *ptr = (TC_Http2Session*)user_data; { - TC_ThreadLock::Lock lock(ptr->reqLock_); + TC_LockT lock(ptr->reqLock_); map::iterator it = ptr->mReq_.find(stream_id); if (it != ptr->mReq_.end()) { @@ -204,26 +203,18 @@ TC_Http2Session::TC_Http2Session():session_(NULL), bNewCon_(true) nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); - nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, - on_frame_recv_callback); + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback); - nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, - on_data_chunk_recv_callback); + nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback); - nghttp2_session_callbacks_set_on_stream_close_callback( - callbacks, on_stream_close_callback); + nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback); - nghttp2_session_callbacks_set_on_header_callback(callbacks, - on_header_callback); + nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback); - nghttp2_session_callbacks_set_on_begin_headers_callback( - callbacks, on_begin_headers_callback); + nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, on_begin_headers_callback); nghttp2_session_server_new(&session_, callbacks, ((void*)this)); - //����remote_window_size - //��Ϊ���ڵ�nghttp2û�������������ڣ�ֻ����ô�� - //���nghttp2_session�Ľṹ���ˣ�������������� *(int32_t*)((char*)session_ + 2380) = 100000000; //TLOGDEBUG("window size:" << nghttp2_session_get_remote_window_size(session_) << endl); @@ -236,7 +227,7 @@ TC_Http2Session::~TC_Http2Session() nghttp2_session_del(session_); } -int TC_Http2Session::parse(string &in, string &out) +TC_NetWorkBuffer::PACKET_TYPE TC_Http2Session::parse(TC_NetWorkBuffer&in, vector &out) { if(bNewCon_) { @@ -246,30 +237,38 @@ int TC_Http2Session::parse(string &in, string &out) {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 100*1024*1024}}; nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv)); } - - int readlen = nghttp2_session_mem_recv(session_, (uint8_t *)in.c_str(), in.size()); + + vector buff = in.getBuffers(); + + int readlen = nghttp2_session_mem_recv(session_, (uint8_t *)buff.data(), buff.size()); + + cout << "parse:" << readlen << ", reqout_ size: " << reqout_.size() << endl; + if(readlen < 0) { return TC_NetWorkBuffer::PACKET_ERR; } else { - in = in.substr(readlen); + in.moveHeader(readlen); - out.clear(); - out.swap(reqout_); - if (out.empty()) + if (reqout_.empty()) + { return TC_NetWorkBuffer::PACKET_LESS; - else - return TC_NetWorkBuffer::PACKET_FULL; + } + + out.insert(out.end(), reqout_.begin(), reqout_.end()); + reqout_.clear(); } + + return TC_NetWorkBuffer::PACKET_FULL; } -int TC_Http2Session::getRequest(const vector &request, vector& vtReqid) +int TC_Http2Session::doRequest(const vector &request, vector& vtReqid) { vtReqid.clear(); - string resopnseAbout; + string responseAbout; TC_Http::http_header_type responseHeader; string sstatus; for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Session::RequestPack *)) @@ -283,10 +282,10 @@ int TC_Http2Session::getRequest(const vector &request, vector& vt return 0; } -int TC_Http2Session::doResopnse(int32_t reqid, const Http2Response &response, vector& out) +int TC_Http2Session::doResponse(int32_t reqid, const Http2Response &response, vector& out) { { - TC_ThreadLock::Lock lock(reqLock_); + TC_LockT lock(reqLock_); map::iterator it = mReq_.find(reqid); if (it == mReq_.end()) return -1; @@ -319,7 +318,7 @@ int TC_Http2Session::doResopnse(int32_t reqid, const Http2Response &response, ve data_prd.read_callback = str_read_callback; int ret ; { - TC_ThreadLock::Lock lock(nghttpLock); + TC_LockT lock(nghttpLock); ret = nghttp2_submit_response(session_, reqid, hdrs, response.header.size()+1, &data_prd); if (ret != 0) @@ -335,7 +334,7 @@ int TC_Http2Session::doResopnse(int32_t reqid, const Http2Response &response, ve delete [] hdrs; { - TC_ThreadLock::Lock lock(responseBufLock_); + TC_LockT lock(responseBufLock_); out.clear(); out.insert(out.end(), responseBuf_.begin(), responseBuf_.end()); @@ -343,7 +342,7 @@ int TC_Http2Session::doResopnse(int32_t reqid, const Http2Response &response, ve } { - TC_ThreadLock::Lock lock(reqLock_); + TC_LockT lock(reqLock_); mReq_.erase(reqid); } @@ -352,8 +351,8 @@ int TC_Http2Session::doResopnse(int32_t reqid, const Http2Response &response, ve int TC_Http2Session::doRequest(const vector &request, vector& response) { - int resopnseStatus = 0; - string resopnseAbout; + int responseStatus = 0; + string responseAbout; TC_Http::http_header_type responseHeader; string sstatus; for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Session::RequestPack *)) @@ -363,36 +362,36 @@ int TC_Http2Session::doRequest(const vector &request, vector& respon string sMethod = TC_Common::upper(TC_Common::trim(ptr->header.find(":method")->second)); if (sMethod == "GET") - resopnseStatus = REQUEST_GET; + responseStatus = REQUEST_GET; else if (sMethod == "POST") - resopnseStatus = REQUEST_POST; + responseStatus = REQUEST_POST; else if (sMethod == "OPTIONS") - resopnseStatus = REQUEST_OPTIONS; + responseStatus = REQUEST_OPTIONS; else if (sMethod == "HEAD") - resopnseStatus = REQUEST_HEAD; + responseStatus = REQUEST_HEAD; else if (sMethod == "PUT") - resopnseStatus = REQUEST_PUT; + responseStatus = REQUEST_PUT; else if (sMethod == "DELETE") - resopnseStatus = REQUEST_DELETE; + responseStatus = REQUEST_DELETE; else { continue; } sstatus = ptr->header.find(":path")->second; - resopnseAbout.clear(); + responseAbout.clear(); responseHeader.clear(); DataPack dataPack; dataPack.readPos = 0; - responseFunc_((Req_Type)resopnseStatus, + responseFunc_((Req_Type)responseStatus, sstatus, ptr->header, ptr->body, - resopnseStatus, - resopnseAbout, + responseStatus, + responseAbout, responseHeader, dataPack.dataBuf); - sstatus = TC_Common::tostr(resopnseStatus); + sstatus = TC_Common::tostr(responseStatus); const char* strstatus = ":status"; nghttp2_nv *hdrs = new nghttp2_nv[responseHeader.size() + 1]; @@ -416,7 +415,7 @@ int TC_Http2Session::doRequest(const vector &request, vector& respon data_prd.read_callback = str_read_callback; int ret ; { - TC_ThreadLock::Lock lock(nghttpLock); + TC_LockT lock(nghttpLock); ret = nghttp2_submit_response(session_, ptr->streamId, hdrs, responseHeader.size()+1, &data_prd); if (ret != 0) @@ -435,7 +434,7 @@ int TC_Http2Session::doRequest(const vector &request, vector& respon delete [] hdrs; { - TC_ThreadLock::Lock lock(reqLock_); + TC_LockT lock(reqLock_); mReq_.erase(ptr->streamId); } @@ -444,6 +443,53 @@ int TC_Http2Session::doRequest(const vector &request, vector& respon return 0; } +int TC_Http2Session::getMethod(int32_t reqid, Req_Type &method) +{ + TC_LockT lock(reqLock_); + map::iterator it = mReq_.find(reqid); + if (it != mReq_.end()) + method = it->second.method; + else + return -1; + + return 0; +} + +int TC_Http2Session::getUri(int32_t reqid, string &uri) +{ + TC_LockT lock(reqLock_); + map::iterator it = mReq_.find(reqid); + if (it != mReq_.end()) + uri = it->second.uri; + else + return -1; + + return 0; +} + +int TC_Http2Session::getHeader(int32_t reqid, TC_Http::http_header_type &header) +{ + TC_LockT lock(reqLock_); + map::iterator it = mReq_.find(reqid); + if (it != mReq_.end()) + header = it->second.header; + else + return -1; + + return 0; +} + +int TC_Http2Session::getBody(int32_t reqid, string &body) +{ + TC_LockT lock(reqLock_); + map::iterator it = mReq_.find(reqid); + if (it != mReq_.end()) + body = it->second.body; + else + return -1; + + return 0; +} } #endif diff --git a/util/src/tc_network_buffer.cpp b/util/src/tc_network_buffer.cpp index a7729a1..1e3e97b 100755 --- a/util/src/tc_network_buffer.cpp +++ b/util/src/tc_network_buffer.cpp @@ -339,6 +339,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, v return b; } + TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseEcho(TC_NetWorkBuffer&in, vector &out) { try