diff --git a/servant/libservant/AppProtocol.cpp b/servant/libservant/AppProtocol.cpp index 9dd6176..dca2d2e 100644 --- a/servant/libservant/AppProtocol.cpp +++ b/servant/libservant/AppProtocol.cpp @@ -18,6 +18,7 @@ #include "util/tc_http.h" #include "servant/AppProtocol.h" #include "servant/Transceiver.h" +#include "servant/AdapterProxy.h" #include "servant/TarsLogger.h" #include "tup/Tars.h" #include @@ -92,7 +93,7 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id } ssize_t len = length > body->size() ? body->size() : length; - std::memcpy(buf, &(*body)[0], len); + memcpy(buf, body->data(), len); vector::iterator end = body->begin(); std::advance(end, len); @@ -101,8 +102,10 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id return len; } -vector ProxyProtocol::http1Request(tars::RequestPacket& request, Transceiver *) +vector ProxyProtocol::http1Request(tars::RequestPacket& request, Transceiver *trans) { + request.iRequestId = trans->getAdapterProxy()->getId(); + TC_HttpRequest httpRequest; httpRequest.setRequest(request.sFuncName, request.sServantName, string(request.sBuffer.data(), request.sBuffer.size()), true); @@ -114,32 +117,51 @@ vector ProxyProtocol::http1Request(tars::RequestPacket& request, Transceiv return buffer; } +struct Http1Context +{ + string buff; + + TC_HttpResponse httpRsp; +}; + TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in, ResponsePacket& rsp) { - TC_NetWorkBuffer::PACKET_TYPE flag = in.checkHttp(); + void *contextData = in.getContextData(); - if(flag == TC_NetWorkBuffer::PACKET_FULL) + if(contextData == NULL) { - TC_HttpResponse httpRsp; - vector buffer = in.getBuffers(); + contextData = in.setContextData(new Http1Context()); + } - httpRsp.decode(buffer.data(), buffer.size()); + Http1Context *context = (Http1Context*)contextData; - // ResponsePacket rsp; - rsp.status["status"] = httpRsp.getResponseHeaderLine(); - for (const auto& kv : httpRsp.getHeaders()) + context->buff.append(in.getBuffersString()); + in.clearBuffers(); + + if(context->httpRsp.incrementDecode(context->buff)) + { + rsp.iRequestId = ((Transceiver*)(in.getConnection()))->getAdapterProxy()->getId(); + + rsp.status["status"] = context->httpRsp.getResponseHeaderLine(); + for (const auto& kv : context->httpRsp.getHeaders()) { // 响应的头部 rsp.status[kv.first] = kv.second; } - rsp.sBuffer.assign(httpRsp.getContent().begin(), httpRsp.getContent().end()); + rsp.sBuffer.assign(context->httpRsp.getContent().begin(), context->httpRsp.getContent().end()); + + delete context; + + context = NULL; + + in.setContextData(NULL); + + return TC_NetWorkBuffer::PACKET_FULL; + } - return flag; - - // done.push_back(rsp); - // return httpRsp.getHeadLength() + httpRsp.getContentLength(); + return TC_NetWorkBuffer::PACKET_LESS; } // vector encodeHttp2(RequestPacket& request, TC_NgHttp2* session) diff --git a/servant/libservant/Transceiver.cpp b/servant/libservant/Transceiver.cpp index d9a7c4e..d6f001c 100755 --- a/servant/libservant/Transceiver.cpp +++ b/servant/libservant/Transceiver.cpp @@ -278,8 +278,11 @@ void Transceiver::close() #if TARS_HTTP2 // Http2ClientSessionManager::getInstance()->delSession(_adapterProxy->getId()); - nghttp2_session_del(_http2Session->session()); - _http2Session = NULL; + if(_http2Session) + { + nghttp2_session_del(_http2Session->session()); + _http2Session = NULL; + } #endif _adapterProxy->getObjProxy()->getCommunicatorEpoll()->delFd(_fd,&_fdInfo,EPOLLIN|EPOLLOUT); @@ -489,6 +492,9 @@ int TcpTransceiver::doResponse() int iRet = 0; int recvCount = 0; + + shared_ptr rsp = std::make_shared(); + do { char buff[BUFFER_SIZE] = {0x00}; @@ -504,8 +510,6 @@ int TcpTransceiver::doResponse() TC_NetWorkBuffer::PACKET_TYPE ret; do { - shared_ptr rsp = std::make_shared(); - ret = _adapterProxy->getObjProxy()->getProxyProtocol().responseFunc(_recvBuffer, *rsp.get()); if (ret == TC_NetWorkBuffer::PACKET_ERR) { @@ -515,6 +519,8 @@ int TcpTransceiver::doResponse() } else if (ret == TC_NetWorkBuffer::PACKET_FULL) { _adapterProxy->finishInvoke(rsp); + + rsp = std::make_shared(); } else { break; diff --git a/servant/protocol b/servant/protocol index 7c22d46..2e3c1ab 160000 --- a/servant/protocol +++ b/servant/protocol @@ -1 +1 @@ -Subproject commit 7c22d46777d76c8c08c2161f7cb4d9f0f45991d7 +Subproject commit 2e3c1abd10ad801f74f7673f54f7d45b7962318f diff --git a/util/include/util/tc_network_buffer.h b/util/include/util/tc_network_buffer.h index bfa7622..53a93af 100755 --- a/util/include/util/tc_network_buffer.h +++ b/util/include/util/tc_network_buffer.h @@ -134,6 +134,18 @@ public: */ void* getConnection() { return _connection; } + /** + * 设置上下文数据, 可以业务存放数据 + * @param buff + */ + void* setContextData(void *contextData) { _contextData = contextData; return _contextData; } + + /** + * 获取上下文数据, 给业务存放数据 + * @param buff + */ + void *getContextData() { return _contextData; } + /** * 增加buffer * @param buff @@ -181,6 +193,12 @@ public: */ vector getBuffers() const; + /** + * 返回所有buffer(将所有buffer拼接起来, 注意性能) + * @return string + */ + string getBuffersString() const; + /** * 读取len字节的buffer(避免len个字节被分割到多个buffer的情况)(注意: 不往后移动) * @param len @@ -371,6 +389,11 @@ protected: */ void* _connection = NULL; + /** + * contextData for use + */ + void* _contextData = NULL; + /** * buffer list */ diff --git a/util/src/tc_network_buffer.cpp b/util/src/tc_network_buffer.cpp index a0ab7fa..a7729a1 100755 --- a/util/src/tc_network_buffer.cpp +++ b/util/src/tc_network_buffer.cpp @@ -58,6 +58,32 @@ pair TC_NetWorkBuffer::getBufferPointer() const return make_pair(it->data() + _pos, it->size() - _pos); } +string TC_NetWorkBuffer::getBuffersString() const +{ + string buffer; + buffer.resize(_length); + + auto it = _bufferList.begin(); + + size_t pos = 0; + while(it != _bufferList.end()) + { + if(it == _bufferList.begin()) + { + memcpy(&buffer[pos], it->data() + _pos, it->size() - _pos); + pos += it->size() - _pos; + } + else + { + memcpy(&buffer[pos], it->data(), it->size()); + pos += it->size(); + } + ++it; + } + + return buffer; +} + vector TC_NetWorkBuffer::getBuffers() const { vector buffer; @@ -293,6 +319,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::checkHttp() } catch (exception &ex) { + cout << ex.what() << endl; return PACKET_ERR; } diff --git a/util/src/tc_nghttp2.cpp b/util/src/tc_nghttp2.cpp index f5040df..de51903 100644 --- a/util/src/tc_nghttp2.cpp +++ b/util/src/tc_nghttp2.cpp @@ -216,7 +216,7 @@ int TC_NgHttp2::settings(unsigned int maxCurrentStreams) void TC_NgHttp2::onNegotiateDone(bool succ) { - assert (_state == Negotiating); + // assert (_state == Negotiating); _state = succ ? Http2: Http1; if (succ) {