diff --git a/CMakeLists.txt b/CMakeLists.txt index 44248bd..b0160c7 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,7 +26,7 @@ set(TARS_VERSION "2.0.0") add_definitions(-DTARS_VERSION="${TARS_VERSION}") set(TARS_SSL 0) add_definitions(-DTARS_SSL=${TARS_SSL}) -set(TARS_HTTP2 0) +set(TARS_HTTP2 1) add_definitions(-DTARS_HTTP2=${TARS_HTTP2}) set(_USE_OPENTRACKING $ENV{_USE_OPENTRACKING}) diff --git a/examples/CustomDemo/CustomClient/main.cpp b/examples/CustomDemo/CustomClient/main.cpp index a653346..df20569 100644 --- a/examples/CustomDemo/CustomClient/main.cpp +++ b/examples/CustomDemo/CustomClient/main.cpp @@ -55,7 +55,7 @@ static TC_NetWorkBuffer::PACKET_TYPE customResponse(TC_NetWorkBuffer &in, Respon /* Whole package length (4 bytes) + irequestid (4 bytes) + package content */ -static vector customRequest(const RequestPacket& request) +static vector customRequest(RequestPacket& request, Transceiver *) { unsigned int net_bufflength = htonl(request.sBuffer.size()+8); unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength); diff --git a/examples/HttpDemo/HttpClient/main.cpp b/examples/HttpDemo/HttpClient/main.cpp index 2a1f8f2..b714784 100644 --- a/examples/HttpDemo/HttpClient/main.cpp +++ b/examples/HttpDemo/HttpClient/main.cpp @@ -153,7 +153,8 @@ void syncRpc(int c) int64_t t = TC_Common::now2us(); std::map header; - header["X-Test"] = "YYYY"; + // header[":authority"] = "domain.com"; + // header[":scheme"] = "http"; std::map rheader; //发起远程调用 @@ -239,8 +240,11 @@ int main(int argc, char *argv[]) param.servantPrx->tars_async_timeout(60*1000); ProxyProtocol proto; - proto.requestFunc = tars::http2Request; - proto.responseFunc = tars::http2Response; + proto.requestFunc = ProxyProtocol::http1Request; + proto.responseFunc = ProxyProtocol::http1Response; + // proto.requestFunc = ProxyProtocol::http2Request; + // proto.responseFunc = ProxyProtocol::http2Response; + param.servantPrx->tars_set_protocol(proto); int64_t start = TC_Common::now2us(); diff --git a/examples/PushDemo/PushClient/TestRecvThread.cpp b/examples/PushDemo/PushClient/TestRecvThread.cpp index 517e3f5..8d7f434 100755 --- a/examples/PushDemo/PushClient/TestRecvThread.cpp +++ b/examples/PushDemo/PushClient/TestRecvThread.cpp @@ -55,7 +55,7 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response 请求包编码函数,本函数的打包格式为 整个包长度(4字节)+iRequestId(4字节)+包内容 */ -static vector pushRequest(const RequestPacket& request) +static vector pushRequest(RequestPacket& request, Transceiver *) { unsigned int net_bufflength = htonl(request.sBuffer.size()+8); unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength); diff --git a/servant/libservant/AdapterProxy.cpp b/servant/libservant/AdapterProxy.cpp index 8b82b7a..39c517d 100755 --- a/servant/libservant/AdapterProxy.cpp +++ b/servant/libservant/AdapterProxy.cpp @@ -25,7 +25,7 @@ #include "servant/StatF.h" #include "servant/StatReport.h" #include "util/tc_nghttp2.h" -#include "util/tc_http2clientmgr.h" +// #include "util/tc_http2clientmgr.h" #ifdef _USE_OPENTRACKING #include "servant/text_map_carrier.h" #endif @@ -158,18 +158,18 @@ int AdapterProxy::invoke(ReqMessage * msg) msg->request.iRequestId = _timeoutQueue->generateId(); } -#if TARS_HTTP2 - if (getObjProxy()->getProtoName() == HTTP2) - { - msg->request.iRequestId = getId(); // session Id - } -#endif +// #if TARS_HTTP2 +// if (getObjProxy()->getProtoName() == HTTP2) +// { +// msg->request.iRequestId = getId(); // session Id +// } +// #endif #ifdef _USE_OPENTRACKING startTrack(msg); #endif - msg->sReqData->addBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request)); + msg->sReqData->addBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get())); //交给连接发送数据,连接连上,buffer不为空,直接发送数据成功 if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError) diff --git a/servant/libservant/AppProtocol.cpp b/servant/libservant/AppProtocol.cpp index 171272c..9dd6176 100644 --- a/servant/libservant/AppProtocol.cpp +++ b/servant/libservant/AppProtocol.cpp @@ -15,27 +15,16 @@ */ #include "util/tc_epoll_server.h" +#include "util/tc_http.h" #include "servant/AppProtocol.h" +#include "servant/Transceiver.h" +#include "servant/TarsLogger.h" #include "tup/Tars.h" #include #if TARS_HTTP2 #include "util/tc_nghttp2.h" -#include "util/tc_http2clientmgr.h" - -#define MAKE_NV(NAME, VALUE, VALUELEN) \ - { \ - (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, VALUELEN, \ - NGHTTP2_NV_FLAG_NONE \ - } - -#define MAKE_NV2(NAME, VALUE) \ - { \ - (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \ - NGHTTP2_NV_FLAG_NONE \ - } - -#define MAKE_STRING_NV(NAME, VALUE) {(uint8_t*)(NAME.data()), (uint8_t*)(VALUE.data()), NAME.size(), VALUE.size(), NGHTTP2_NV_FLAG_NONE}; +// #include "util/tc_http2clientmgr.h" #endif namespace tars @@ -47,7 +36,7 @@ TC_NetWorkBuffer::PACKET_TYPE AppProtocol::parseAdmin(TC_NetWorkBuffer &in, sha return parse(in, out->getBuffer()); } -vector ProxyProtocol::tarsRequest(const RequestPacket& request) +vector ProxyProtocol::tarsRequest(RequestPacket& request, Transceiver *) { TarsOutputStream os; @@ -73,6 +62,21 @@ vector ProxyProtocol::tarsRequest(const RequestPacket& request) //////////////////////////////////////////////////////////////////////////////////// #if TARS_HTTP2 + +#define MAKE_NV(NAME, VALUE, VALUELEN) \ + { \ + (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, VALUELEN, \ + NGHTTP2_NV_FLAG_NONE \ + } + +#define MAKE_NV2(NAME, VALUE) \ + { \ + (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \ + NGHTTP2_NV_FLAG_NONE \ + } + +#define MAKE_STRING_NV(NAME, VALUE) {(uint8_t*)(NAME.data()), (uint8_t*)(VALUE.data()), NAME.size(), VALUE.size(), NGHTTP2_NV_FLAG_NONE}; + // nghttp2读取请求包体,准备发送 static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, @@ -97,14 +101,29 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id return len; } -TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, tars::ResponsePacket& rsp) +vector ProxyProtocol::http1Request(tars::RequestPacket& request, Transceiver *) +{ + TC_HttpRequest httpRequest; + + httpRequest.setRequest(request.sFuncName, request.sServantName, string(request.sBuffer.data(), request.sBuffer.size()), true); + + vector buffer; + + httpRequest.encode(buffer); + + return buffer; +} + +TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in, ResponsePacket& rsp) { TC_NetWorkBuffer::PACKET_TYPE flag = in.checkHttp(); if(flag == TC_NetWorkBuffer::PACKET_FULL) { - tars::TC_HttpResponse httpRsp; - httpRsp.decode(in.getBuffers()); + TC_HttpResponse httpRsp; + vector buffer = in.getBuffers(); + + httpRsp.decode(buffer.data(), buffer.size()); // ResponsePacket rsp; rsp.status["status"] = httpRsp.getResponseHeaderLine(); @@ -123,8 +142,87 @@ TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, tars::Response // return httpRsp.getHeadLength() + httpRsp.getContentLength(); } -vector encodeHttp2(RequestPacket& request, TC_NgHttp2* session) +// vector encodeHttp2(RequestPacket& request, TC_NgHttp2* session) +// { +// std::vector nva; + +// const std::string method(":method"); +// nghttp2_nv nv1 = MAKE_STRING_NV(method, request.sFuncName); +// if (!request.sFuncName.empty()) +// nva.push_back(nv1); + +// const std::string path(":path"); +// nghttp2_nv nv2 = MAKE_STRING_NV(path, request.sServantName); +// if (!request.sServantName.empty()) +// nva.push_back(nv2); + +// for (std::map::const_iterator +// it(request.context.begin()); +// it != request.context.end(); +// ++ it) +// { +// nghttp2_nv nv = MAKE_STRING_NV(it->first, it->second); +// nva.push_back(nv); +// } + +// nghttp2_data_provider* pData = NULL; +// nghttp2_data_provider data; +// if (!request.sBuffer.empty()) +// { +// pData = &data; +// data.source.ptr = (void*)&request.sBuffer; +// data.read_callback = reqbody_read_callback; +// } + +// int32_t sid = nghttp2_submit_request(session->session(), +// NULL, +// &nva[0], +// nva.size(), +// pData, +// NULL); +// if (sid < 0) +// { +// TLOGERROR("encodeHttp2::Fatal error: nghttp2_submit_request return: " << sid << endl); +// return vector(); +// } + +// request.iRequestId = sid; +// nghttp2_session_send(session->session()); + +// // 交给tars发送 +// // std::string out; +// // out.swap(session->sendBuffer()); +// // return out; + +// vector out; + +// out.assign(session->sendBuffer().begin(), session->sendBuffer().end()); + +// return out; +// } + +// ENCODE function, called by network thread +vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans) { + cout << "http2Request" << endl; + // TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId); + TC_NgHttp2* session = trans->getHttp2Session(); + + assert(session != NULL); + + cout << "http2Request:" << session << endl; + + if (session->getState() == TC_NgHttp2::None) + { + session->Init(); + session->settings(); + } + + assert (session->getState() == TC_NgHttp2::Http2); + + // return encodeHttp2(request, session); + cout << "http2Request1" << endl; + std::vector nva; const std::string method(":method"); @@ -146,6 +244,7 @@ vector encodeHttp2(RequestPacket& request, TC_NgHttp2* session) nva.push_back(nv); } + cout << "http2Request2" << endl; nghttp2_data_provider* pData = NULL; nghttp2_data_provider data; if (!request.sBuffer.empty()) @@ -163,10 +262,11 @@ vector encodeHttp2(RequestPacket& request, TC_NgHttp2* session) NULL); if (sid < 0) { - cerr << "Fatal error: nghttp2_submit_request return " << sid << endl; - return ""; + TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_submit_request return: " << sid << endl); + return vector(); } + cout << "http2Request3" << endl; request.iRequestId = sid; nghttp2_session_send(session->session()); @@ -179,50 +279,39 @@ vector encodeHttp2(RequestPacket& request, TC_NgHttp2* session) out.assign(session->sendBuffer().begin(), session->sendBuffer().end()); + cout << "http2Request4:" << out.data() << endl; return out; } -// ENCODE function, called by network thread -vector http2Request(const RequestPacket& request) +// TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, list& done, void* userptr) +TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp) { - TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId); - if (session->getState() == TC_NgHttp2::None) - { - session->Init(); - session->settings(); - } + TC_NgHttp2* session = ((Transceiver*)(in.getConnection()))->getHttp2Session(); assert (session->getState() == TC_NgHttp2::Http2); - return encodeHttp2(request, session); -} + auto it = session->doneResponses().begin(); -// TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, list& done, void* userptr) -TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done) -{ - auto it = session->_doneResponses.begin(); - - if(it == session->_doneResponses.end()) + if(it == session->doneResponses().end()) { vector buffer = in.getBuffers(); in.clearBuffers(); - Transceiver* userptr = ((Transceiver*))in->getConnection(); - int sessionId = userptr->getAdapterProxy()->getId(); - TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(sessionId); - assert (session->getState() == TC_NgHttp2::Http2); + // Transceiver* userptr = ((Transceiver*))in->getConnection(); + // int sessionId = userptr->getAdapterProxy()->getId(); + // TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(sessionId); - int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.length()); + int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.size()); if (readlen < 0) { // throw std::runtime_error("nghttp2_session_mem_recv return error"); - return TC_NetWorkBuffer::PACKET_ERROR; + return TC_NetWorkBuffer::PACKET_ERR; } } - it = session->_doneResponses.begin(); - if(it == session->_doneResponses.end()) + it = session->doneResponses().begin(); + if(it == session->doneResponses().end()) { return TC_NetWorkBuffer::PACKET_LESS; } @@ -231,7 +320,7 @@ TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket rsp.status = it->second.headers; rsp.sBuffer.assign(it->second.body.begin(), it->second.body.end()); - session->_doneResponses.erase(it); + session->doneResponses().erase(it); // std::map::const_iterator it(session->_doneResponses.begin()); // for (; it != session->_doneResponses.end(); ++ it) diff --git a/servant/libservant/Transceiver.cpp b/servant/libservant/Transceiver.cpp index 5f3b321..d9a7c4e 100755 --- a/servant/libservant/Transceiver.cpp +++ b/servant/libservant/Transceiver.cpp @@ -28,7 +28,7 @@ #if TARS_HTTP2 #include "util/tc_nghttp2.h" -#include "util/tc_http2clientmgr.h" +// #include "util/tc_http2clientmgr.h" #endif namespace tars { @@ -163,6 +163,7 @@ void Transceiver::setConnected() _onConnect(); + TLOGTARS("[TARS][tcp setConnected, " << _adapterProxy->getObjProxy()->name() << ",fd:" << _fd << "]" << endl); } @@ -248,7 +249,7 @@ bool Transceiver::sendAuthData(const BasicAuthInfo& info) request.sBuffer.assign(out.begin(), out.end()); // vector toSend; - _sendBuffer->addBuffer(objPrx->getProxyProtocol().requestFunc(request)); + _sendBuffer->addBuffer(objPrx->getProxyProtocol().requestFunc(request, this)); // _sendBuffer.addBuffer(toSend); @@ -276,7 +277,9 @@ void Transceiver::close() #endif #if TARS_HTTP2 - Http2ClientSessionManager::getInstance()->delSession(_adapterProxy->getId()); + // Http2ClientSessionManager::getInstance()->delSession(_adapterProxy->getId()); + nghttp2_session_del(_http2Session->session()); + _http2Session = NULL; #endif _adapterProxy->getObjProxy()->getCommunicatorEpoll()->delFd(_fd,&_fdInfo,EPOLLIN|EPOLLOUT); @@ -300,6 +303,24 @@ void Transceiver::close() TLOGTARS("[TARS][trans close:"<< _adapterProxy->getObjProxy()->name()<< "," << _ep.desc() << "]" << endl); } +#if TARS_HTTP2 +TC_NgHttp2* Transceiver::getHttp2Session() +{ + if(_http2Session == NULL) + { + _http2Session = new TC_NgHttp2(false); + + // if (_http2Session->getState() == TC_NgHttp2::None) + // { + // _http2Session->Init(); + // _http2Session->settings(); + // } + } + + return _http2Session; +} + +#endif // int Transceiver::doRequest() // { // if(!isValid()) diff --git a/servant/servant/AppProtocol.h b/servant/servant/AppProtocol.h index 5883bd9..248e7e1 100644 --- a/servant/servant/AppProtocol.h +++ b/servant/servant/AppProtocol.h @@ -33,6 +33,8 @@ using namespace tup; namespace tars { +class Transceiver; + #define TARS_NET_MIN_PACKAGE_SIZE 5 #define TARS_NET_MAX_PACKAGE_SIZE 1024*1024*10 @@ -131,13 +133,8 @@ public: } }; -typedef std::function(const RequestPacket&)> request_protocol; -/** - * 接收协议处理, 返回值表示解析了多少字节 - * 框架层会自动对处理了包做处理 - */ +typedef std::function(RequestPacket&, Transceiver *)> request_protocol; typedef std::function response_protocol; -// typedef std::function response_ex_protocol; ////////////////////////////////////////////////////////////////////// /** @@ -151,12 +148,23 @@ public: */ ProxyProtocol() : requestFunc(streamRequest) {} +#if TARS_HTTP2 + static vector http1Request(tars::RequestPacket& request, Transceiver *); + static TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, ResponsePacket& done); + + // ENCODE function, called by network thread + static vector http2Request(tars::RequestPacket& request, Transceiver *); + + // DECODE function, called by network thread + static TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done); +#endif + /** * 普通二进制请求包 * @param request * @param buff */ - static const vector &streamRequest(const RequestPacket& request) + static vector streamRequest(RequestPacket& request, Transceiver *) { return request.sBuffer; } @@ -245,12 +253,6 @@ public: return streamResponse(in, done); } - /** - * tup响应包(tup的响应会放在ResponsePacket的buffer中) - * @param request - * @param buff - */ - /** * wup响应包(wup的响应会放在ResponsePacket的buffer中) * @param request @@ -294,8 +296,6 @@ public: is.setBuffer(buffer.c_str() + sizeof(tars::Int32), head); - // is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), head); - //tup回来是requestpackage RequestPacket rsp; @@ -363,7 +363,7 @@ public: * @param request * @param buff */ - static vector tarsRequest(const RequestPacket& request); + static vector tarsRequest(RequestPacket& request, Transceiver *); /** * tars响应包解析 @@ -471,19 +471,8 @@ public: request_protocol requestFunc; response_protocol responseFunc; - - // response_ex_protocol responseExFunc; }; -vector http1Request(const tars::RequestPacket& request); -TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, ResponsePacket& done); - -// ENCODE function, called by network thread -vector http2Request(const tars::RequestPacket& request); - -// DECODE function, called by network thread -TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done); - ////////////////////////////////////////////////////////////////////// } diff --git a/servant/servant/Transceiver.h b/servant/servant/Transceiver.h index 5f8d2ad..cfbcaa8 100755 --- a/servant/servant/Transceiver.h +++ b/servant/servant/Transceiver.h @@ -33,6 +33,10 @@ namespace tars class TC_OpenSSL; #endif +#if TARS_HTTP2 + class TC_NgHttp2; +#endif + class AdapterProxy; ////////////////////////////////////////////////////////// @@ -221,6 +225,10 @@ public: * 发送鉴权数据 */ bool sendAuthData(const BasicAuthInfo& ); + +#if TARS_HTTP2 + TC_NgHttp2* getHttp2Session(); +#endif protected: /** ** 物理连接成功回调 @@ -272,6 +280,9 @@ protected: std::unique_ptr _openssl; #endif +#if TARS_HTTP2 + TC_NgHttp2* _http2Session = NULL; +#endif /* * 发送buffer */ diff --git a/util/include/util/tc_consistent_hash_new.h b/util/include/util/tc_consistent_hash_new.h index 573797c..95b71b6 100755 --- a/util/include/util/tc_consistent_hash_new.h +++ b/util/include/util/tc_consistent_hash_new.h @@ -26,19 +26,6 @@ using namespace tars; namespace tars { -struct node_T_new -{ - /** - *节点hash值 - */ - int32_t iHashCode; - - /** - *节点下标 - */ - unsigned int iIndex; -}; - enum TC_HashAlgorithmType { E_TC_CONHASH_KETAMAHASH = 0, @@ -55,10 +42,7 @@ public: virtual TC_HashAlgorithmType getHashType() = 0; protected: - int32_t subTo32Bit(int32_t hash) - { - return (hash & 0xFFFFFFFFL); - } + int32_t subTo32Bit(int32_t hash) { return (hash & 0xFFFFFFFFL); } }; @@ -70,23 +54,8 @@ typedef TC_AutoPtr TC_HashAlgorithmPtr; class TC_KetamaHashAlg : public TC_HashAlgorithm { public: - virtual int32_t hash(const string & sKey) - { - string sMd5 = TC_MD5::md5bin(sKey); - const char *p = (const char *) sMd5.c_str(); - - int32_t hash = ((int32_t)(p[3] & 0xFF) << 24) - | ((int32_t)(p[2] & 0xFF) << 16) - | ((int32_t)(p[1] & 0xFF) << 8) - | ((int32_t)(p[0] & 0xFF)); - - return subTo32Bit(hash); - } - - virtual TC_HashAlgorithmType getHashType() - { - return E_TC_CONHASH_KETAMAHASH; - } + virtual int32_t hash(const string & sKey); + virtual TC_HashAlgorithmType getHashType(); }; /** @@ -95,20 +64,8 @@ public: class TC_DefaultHashAlg : public TC_HashAlgorithm { public: - virtual int32_t hash(const string & sKey) - { - string sMd5 = TC_MD5::md5bin(sKey); - const char *p = (const char *) sMd5.c_str(); - - int32_t hash = (*(int*)(p)) ^ (*(int*)(p+4)) ^ (*(int*)(p+8)) ^ (*(int*)(p+12)); - - return subTo32Bit(hash); - } - - virtual TC_HashAlgorithmType getHashType() - { - return E_TC_CONHASH_DEFAULTHASH; - } + virtual int32_t hash(const string & sKey); + virtual TC_HashAlgorithmType getHashType(); }; /** @@ -117,34 +74,7 @@ public: class TC_HashAlgFactory { public: - static TC_HashAlgorithm *getHashAlg() - { - TC_HashAlgorithm *ptrHashAlg = new TC_DefaultHashAlg(); - - return ptrHashAlg; - } - - static TC_HashAlgorithm *getHashAlg(TC_HashAlgorithmType hashType) - { - TC_HashAlgorithm *ptrHashAlg = NULL; - - switch(hashType) - { - case E_TC_CONHASH_KETAMAHASH: - { - ptrHashAlg = new TC_KetamaHashAlg(); - break; - } - case E_TC_CONHASH_DEFAULTHASH: - default: - { - ptrHashAlg = new TC_DefaultHashAlg(); - break; - } - } - - return ptrHashAlg; - } + static TC_HashAlgorithm *getHashAlg(TC_HashAlgorithmType hashType); }; /** @@ -154,93 +84,42 @@ class TC_ConsistentHashNew { public: - /** - * @brief 构造函数 - */ - TC_ConsistentHashNew() + struct node_T_new { - _ptrHashAlg = TC_HashAlgFactory::getHashAlg(); - } + /** + *节点hash值 + */ + int32_t iHashCode; + + /** + *节点下标 + */ + unsigned int iIndex; + }; /** * @brief 构造函数 */ - TC_ConsistentHashNew(TC_HashAlgorithmType hashType) - { - _ptrHashAlg = TC_HashAlgFactory::getHashAlg(hashType); - } + TC_ConsistentHashNew(); /** - * @brief 节点比较. - * - * @param m1 node_T_new类型的对象,比较节点之一 - * @param m2 node_T_new类型的对象,比较节点之一 - * @return less or not 比较结果,less返回ture,否则返回false + * @brief 构造函数 */ - static bool less_hash(const node_T_new & m1, const node_T_new & m2) - { - return m1.iHashCode < m2.iHashCode; - } + TC_ConsistentHashNew(TC_HashAlgorithmType hashType); /** - * @brief 增加节点. + * @brief 排序 * * @param node 节点名称 * @param index 节点的下标值 - * @return 节点的hash值 */ - int sortNode() - { - sort(_vHashList.begin(), _vHashList.end(), less_hash); - - return 0; - } + void sortNode(); /** * @brief 打印节点信息 * */ - void printNode() - { - map mapNode; - size_t size = _vHashList.size(); - - for (size_t i = 0; i < size; i++) - { - if (i == 0) - { - unsigned int value = 0xFFFFFFFF - _vHashList[size - 1].iHashCode + _vHashList[0].iHashCode; - mapNode[_vHashList[0].iIndex] = value; - } - else - { - unsigned int value = _vHashList[i].iHashCode - _vHashList[i - 1].iHashCode; - - if (mapNode.find(_vHashList[i].iIndex) != mapNode.end()) - { - value += mapNode[_vHashList[i].iIndex]; - } - - mapNode[_vHashList[i].iIndex] = value; - } - - cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl; - } - - map::iterator it = mapNode.begin(); - double avg = 100; - double sum = 0; - - while (it != mapNode.end()) - { - double tmp = it->second; - cerr << "result: " << it->first << "|" << it->second << "|" << (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) << endl; - sum += (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) * (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg); - it++; - } - - cerr << "variance: " << sum / mapNode.size() << ", size: " << _vHashList.size() << endl; - } + void printNode(); /** * @brief 增加节点. @@ -250,46 +129,7 @@ public: * @param weight 节点的权重,默认为1 * @return 是否成功 */ - int addNode(const string & node, unsigned int index, int weight = 1) - { - if (_ptrHashAlg.get() == NULL) - { - return -1; - } - - node_T_new stItem; - stItem.iIndex = index; - - for (int j = 0; j < weight; j++) - { - string virtualNode = node + "_" + TC_Common::tostr(j); - - // TODO: 目前写了2 种hash 算法,可以根据需要选择一种, - // TODO: 其中KEMATA 为参考memcached client 的hash 算法,default 为原有的hash 算法,测试结论在表格里有 - if (_ptrHashAlg->getHashType() == E_TC_CONHASH_KETAMAHASH) - { - string sMd5 = TC_MD5::md5bin(virtualNode); - char *p = (char *) sMd5.c_str(); - - for (int i = 0; i < 4; i++) - { - stItem.iHashCode = ((int32_t)(p[i * 4 + 3] & 0xFF) << 24) - | ((int32_t)(p[i * 4 + 2] & 0xFF) << 16) - | ((int32_t)(p[i * 4 + 1] & 0xFF) << 8) - | ((int32_t)(p[i * 4 + 0] & 0xFF)); - stItem.iIndex = index; - _vHashList.push_back(stItem); - } - } - else - { - stItem.iHashCode = _ptrHashAlg->hash(virtualNode); - _vHashList.push_back(stItem); - } - } - - return 0; - } + int addNode(const string & node, unsigned int index, int weight = 1); /** * @brief 获取某key对应到的节点node的下标. @@ -298,18 +138,7 @@ public: * @param iIndex 对应到的节点下标 * @return 0:获取成功 -1:没有被添加的节点 */ - int getIndex(const string & key, unsigned int & iIndex) - { - if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0) - { - iIndex = 0; - return -1; - } - - int32_t iCode = _ptrHashAlg->hash(TC_MD5::md5bin(key)); - - return getIndex(iCode, iIndex); - } + int getIndex(const string & key, unsigned int & iIndex); /** * @brief 获取某hashcode对应到的节点node的下标. @@ -318,60 +147,20 @@ public: * @param iIndex 对应到的节点下标 * @return 0:获取成功 -1:没有被添加的节点 */ - int getIndex(int32_t hashcode, unsigned int & iIndex) - { - if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0) - { - iIndex = 0; - return -1; - } - - // 只保留32位 - size_t iCode = (hashcode & 0xFFFFFFFFL); - - int low = 0; - size_t high = _vHashList.size(); - - if(iCode <= _vHashList[0].iHashCode || iCode > _vHashList[high-1].iHashCode) - { - iIndex = _vHashList[0].iIndex; - return 0; - } - - while (low < high - 1) - { - int mid = (low + high) / 2; - if (_vHashList[mid].iHashCode > iCode) - { - high = mid; - } - else - { - low = mid; - } - } - iIndex = _vHashList[low+1].iIndex; - return 0; - } + int getIndex(int32_t hashcode, unsigned int & iIndex); /** * @brief 获取当前hash列表的长度. * * @return 长度值 */ - size_t size() - { - return _vHashList.size(); - } + size_t size() { return _vHashList.size(); } /** * @brief 清空当前的hash列表. * */ - void clear() - { - _vHashList.clear(); - } + void clear() { _vHashList.clear(); } protected: vector _vHashList; diff --git a/util/include/util/tc_nghttp2.h b/util/include/util/tc_nghttp2.h index fedc82f..055fc37 100644 --- a/util/include/util/tc_nghttp2.h +++ b/util/include/util/tc_nghttp2.h @@ -27,8 +27,8 @@ namespace tars { -struct RequestPacket; -struct ResponsePacket; +// struct RequestPacket; +// struct ResponsePacket; enum ResponseState { @@ -154,6 +154,10 @@ public: */ nghttp2_session* session() const; + /** + * @brief 响应包 + */ + std::map &doneResponses() { return _doneResponses; } private: /** * session diff --git a/util/src/tc_consistent_hash_new.cpp b/util/src/tc_consistent_hash_new.cpp new file mode 100755 index 0000000..6211395 --- /dev/null +++ b/util/src/tc_consistent_hash_new.cpp @@ -0,0 +1,241 @@ +/** + * Tencent is pleased to support the open source community by making Tars available. + * + * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +#include "util/tc_consistent_hash_new.h" +// #include "util/tc_autoptr.h" +// #include "util/tc_hash_fun.h" + +using namespace tars; + +namespace tars +{ + +int32_t TC_KetamaHashAlg::hash(const string & sKey) +{ + string sMd5 = TC_MD5::md5bin(sKey); + const char *p = (const char *) sMd5.c_str(); + + int32_t hash = ((int32_t)(p[3] & 0xFF) << 24) + | ((int32_t)(p[2] & 0xFF) << 16) + | ((int32_t)(p[1] & 0xFF) << 8) + | ((int32_t)(p[0] & 0xFF)); + + return subTo32Bit(hash); +} + +TC_HashAlgorithmType TC_KetamaHashAlg::getHashType() +{ + return E_TC_CONHASH_KETAMAHASH; +} + +int32_t TC_DefaultHashAlg::hash(const string & sKey) +{ + string sMd5 = TC_MD5::md5bin(sKey); + const char *p = (const char *) sMd5.c_str(); + + int32_t hash = (*(int*)(p)) ^ (*(int*)(p+4)) ^ (*(int*)(p+8)) ^ (*(int*)(p+12)); + + return subTo32Bit(hash); +} + +TC_HashAlgorithmType TC_DefaultHashAlg::getHashType() +{ + return E_TC_CONHASH_DEFAULTHASH; +} + +TC_HashAlgorithm *TC_HashAlgFactory::getHashAlg(TC_HashAlgorithmType hashType) +{ + TC_HashAlgorithm *ptrHashAlg = NULL; + + switch(hashType) + { + case E_TC_CONHASH_KETAMAHASH: + { + ptrHashAlg = new TC_KetamaHashAlg(); + break; + } + case E_TC_CONHASH_DEFAULTHASH: + default: + { + ptrHashAlg = new TC_DefaultHashAlg(); + break; + } + } + + return ptrHashAlg; +} + +TC_ConsistentHashNew::TC_ConsistentHashNew() +{ + _ptrHashAlg = TC_HashAlgFactory::getHashAlg(E_TC_CONHASH_DEFAULTHASH); +} + +TC_ConsistentHashNew::TC_ConsistentHashNew(TC_HashAlgorithmType hashType) +{ + _ptrHashAlg = TC_HashAlgFactory::getHashAlg(hashType); +} + +/** + * @brief 节点比较. + * + * @param m1 node_T_new类型的对象,比较节点之一 + * @param m2 node_T_new类型的对象,比较节点之一 + * @return less or not 比较结果,less返回ture,否则返回false + */ +static bool less_hash(const TC_ConsistentHashNew::node_T_new & m1, const TC_ConsistentHashNew::node_T_new & m2) +{ + return m1.iHashCode < m2.iHashCode; +} + +void TC_ConsistentHashNew::sortNode() +{ + sort(_vHashList.begin(), _vHashList.end(), less_hash); +} + +void TC_ConsistentHashNew::printNode() +{ + map mapNode; + size_t size = _vHashList.size(); + + for (size_t i = 0; i < size; i++) + { + if (i == 0) + { + unsigned int value = 0xFFFFFFFF - _vHashList[size - 1].iHashCode + _vHashList[0].iHashCode; + mapNode[_vHashList[0].iIndex] = value; + } + else + { + unsigned int value = _vHashList[i].iHashCode - _vHashList[i - 1].iHashCode; + + if (mapNode.find(_vHashList[i].iIndex) != mapNode.end()) + { + value += mapNode[_vHashList[i].iIndex]; + } + + mapNode[_vHashList[i].iIndex] = value; + } + + cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl; + } + + map::iterator it = mapNode.begin(); + double avg = 100; + double sum = 0; + + while (it != mapNode.end()) + { + double tmp = it->second; + cerr << "result: " << it->first << "|" << it->second << "|" << (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) << endl; + sum += (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) * (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg); + it++; + } + + cerr << "variance: " << sum / mapNode.size() << ", size: " << _vHashList.size() << endl; +} + +int TC_ConsistentHashNew::addNode(const string & node, unsigned int index, int weight) +{ + if (_ptrHashAlg.get() == NULL) + { + return -1; + } + + node_T_new stItem; + stItem.iIndex = index; + + for (int j = 0; j < weight; j++) + { + string virtualNode = node + "_" + TC_Common::tostr(j); + + // TODO: 目前写了2 种hash 算法,可以根据需要选择一种, + // TODO: 其中KEMATA 为参考memcached client 的hash 算法,default 为原有的hash 算法,测试结论在表格里有 + if (_ptrHashAlg->getHashType() == E_TC_CONHASH_KETAMAHASH) + { + string sMd5 = TC_MD5::md5bin(virtualNode); + char *p = (char *) sMd5.c_str(); + + for (int i = 0; i < 4; i++) + { + stItem.iHashCode = ((int32_t)(p[i * 4 + 3] & 0xFF) << 24) + | ((int32_t)(p[i * 4 + 2] & 0xFF) << 16) + | ((int32_t)(p[i * 4 + 1] & 0xFF) << 8) + | ((int32_t)(p[i * 4 + 0] & 0xFF)); + stItem.iIndex = index; + _vHashList.push_back(stItem); + } + } + else + { + stItem.iHashCode = _ptrHashAlg->hash(virtualNode); + _vHashList.push_back(stItem); + } + } + + return 0; +} + +int TC_ConsistentHashNew::getIndex(const string & key, unsigned int & iIndex) +{ + if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0) + { + iIndex = 0; + return -1; + } + + int32_t iCode = _ptrHashAlg->hash(TC_MD5::md5bin(key)); + + return getIndex(iCode, iIndex); +} + + +int TC_ConsistentHashNew::getIndex(int32_t hashcode, unsigned int & iIndex) +{ + if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0) + { + iIndex = 0; + return -1; + } + + // 只保留32位 + size_t iCode = (hashcode & 0xFFFFFFFFL); + + int low = 0; + size_t high = _vHashList.size(); + + if(iCode <= _vHashList[0].iHashCode || iCode > _vHashList[high-1].iHashCode) + { + iIndex = _vHashList[0].iIndex; + return 0; + } + + while (low < high - 1) + { + int mid = (low + high) / 2; + if (_vHashList[mid].iHashCode > iCode) + { + high = mid; + } + else + { + low = mid; + } + } + iIndex = _vHashList[low+1].iIndex; + return 0; +} + +} diff --git a/util/src/tc_http.cpp b/util/src/tc_http.cpp index a921315..3ef25ad 100755 --- a/util/src/tc_http.cpp +++ b/util/src/tc_http.cpp @@ -1532,20 +1532,20 @@ void TC_HttpRequest::encode(int iRequestType, ostream &os) void TC_HttpRequest::setRequest(const string& method, const string &sUrl, const std::string& body, bool bNewCreateHost) { - std::string lowMethod(method); - std::transform(method.begin(), method.end(), lowMethod.begin(), ::tolower); + // std::string lowMethod(method); + // std::transform(method.begin(), method.end(), lowMethod.begin(), ::tolower); - if (lowMethod == "get") + if (TC_Port::strncasecmp(method.c_str(), "GET", 3) == 0) setGetRequest(sUrl, bNewCreateHost); - else if (lowMethod == "head") + else if (TC_Port::strncasecmp(method.c_str(), "HEAD", 4) == 0) setHeadRequest(sUrl, bNewCreateHost); - else if (lowMethod == "post") + else if (TC_Port::strncasecmp(method.c_str(), "POST", 4) == 0) setPostRequest(sUrl, body, bNewCreateHost); - else if (lowMethod == "put") + else if (TC_Port::strncasecmp(method.c_str(), "PUT", 3) == 0) setPutRequest(sUrl, body, bNewCreateHost); - else if (lowMethod == "delete") + else if (TC_Port::strncasecmp(method.c_str(), "DELETE", 6) == 0) setDeleteRequest(sUrl, body, bNewCreateHost); - else if (lowMethod == "patch") + else if (TC_Port::strncasecmp(method.c_str(), "PATH", 5) == 0) setPatchRequest(sUrl, body, bNewCreateHost); } diff --git a/util/src/tc_nghttp2.cpp b/util/src/tc_nghttp2.cpp index 38e511e..f5040df 100644 --- a/util/src/tc_nghttp2.cpp +++ b/util/src/tc_nghttp2.cpp @@ -18,10 +18,11 @@ #include #include +#include #include "nghttp2/nghttp2.h" #include "util/tc_nghttp2.h" -#include "util/tc_http2clientmgr.h" +// #include "util/tc_http2clientmgr.h" #include "util/tc_base64.h"