diff --git a/examples/HttpDemo/Http2Server/Http2Imp.cpp b/examples/HttpDemo/Http2Server/Http2Imp.cpp index 554176d..9b30c79 100644 --- a/examples/HttpDemo/Http2Server/Http2Imp.cpp +++ b/examples/HttpDemo/Http2Server/Http2Imp.cpp @@ -21,7 +21,7 @@ using namespace std; TC_SpinLock Http2Imp::_mutex; -unordered_map Http2Imp::_http2; +unordered_map> Http2Imp::_http2; ////////////////////////////////////////////////////// void Http2Imp::initialize() @@ -36,52 +36,77 @@ void Http2Imp::destroy() //destroy servant here: //... } - -void doRequestFunc(const TC_Http2Server::Req_Type reqtype, const string &requri, const TC_Http::http_header_type &reqHeader, const string &reqBody, TC_Http2Server::Http2Response &rsp) -{ - rsp.status = 200; - rsp.about = "OK"; - rsp.body = "response helloworld 2"; -} +// +//void doRequestFunc(const TC_Http2Server::Req_Type reqtype, const string &requri, const TC_Http::http_header_type &reqHeader, const string &reqBody, TC_Http2Server::Http2Response &rsp) +//{ +// rsp.status = 200; +// rsp.about = "OK"; +// rsp.body = "response helloworld 2"; +//} int Http2Imp::doRequest(TarsCurrentPtr current, vector &buffer) { + shared_ptr session = getHttp2(current->getUId()); - TC_Http2Server* session = getHttp2(current->getUId()); +// cout << "doRequest:" << session << ", buffer size:" << current->getRequestBuffer().size() << endl; - static bool flag = true; - if(flag) - { - //method 1: - vector vtReqid; - TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid); + vector contexts; - // cout << "doRequest size:" << vtReqid.size() << endl; + session->decodeRequest(contexts); - TC_Http2Server::Http2Response rsp; - rsp.status = 200; - rsp.about = "OK"; - rsp.body = "response helloworld 1"; +// cout << "doRequest context size:" << contexts.size() << endl; - for(size_t i = 0; i < vtReqid.size(); i++) - { - string rbody; - session->getBody(vtReqid[i], rbody); + for(size_t i = 0; i< contexts.size(); ++i) + { + TC_Http2Server::Http2Context & context = contexts[i]; + vector data; -// cout << vtReqid[i] << ", " << rbody << endl; + context.response.setHeader("X-Header", "TARS"); + context.response.setResponse(200, "OK", context.request.getContent()); - vector data; - session->doResponse(vtReqid[i], rsp, data); - buffer.insert(buffer.end(), data.begin(), data.end()); + int ret = session->encodeResponse(context, data); + if(ret != 0) + { + cout << "encodeResponse error:" << session->getErrMsg() << endl; + } + buffer.insert(buffer.end(), data.begin(), data.end()); + } +// cout << "doRequest buffer size:" << buffer.size() << endl; - } - } - else - { - //method 2: - session->doRequest(current->getRequestBuffer(), doRequestFunc, buffer); - } +// static bool flag = true; +// if(flag) +// { +// //method 1: +// vector vtReqid; +// TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid); +// +// // cout << "doRequest size:" << vtReqid.size() << endl; +// +// TC_Http2Server::Http2Response rsp; +// rsp.status = 200; +// rsp.about = "OK"; +// rsp.body = "response helloworld 1"; +// +// for(size_t i = 0; i < vtReqid.size(); i++) +// { +// string rbody; +// session->getBody(vtReqid[i], rbody); +// +//// cout << vtReqid[i] << ", " << rbody << endl; +// +// vector data; +// session->doResponse(vtReqid[i], rsp, data); +// buffer.insert(buffer.end(), data.begin(), data.end()); +// +// +// } +// } +// else +// { +// //method 2: +// session->doRequest(current->getRequestBuffer(), doRequestFunc, buffer); +// } // flag = !flag; @@ -90,6 +115,7 @@ int Http2Imp::doRequest(TarsCurrentPtr current, vector &buffer) int Http2Imp::doClose(TarsCurrentPtr current) { + cout << "doClose" << endl; delHttp2(current->getUId()); return 0; diff --git a/examples/HttpDemo/Http2Server/Http2Imp.h b/examples/HttpDemo/Http2Server/Http2Imp.h index 02504b1..39f49f0 100644 --- a/examples/HttpDemo/Http2Server/Http2Imp.h +++ b/examples/HttpDemo/Http2Server/Http2Imp.h @@ -57,7 +57,7 @@ public: */ int doClose(TarsCurrentPtr current); - static TC_Http2Server *getHttp2(uint32_t uid) + static shared_ptr getHttp2(uint32_t uid) { TC_LockT lock(_mutex); @@ -70,7 +70,7 @@ public: return NULL; } - static void addHttp2(uint32_t uid, TC_Http2Server* ptr) + static void addHttp2(uint32_t uid, const shared_ptr &ptr) { TC_LockT lock(_mutex); @@ -93,7 +93,7 @@ protected: static TC_SpinLock _mutex; - static unordered_map _http2; + static unordered_map> _http2; }; ///////////////////////////////////////////////////// #endif diff --git a/examples/HttpDemo/Http2Server/HttpServer.cpp b/examples/HttpDemo/Http2Server/HttpServer.cpp index 4208fed..eff7dd2 100644 --- a/examples/HttpDemo/Http2Server/HttpServer.cpp +++ b/examples/HttpDemo/Http2Server/HttpServer.cpp @@ -24,18 +24,27 @@ HttpServer g_app; TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector &out) { - TC_Http2Server*session = (TC_Http2Server*)(in.getContextData()); + TC_Http2Server*sessionPtr = (TC_Http2Server*)(in.getContextData()); - if(session == NULL) + if(sessionPtr == NULL) { - session = new TC_Http2Server(); - in.setContextData(session, [=]{delete session;}); + shared_ptr session(new TC_Http2Server()); +// in.setContextData(session, [=]{delete session;}); + in.setContextData(session.get()); + + session->settings(3000); TC_EpollServer::Connection *connection = (TC_EpollServer::Connection *)in.getConnection(); Http2Imp::addHttp2(connection->getId(), session); + + sessionPtr = session.get(); } - return session->parse(in, out); + TC_NetWorkBuffer::PACKET_TYPE flag = sessionPtr->parse(in, out); + +// cout << "parseHttp2:" << session << ", out size:" << out.size() << endl; + + return flag; } diff --git a/examples/HttpDemo/Http2Server/config.conf b/examples/HttpDemo/Http2Server/config.conf index e147403..bacb72b 100755 --- a/examples/HttpDemo/Http2Server/config.conf +++ b/examples/HttpDemo/Http2Server/config.conf @@ -21,16 +21,16 @@ #合并回调线程和网络线程(以网络线程个数为准) mergenetasync = 0 #模块名称 - modulename = TestApp.HttpServer + modulename = TestApp.Http2Server - + #定义所有绑定的IP closecout = 0 #应用名称 app = TestApp #服务名称 - server = HttpServer + server = Http2Server #服务的数据目录,可执行文件,配置文件等 basepath = ./ datapath = ./ @@ -54,7 +54,7 @@ allow = maxconns = 4096 threads = 1 - servant = TestApp.HttpServer.Http2Obj + servant = TestApp.Http2Server.Http2Obj queuecap = 1000000 protocol = not-tars diff --git a/util/include/util/tc_http.h b/util/include/util/tc_http.h index e2263bd..0fea608 100755 --- a/util/include/util/tc_http.h +++ b/util/include/util/tc_http.h @@ -88,6 +88,9 @@ struct TC_HttpRequest_Exception : public TC_Http_Exception ~TC_HttpRequest_Exception() throw(){}; }; +class TC_HttpRequest; +class TC_HttpResponse; + /** * @brief 简单的URL解析类. * @@ -279,6 +282,8 @@ public: void specialize(); protected: + friend class TC_HttpRequest; + /** * @brief 换成URL. * @@ -524,6 +529,12 @@ public: const string &getContent() const { return _content; } /** + * @brief get body + * @return http body + */ + string &getContent() { return _content; } + + /** * @brief 设置http body(默认不修改content-length). * * @param content http body内容 @@ -1156,10 +1167,40 @@ public: int doRequest(TC_HttpResponse &stHttpRep, int iTimeout = 3000); /** - * @brief 请求类型. + * @brief get request type */ int requestType() const { return _requestType ; } + /** + * @brief set request type + */ + void setRequestType(int requestType) { _requestType = requestType ; } + + /** + * set method + * @param + * @return method invalid, throw exception + */ + void setMethod(const char * sMethod); + + /** + * set method + * @param + */ + void setPath(const char * sPath); + + /** + * set domain + * @param + */ + void setDomain(const char * sDomain); + + /** + * set schema + * @param + */ + void setScheme(const char * sScheme); + /** * @brief 是否是GET请求. * diff --git a/util/include/util/tc_http2.h b/util/include/util/tc_http2.h index f771813..af0b869 100644 --- a/util/include/util/tc_http2.h +++ b/util/include/util/tc_http2.h @@ -13,10 +13,25 @@ namespace tars class TC_Http2 { public: + /** + * constructor + */ TC_Http2(); + /** + * deconstructor + */ virtual ~TC_Http2(); + struct DataPack + { + DataPack(const char *data, size_t length) : _dataBuf(data), _length(length) {} + + const char* _dataBuf; + size_t _length; + size_t _readPos = 0; + }; + /** * @brief setting */ @@ -39,11 +54,23 @@ public: */ void swap(vector &buff) { _buff.swap(buff); } + /** + * insert buff + * @param buff + */ + void insertBuff(const char *buff, size_t length) { _buff.insert(_buff.end(), buff, buff + length); } + /** * @brief session */ nghttp2_session* session() const { return _session; } + /** + * + * @return + */ + const char *getErrMsg(); + protected: /** * error code @@ -61,41 +88,42 @@ protected: vector _buff; }; -class TC_Http2Server + + +class TC_Http2Server : public TC_Http2 { public: - enum Req_Type - { - REQUEST_GET, - REQUEST_POST, - REQUEST_OPTIONS, - REQUEST_HEAD, - REQUEST_PUT, - REQUEST_DELETE - } ; - - struct Http2Response - { - int status; - string about; - TC_Http::http_header_type header; - string body; - }; - - typedef std::function RequestFunc; - TC_Http2Server(); ~TC_Http2Server(); - /** - * get all http2 request id - * @param in - * @param out - * @return - */ - static int doRequest(const vector &request, vector& vtReqid); + struct Http2Context + { +// Http2Context(int32_t id) : reqId(id) {} + + int32_t reqId; + bool bFinish = false; + TC_HttpRequest request; + TC_HttpResponse response; + }; + + /** + * parse all request + * @param request + * @param unordered_map> + * @return + */ + void decodeRequest(vector &contexts); + + /** + * + * @param reqid + * @param response + * @param out + * @return + */ + int encodeResponse(const Http2Context &context, vector &out); /** * http2 @@ -105,61 +133,37 @@ public: */ TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer&in, vector &out); - int doResponse(int32_t reqid, const Http2Response &response, vector& out); - int doRequest(const vector &request, RequestFunc func, vector& response); - - int getMethod(int32_t reqid, Req_Type &method); - - int getUri(int32_t reqid, string &uri); - - int getHeader(int32_t reqid, TC_Http::http_header_type &header); - - int getBody(int32_t reqid, string &body); - - void appendResponseBuf(const char *buff, size_t length); void onHeaderCallback(int32_t streamId); void onHeaderCallback(int32_t streamId, const string &skey, const string &svalue); void onFrameRecvCallback(int32_t streamId); void onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len); void onStreamCloseCallback(int32_t streamId); - struct DataPack - { - DataPack(){} +protected: - DataPack(const string &data, int pos):dataBuf(data), readPos(pos){} - - string dataBuf; - unsigned int readPos; - }; + Http2Context &getContext(int32_t streamId); + void deleteContext(int32_t streamId); protected: - struct RequestPack - { - RequestPack():streamId(0), bFinish(false){} - Req_Type method; - string uri; - TC_Http::http_header_type header; - string body; - int32_t streamId; - bool bFinish; - }; +// TC_SpinLock _contextLock; +// TC_ThreadMutex _contextLock; - TC_SpinLock _responseBufLock; - vector _responseBuf; +// TC_SpinLock reqLock_; +// unordered_map _mReq; - TC_SpinLock reqLock_; - unordered_map _mReq; +// unordered_map _mReq; +// unordered_map> _context; + unordered_map _context; + + vector _contextFinished; vector _reqout; - nghttp2_session *_session; - bool _bNewCon; - TC_SpinLock _nghttpLock; + TC_ThreadMutex _nghttpLock; }; ///////////////////////////////////////////////////////////////////////////////// diff --git a/util/src/tc_http.cpp b/util/src/tc_http.cpp index 3ef25ad..72a0d73 100755 --- a/util/src/tc_http.cpp +++ b/util/src/tc_http.cpp @@ -2093,6 +2093,58 @@ void TC_HttpRequest::getHostPort(string &sDomain, uint32_t &iPort) iPort = TC_Common::strto(_httpURL.getPort()); } +void TC_HttpRequest::setMethod(const char * sMethod) +{ + //解析请求类型 + if(TC_Port::strcasecmp(sMethod, "GET") ==0) //if(sMethod == "GET") + { + _requestType = REQUEST_GET; + } + else if(TC_Port::strcasecmp(sMethod, "POST") ==0) //else if(sMethod == "POST") + { + _requestType = REQUEST_POST; + } + else if(TC_Port::strcasecmp(sMethod, "PUT") ==0) + { + _requestType = REQUEST_PUT; + } + else if(TC_Port::strcasecmp(sMethod, "PATCH") ==0) + { + _requestType = REQUEST_PATCH; + } + else if(TC_Port::strcasecmp(sMethod, "OPTIONS") ==0) //else if(sMethod == "OPTIONS") + { + _requestType = REQUEST_OPTIONS; + } + else if(TC_Port::strcasecmp(sMethod, "HEAD") == 0) + { + _requestType = REQUEST_HEAD; + } + else if(TC_Port::strcasecmp(sMethod, "DELETE") == 0) + { + _requestType = REQUEST_DELETE; + } + else + { + throw TC_HttpRequest_Exception("[TC_HttpRequest::setMethod] http request command error: " + string(sMethod)); + } +} + +void TC_HttpRequest::setPath(const char *path) +{ + _httpURL._sPath = path; +} + +void TC_HttpRequest::setDomain(const char * sDomain) +{ + _httpURL._sDomain = sDomain; +} + +void TC_HttpRequest::setScheme(const char * sScheme) +{ + _httpURL._sScheme = sScheme; +} + int TC_HttpRequest::doRequest(TC_HttpResponse &stHttpRsp, int iTimeout) { //只支持短连接模式 diff --git a/util/src/tc_http2.cpp b/util/src/tc_http2.cpp index ba6fd69..ad1e423 100644 --- a/util/src/tc_http2.cpp +++ b/util/src/tc_http2.cpp @@ -51,8 +51,14 @@ int TC_Http2::settings(unsigned int maxCurrentStreams) NGHTTP2_FLAG_NONE, iv, sizeof(iv)/sizeof(iv[0])); - int rv = nghttp2_session_send(_session); - return rv; + _err = nghttp2_session_send(_session); + + return _err; +} + +const char *TC_Http2::getErrMsg() +{ + return nghttp2_strerror(_err); } /////////////////////////////////////////////////////////////////////////////////// @@ -65,19 +71,19 @@ static ssize_t str_read_callback(nghttp2_session *session, int32_t stream_id, nghttp2_data_source *source, void *user_data) { - TC_Http2Server::DataPack *dataPack = (TC_Http2Server::DataPack*)(source->ptr); - if(dataPack->readPos == dataPack->dataBuf.size()) + TC_Http2::DataPack *dataPack = (TC_Http2::DataPack*)(source->ptr); + if(dataPack->_readPos == dataPack->_length) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; return 0; } - size_t size = std::min(dataPack->dataBuf.size() - dataPack->readPos, length); + size_t size = std::min(dataPack->_length - dataPack->_readPos, length); - memcpy(buf, dataPack->dataBuf.c_str() + dataPack->readPos, size); + memcpy(buf, dataPack->_dataBuf + dataPack->_readPos, size); - dataPack->readPos += size; + dataPack->_readPos += size; - if(dataPack->readPos == dataPack->dataBuf.size()) + if(dataPack->_readPos == dataPack->_length) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; } @@ -89,7 +95,7 @@ static ssize_t send_callback(nghttp2_session *session, const uint8_t *data, size_t length, int flags, void *user_data) { TC_Http2Server *ptr = (TC_Http2Server*)user_data; - ptr->appendResponseBuf((const char*)data, length); + ptr->insertBuff((const char*)data, length); return (ssize_t)length; } @@ -97,7 +103,7 @@ static ssize_t send_callback(nghttp2_session *session, const uint8_t *data, static int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, - size_t valuelen, uint8_t flags, void *user_data) + size_t valuelen, uint8_t flags, void *user_data) { TC_Http2Server *ptr = (TC_Http2Server*)user_data; ptr->onHeaderCallback(frame->hd.stream_id, string((char*)name, namelen), string((char*)value, valuelen)); @@ -106,7 +112,7 @@ static int on_header_callback(nghttp2_session *session, static int on_begin_headers_callback(nghttp2_session *session, const nghttp2_frame *frame, - void *user_data) + void *user_data) { TC_Http2Server *ptr = (TC_Http2Server*)user_data; @@ -118,16 +124,16 @@ static int on_begin_headers_callback(nghttp2_session *session, return 0; } -static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) +static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { TC_Http2Server *ptr = (TC_Http2Server*)user_data; - switch (frame->hd.type) + switch (frame->hd.type) { case NGHTTP2_DATA: case NGHTTP2_HEADERS: /* Check that the client request has finished */ - if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { ptr->onFrameRecvCallback(frame->hd.stream_id); return 0; @@ -141,7 +147,7 @@ static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, - size_t len, void *user_data) + size_t len, void *user_data) { TC_Http2Server *ptr = (TC_Http2Server*)user_data; @@ -150,7 +156,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, return 0; } -static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) +static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { TC_Http2Server *ptr = (TC_Http2Server*)user_data; ptr->onStreamCloseCallback(stream_id); @@ -160,93 +166,97 @@ static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, } -void TC_Http2Server::appendResponseBuf(const char *buff, size_t length) -{ - TC_LockT lock(_responseBufLock); - _responseBuf.insert(_responseBuf.end(), buff, buff + length); - -} - void TC_Http2Server::onHeaderCallback(int streamId) { - TC_LockT lock(reqLock_); - _mReq[streamId].streamId = streamId; +// TC_LockT lock(_nghttpLock); + + _context[streamId].reqId = streamId; } void TC_Http2Server::onHeaderCallback(int32_t streamId, const string &skey, const string &svalue) { - TC_LockTlock(reqLock_); - auto it = _mReq.find(streamId); - if (it != _mReq.end()) - { - it->second.header.insert(std::pair(skey, svalue)); + TC_Http2Server::Http2Context & context = getContext(streamId); - if (TC_Port::strcasecmp(skey.c_str(), ":method") == 0) - { - // string sMethod = TC_Common::upper(TC_Common::trim(svalue)); - if (TC_Port::strcasecmp(svalue.c_str(), "GET") == 0) - it->second.method = REQUEST_GET; - else if (TC_Port::strcasecmp(svalue.c_str(), "POST") == 0) - it->second.method = REQUEST_POST; - else if (TC_Port::strcasecmp(svalue.c_str(), "OPTIONS") == 0) - it->second.method = REQUEST_OPTIONS; - else if (TC_Port::strcasecmp(svalue.c_str(), "HEAD") == 0) - it->second.method = REQUEST_HEAD; - else if (TC_Port::strcasecmp(svalue.c_str(), "PUT") == 0) - it->second.method = REQUEST_PUT; - else if (TC_Port::strcasecmp(svalue.c_str(), "DELETE") == 0) - it->second.method = REQUEST_DELETE; - } - else if (TC_Port::strcasecmp(skey.c_str(), ":path") == 0) - { - it->second.uri = svalue; - } + if (TC_Port::strcasecmp(skey.c_str(), ":method") == 0) + { + context.request.setMethod(svalue.c_str()); } + else if (TC_Port::strcasecmp(skey.c_str(), ":path") == 0) + { + context.request.setPath(svalue.c_str()); + } + else if (TC_Port::strcasecmp(skey.c_str(), ":scheme") == 0) + { + context.request.setScheme(svalue.c_str()); + } + else if (TC_Port::strcasecmp(skey.c_str(), ":authority") == 0) + { + context.request.setDomain(svalue.c_str()); + } + + context.request.setHeader(skey, svalue); } -void TC_Http2Server::onFrameRecvCallback(int32_t streamId) +void TC_Http2Server::onFrameRecvCallback(int32_t streamId) { - TC_LockT lock(reqLock_); - auto it = _mReq.find(streamId); - if (it != _mReq.end()) - { - it->second.bFinish = true; + TC_Http2Server::Http2Context& context = getContext(streamId); + +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(streamId); +// if (it != _mReq.end()) +// { +// it->second.bFinish = true; + + if(context.request.getHeaders().find(":method") != context.request.getHeaders().end() || + context.request.getHeaders().find(":path") != context.request.getHeaders().end() || + context.request.getHeaders().find(":scheme") != context.request.getHeaders().end()) + { + context.bFinish = true; + + _contextFinished.push_back(context); + + _reqout.insert(_reqout.end(), (char*)&streamId, (char*)&streamId + sizeof(int32_t)); + + deleteContext(streamId); - if(it->second.header.find(":method") != it->second.header.end() || - it->second.header.find(":path") != it->second.header.end() || - it->second.header.find(":scheme") != it->second.header.end()) - { - char *tmpptr = (char*)&(it->second); - _reqout.insert(_reqout.end(), (char*)&tmpptr, (char*)&tmpptr + sizeof(TC_Http2Server::RequestPack *)); - } } +// } } -void TC_Http2Server::onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len) +void TC_Http2Server::onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len) { - TC_LockT lock(reqLock_); - auto it = _mReq.find(streamId); - if (it != _mReq.end()) - { - it->second.body.append(data, len); - } + TC_Http2Server::Http2Context &context = getContext(streamId); + + context.request.getContent().append(data, len); +// +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(streamId); +// if (it != _mReq.end()) +// { +// it->second.body.append(data, len); +// } } -void TC_Http2Server::onStreamCloseCallback(int32_t streamId) +void TC_Http2Server::onStreamCloseCallback(int32_t streamId) { - TC_LockT lock(reqLock_); - auto it = _mReq.find(streamId); - if (it != _mReq.end()) - { - if (it->second.bFinish != true) - { - _mReq.erase(streamId); - } - } +// TC_Http2Server::Http2Context & context = getContext(streamId); +// { + deleteContext(streamId); +// } + +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(streamId); +// if (it != _mReq.end()) +// { +// if (it->second.bFinish != true) +// { +// _mReq.erase(streamId); +// } +// } } -TC_Http2Server::TC_Http2Server(): _session(NULL), _bNewCon(true) +TC_Http2Server::TC_Http2Server(): _bNewCon(true) { nghttp2_session_callbacks *callbacks; @@ -266,42 +276,32 @@ TC_Http2Server::TC_Http2Server(): _session(NULL), _bNewCon(true) nghttp2_session_server_new(&_session, callbacks, ((void*)this)); - // *(int32_t*)((char*)_session + 2380) = 100000000; - - //TLOGDEBUG("window size:" << nghttp2_session_get_remote_window_size(_session) << endl); - nghttp2_session_callbacks_del(callbacks); } TC_Http2Server::~TC_Http2Server() { - nghttp2_session_del(_session); } TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector &out) { - if(_bNewCon) - { - _bNewCon = false; +// cout << "parse:" << in.getBufferLength() << endl; - nghttp2_settings_entry iv[2] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 3000}, - {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 100*1024*1024}}; - nghttp2_submit_settings(_session, NGHTTP2_FLAG_NONE, iv, sizeof(iv)/sizeof(nghttp2_settings_entry)); - - nghttp2_session_send(_session); - } +// if(_bNewCon) +// { +// _bNewCon = false; +// +// } in.mergeBuffers(); auto buff = in.getBufferPointer(); - int readlen; +// int readlen; - { - TC_LockT lock2(_nghttpLock); + TC_LockT lock2(_nghttpLock); - readlen = nghttp2_session_mem_recv(_session, (uint8_t *) buff.first, buff.second); - } + int readlen = nghttp2_session_mem_recv(_session, (uint8_t *) buff.first, buff.second); if(readlen < 0) { @@ -311,7 +311,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector< { in.moveHeader(readlen); - TC_LockT lock1(reqLock_); +// TC_LockT lock1(reqLock_); if (_reqout.empty()) { @@ -325,233 +325,369 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector< return TC_NetWorkBuffer::PACKET_FULL; } -int TC_Http2Server::doRequest(const vector &request, vector& vtReqid) +//void TC_Http2Server::createReq(int32_t streamId) +//{ +// TC_LockT lock(reqLock_); +// +// _context[streamId] = std::make_shared(); +//} + +TC_Http2Server::Http2Context& TC_Http2Server::getContext(int32_t streamId) { - vtReqid.clear(); +// TC_LockT lock(_nghttpLock); + auto it = _context.find(streamId); + if (it != _context.end()) + { + return it->second; + } - for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Server::RequestPack *)) - { - RequestPack *ptr; - memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *)); + assert(false); - vtReqid.push_back(ptr->streamId); - } - - return 0; } -int TC_Http2Server::doResponse(int32_t reqid, const Http2Response &response, vector& out) +void TC_Http2Server::deleteContext(int32_t streamId) { - { - TC_LockT lock(reqLock_); - auto it = _mReq.find(reqid); - if (it == _mReq.end()) - return -1; - } - string sstatus = TC_Common::tostr(response.status); +// TC_LockT lock(_nghttpLock); - const char* strstatus = ":status"; - nghttp2_nv *hdrs = new nghttp2_nv[response.header.size() + 1]; - hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; - hdrs[0].name = (uint8_t*)strstatus; - hdrs[0].namelen = 7; - hdrs[0].value = (uint8_t*)sstatus.c_str(); - hdrs[0].valuelen = sstatus.size(); - TC_Http::http_header_type::const_iterator it = response.header.begin(); - for (int n = 1; it != response.header.end(); n++, it++) - { - hdrs[n].flags = NGHTTP2_NV_FLAG_NONE; - hdrs[n].name = (uint8_t*)it->first.c_str(); - hdrs[n].namelen = it->first.size(); - hdrs[n].value = (uint8_t*)it->second.c_str(); - hdrs[n].valuelen = it->second.size(); - } - - DataPack dataPack; - dataPack.readPos = 0; - dataPack.dataBuf = response.body; - - nghttp2_data_provider data_prd; - data_prd.source.ptr = (void*)&dataPack; - data_prd.read_callback = server::str_read_callback; - int ret ; - { - TC_LockT lock(_nghttpLock); - - ret = nghttp2_submit_response(_session, reqid, hdrs, response.header.size()+1, &data_prd); - if (ret != 0) { - cout << "nghttp2_submit_response error" << endl; - return -1; - } - - while (nghttp2_session_want_write(_session)) { - ret = nghttp2_session_send(_session); - if (ret != 0) { - cout << "nghttp2_session_send error" << endl; - return -1; - } - } - } - - delete [] hdrs; - - { - TC_LockT lock(_responseBufLock); - out.swap(_responseBuf); - _responseBuf.clear(); - } - - { - TC_LockT lock(reqLock_); - _mReq.erase(reqid); - } - - return 0; + _context.erase(streamId); } -int TC_Http2Server::doRequest(const vector &request, TC_Http2Server::RequestFunc requestFunc, vector& response) +void TC_Http2Server::decodeRequest(vector &contexts) { - for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Server::RequestPack *)) - { - Http2Response rsp; + TC_LockT lock(_nghttpLock); - RequestPack *ptr; - memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *)); + _contextFinished.swap(contexts); - Req_Type qstatus; +// return context; +} +// +//vector TC_Http2Server::parseReqId(const vector &request) +//{ +// vector vtReqid; +// for (unsigned int i = 0; i < request.size(); i += sizeof(int32_t)) +// { +// int32_t reqId; +// memcpy(&reqId, &request[i], sizeof(int32_t)); +// vtReqid.push_back(reqId); +// } +// +// return vtReqid; +//} - const char* sMethod = ptr->header.find(":method")->second.c_str(); //TC_Common::upper(TC_Common::trim(ptr->header.find(":method")->second)); - if (TC_Port::strcasecmp(sMethod, "GET") == 0) - qstatus = REQUEST_GET; - else if (TC_Port::strcasecmp(sMethod, "POST") == 0) - qstatus = REQUEST_POST; - else if (TC_Port::strcasecmp(sMethod, "OPTIONS") == 0) - qstatus = REQUEST_OPTIONS; - else if (TC_Port::strcasecmp(sMethod, "HEAD") == 0) - qstatus = REQUEST_HEAD; - else if (TC_Port::strcasecmp(sMethod, "PUT") == 0) - qstatus = REQUEST_PUT; - else if (TC_Port::strcasecmp(sMethod, "DELETE") == 0) - qstatus = REQUEST_DELETE; - else - { - continue; - } - string sstatus = ptr->header.find(":path")->second; +int TC_Http2Server::encodeResponse(const TC_Http2Server::Http2Context &context, vector &out) +{ +// { +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(reqid); +// if (it == _mReq.end()) +// return -1; +// } - requestFunc(qstatus, sstatus, ptr->header, ptr->body, rsp); +// deleteContext(context->reqId); - DataPack dataPack; - dataPack.readPos = 0; - dataPack.dataBuf = rsp.body; + string sstatus = TC_Common::tostr(context.response.getStatus()); - sstatus = TC_Common::tostr(rsp.status); + const char* strstatus = ":status"; - const char* strstatus = ":status"; - nghttp2_nv *hdrs = new nghttp2_nv[rsp.header.size() + 1]; - hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; - hdrs[0].name = (uint8_t*)strstatus; - hdrs[0].namelen = 7; - hdrs[0].value = (uint8_t*)sstatus.c_str(); - hdrs[0].valuelen = sstatus.size(); - TC_Http::http_header_type::iterator it = rsp.header.begin(); - for (int n = 1; it != rsp.header.end(); n++, it++) - { - hdrs[n].flags = NGHTTP2_NV_FLAG_NONE; - hdrs[n].name = (uint8_t*)it->first.c_str(); - hdrs[n].namelen = it->first.size(); - hdrs[n].value = (uint8_t*)it->second.c_str(); - hdrs[n].valuelen = it->second.size(); - } + nghttp2_nv *hdrs = new nghttp2_nv[context.response.getHeaders().size() + 1]; + hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; + hdrs[0].name = (uint8_t*)strstatus; + hdrs[0].namelen = 7; + hdrs[0].value = (uint8_t*)sstatus.c_str(); + hdrs[0].valuelen = sstatus.size(); - nghttp2_data_provider data_prd; - data_prd.source.ptr = (void*)&dataPack; - data_prd.read_callback = server::str_read_callback; - - { - TC_LockT lock(_nghttpLock); + TC_Http::http_header_type::const_iterator it = context.response.getHeaders().begin(); + for (int n = 1; it != context.response.getHeaders().end(); n++, it++) + { + hdrs[n].flags = NGHTTP2_NV_FLAG_NONE; + hdrs[n].name = (uint8_t*)it->first.c_str(); + hdrs[n].namelen = it->first.size(); + hdrs[n].value = (uint8_t*)it->second.c_str(); + hdrs[n].valuelen = it->second.size(); + } - int ret = nghttp2_submit_response(_session, ptr->streamId, hdrs, rsp.header.size()+1, &data_prd); - if (ret != 0) - { - cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl; - } - ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret)); + const string &body = context.response.getContent(); - while (nghttp2_session_want_write(_session)) { - ret = nghttp2_session_send(_session); - if (ret != 0) - { - cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl; - } - ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret)); - } - } + DataPack dataPack(body.c_str(), body.size()); +// dataPack.readPos = 0; +// dataPack.dataBuf = response.body; - { - TC_LockT lock(_responseBufLock); - response.insert(response.begin(), _responseBuf.begin(), _responseBuf.end()); - _responseBuf.clear(); - } + nghttp2_data_provider data_prd; + data_prd.source.ptr = (void*)&dataPack; + data_prd.read_callback = server::str_read_callback; + { + TC_LockT lock(_nghttpLock); - delete [] hdrs; - { - TC_LockT lock(reqLock_); - _mReq.erase(ptr->streamId); - } + _err = nghttp2_submit_response(_session, context.reqId, hdrs, context.response.getHeaders().size()+1, &data_prd); + if (_err != 0 ) { + delete [] hdrs; - } + return _err; + } - return 0; + while (nghttp2_session_want_write(_session)) { + _err = nghttp2_session_send(_session); + if (_err != 0) { + delete [] hdrs; + + return _err; + } + } + + this->swap(out); + } + + delete [] hdrs; + + return 0; + +// { +// TC_LockT lock(_responseBufLock); +// out.swap(_responseBuf); +// _responseBuf.clear(); +// } +// +// { +// TC_LockT lock(reqLock_); +// _mReq.erase(reqid); +// } } -int TC_Http2Server::getMethod(int32_t reqid, Req_Type &method) -{ - TC_LockT lock(reqLock_); - auto it = _mReq.find(reqid); - if (it != _mReq.end()) - method = it->second.method; - else - return -1; - - return 0; -} - -int TC_Http2Server::getUri(int32_t reqid, string &uri) -{ - TC_LockT lock(reqLock_); - auto it = _mReq.find(reqid); - if (it != _mReq.end()) - uri = it->second.uri; - else - return -1; - - return 0; -} - -int TC_Http2Server::getHeader(int32_t reqid, TC_Http::http_header_type &header) -{ - TC_LockT lock(reqLock_); - auto it = _mReq.find(reqid); - if (it != _mReq.end()) - header = it->second.header; - else - return -1; - - return 0; -} - -int TC_Http2Server::getBody(int32_t reqid, string &body) -{ - TC_LockT lock(reqLock_); - auto it = _mReq.find(reqid); - if (it != _mReq.end()) - body = it->second.body; - else - return -1; - - return 0; -} +// +//int TC_Http2Server::doRequest(const vector &request, vector& vtReqid) +//{ +// vtReqid.clear(); +// +// for (unsigned int i = 0; i < request.size(); i += sizeof(int32_t)) +// { +//// RequestPack *ptr; +//// memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *)); +// +// int32_t reqId; +// memcpy(&reqId, &request[i], sizeof(int32_t)); +// vtReqid.push_back(reqId); +// } +// +// return 0; +//} +// +//int TC_Http2Server::doResponse(int32_t reqid, const Http2Response &response, vector& out) +//{ +// { +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(reqid); +// if (it == _mReq.end()) +// return -1; +// } +// string sstatus = TC_Common::tostr(response.status); +// +// const char* strstatus = ":status"; +// nghttp2_nv *hdrs = new nghttp2_nv[response.header.size() + 1]; +// hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; +// hdrs[0].name = (uint8_t*)strstatus; +// hdrs[0].namelen = 7; +// hdrs[0].value = (uint8_t*)sstatus.c_str(); +// hdrs[0].valuelen = sstatus.size(); +// TC_Http::http_header_type::const_iterator it = response.header.begin(); +// for (int n = 1; it != response.header.end(); n++, it++) +// { +// hdrs[n].flags = NGHTTP2_NV_FLAG_NONE; +// hdrs[n].name = (uint8_t*)it->first.c_str(); +// hdrs[n].namelen = it->first.size(); +// hdrs[n].value = (uint8_t*)it->second.c_str(); +// hdrs[n].valuelen = it->second.size(); +// } +// +// DataPack dataPack; +// dataPack.readPos = 0; +// dataPack.dataBuf = response.body; +// +// nghttp2_data_provider data_prd; +// data_prd.source.ptr = (void*)&dataPack; +// data_prd.read_callback = server::str_read_callback; +// int ret ; +// { +// TC_LockT lock(_nghttpLock); +// +// ret = nghttp2_submit_response(_session, reqid, hdrs, response.header.size()+1, &data_prd); +// if (ret != 0) { +// cout << "nghttp2_submit_response error" << endl; +// return -1; +// } +// +// while (nghttp2_session_want_write(_session)) { +// ret = nghttp2_session_send(_session); +// if (ret != 0) { +// cout << "nghttp2_session_send error" << endl; +// return -1; +// } +// } +// } +// +// delete [] hdrs; +// +// this->swap(out); +//// { +//// TC_LockT lock(_responseBufLock); +//// out.swap(_responseBuf); +//// _responseBuf.clear(); +//// } +// +// { +// TC_LockT lock(reqLock_); +// _mReq.erase(reqid); +// } +// +// return 0; +//} +// +//int TC_Http2Server::doRequest(const vector &request, TC_Http2Server::RequestFunc requestFunc, vector& response) +//{ +// for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Server::RequestPack *)) +// { +// Http2Response rsp; +// +// RequestPack *ptr; +// memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *)); +// +// Req_Type qstatus; +// +// const char* sMethod = ptr->header.find(":method")->second.c_str(); //TC_Common::upper(TC_Common::trim(ptr->header.find(":method")->second)); +// if (TC_Port::strcasecmp(sMethod, "GET") == 0) +// qstatus = REQUEST_GET; +// else if (TC_Port::strcasecmp(sMethod, "POST") == 0) +// qstatus = REQUEST_POST; +// else if (TC_Port::strcasecmp(sMethod, "OPTIONS") == 0) +// qstatus = REQUEST_OPTIONS; +// else if (TC_Port::strcasecmp(sMethod, "HEAD") == 0) +// qstatus = REQUEST_HEAD; +// else if (TC_Port::strcasecmp(sMethod, "PUT") == 0) +// qstatus = REQUEST_PUT; +// else if (TC_Port::strcasecmp(sMethod, "DELETE") == 0) +// qstatus = REQUEST_DELETE; +// else +// { +// continue; +// } +// string sstatus = ptr->header.find(":path")->second; +// +// requestFunc(qstatus, sstatus, ptr->header, ptr->body, rsp); +// +// DataPack dataPack; +// dataPack.readPos = 0; +// dataPack.dataBuf = rsp.body; +// +// sstatus = TC_Common::tostr(rsp.status); +// +// const char* strstatus = ":status"; +// nghttp2_nv *hdrs = new nghttp2_nv[rsp.header.size() + 1]; +// hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; +// hdrs[0].name = (uint8_t*)strstatus; +// hdrs[0].namelen = 7; +// hdrs[0].value = (uint8_t*)sstatus.c_str(); +// hdrs[0].valuelen = sstatus.size(); +// TC_Http::http_header_type::iterator it = rsp.header.begin(); +// for (int n = 1; it != rsp.header.end(); n++, it++) +// { +// hdrs[n].flags = NGHTTP2_NV_FLAG_NONE; +// hdrs[n].name = (uint8_t*)it->first.c_str(); +// hdrs[n].namelen = it->first.size(); +// hdrs[n].value = (uint8_t*)it->second.c_str(); +// hdrs[n].valuelen = it->second.size(); +// } +// +// nghttp2_data_provider data_prd; +// data_prd.source.ptr = (void*)&dataPack; +// data_prd.read_callback = server::str_read_callback; +// +// { +// TC_LockT lock(_nghttpLock); +// +// int ret = nghttp2_submit_response(_session, ptr->streamId, hdrs, rsp.header.size()+1, &data_prd); +// if (ret != 0) +// { +// cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl; +// } +// ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret)); +// +// while (nghttp2_session_want_write(_session)) { +// ret = nghttp2_session_send(_session); +// if (ret != 0) +// { +// cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl; +// } +// ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret)); +// } +// } +// +// vector out; +// swap(out); +// response.insert(response.begin(), out.begin(), out.end()); +//// +//// { +//// TC_LockT lock(_responseBufLock); +//// response.insert(response.begin(), _responseBuf.begin(), _responseBuf.end()); +//// _responseBuf.clear(); +//// } +// +// delete [] hdrs; +// { +// TC_LockT lock(reqLock_); +// _mReq.erase(ptr->streamId); +// } +// +// } +// +// return 0; +//} +// +//int TC_Http2Server::getMethod(int32_t reqid, Req_Type &method) +//{ +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(reqid); +// if (it != _mReq.end()) +// method = it->second.method; +// else +// return -1; +// +// return 0; +//} +// +//int TC_Http2Server::getUri(int32_t reqid, string &uri) +//{ +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(reqid); +// if (it != _mReq.end()) +// uri = it->second.uri; +// else +// return -1; +// +// return 0; +//} +// +//int TC_Http2Server::getHeader(int32_t reqid, TC_Http::http_header_type &header) +//{ +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(reqid); +// if (it != _mReq.end()) +// header = it->second.header; +// else +// return -1; +// +// return 0; +//} +// +//int TC_Http2Server::getBody(int32_t reqid, string &body) +//{ +// TC_LockT lock(reqLock_); +// auto it = _mReq.find(reqid); +// if (it != _mReq.end()) +// body = it->second.body; +// else +// return -1; +// +// return 0; +//} ////////////////////////////////////////////////////////////////////////////////////////