http rpc call support

This commit is contained in:
jarodruan 2020-02-09 17:03:32 +08:00
parent 30723faaf5
commit bef3665f03
14 changed files with 490 additions and 341 deletions

View File

@ -26,7 +26,7 @@ set(TARS_VERSION "2.0.0")
add_definitions(-DTARS_VERSION="${TARS_VERSION}")
set(TARS_SSL 0)
add_definitions(-DTARS_SSL=${TARS_SSL})
set(TARS_HTTP2 0)
set(TARS_HTTP2 1)
add_definitions(-DTARS_HTTP2=${TARS_HTTP2})
set(_USE_OPENTRACKING $ENV{_USE_OPENTRACKING})

View File

@ -55,7 +55,7 @@ static TC_NetWorkBuffer::PACKET_TYPE customResponse(TC_NetWorkBuffer &in, Respon
/*
Whole package length (4 bytes) + irequestid (4 bytes) + package content
*/
static vector<char> customRequest(const RequestPacket& request)
static vector<char> customRequest(RequestPacket& request, Transceiver *)
{
unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);

View File

@ -153,7 +153,8 @@ void syncRpc(int c)
int64_t t = TC_Common::now2us();
std::map<std::string, std::string> header;
header["X-Test"] = "YYYY";
// header[":authority"] = "domain.com";
// header[":scheme"] = "http";
std::map<std::string, std::string> rheader;
//发起远程调用
@ -239,8 +240,11 @@ int main(int argc, char *argv[])
param.servantPrx->tars_async_timeout(60*1000);
ProxyProtocol proto;
proto.requestFunc = tars::http2Request;
proto.responseFunc = tars::http2Response;
proto.requestFunc = ProxyProtocol::http1Request;
proto.responseFunc = ProxyProtocol::http1Response;
// proto.requestFunc = ProxyProtocol::http2Request;
// proto.responseFunc = ProxyProtocol::http2Response;
param.servantPrx->tars_set_protocol(proto);
int64_t start = TC_Common::now2us();

View File

@ -55,7 +55,7 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response
4+iRequestId4+
*/
static vector<char> pushRequest(const RequestPacket& request)
static vector<char> pushRequest(RequestPacket& request, Transceiver *)
{
unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);

View File

@ -25,7 +25,7 @@
#include "servant/StatF.h"
#include "servant/StatReport.h"
#include "util/tc_nghttp2.h"
#include "util/tc_http2clientmgr.h"
// #include "util/tc_http2clientmgr.h"
#ifdef _USE_OPENTRACKING
#include "servant/text_map_carrier.h"
#endif
@ -158,18 +158,18 @@ int AdapterProxy::invoke(ReqMessage * msg)
msg->request.iRequestId = _timeoutQueue->generateId();
}
#if TARS_HTTP2
if (getObjProxy()->getProtoName() == HTTP2)
{
msg->request.iRequestId = getId(); // session Id
}
#endif
// #if TARS_HTTP2
// if (getObjProxy()->getProtoName() == HTTP2)
// {
// msg->request.iRequestId = getId(); // session Id
// }
// #endif
#ifdef _USE_OPENTRACKING
startTrack(msg);
#endif
msg->sReqData->addBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request));
msg->sReqData->addBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
//交给连接发送数据,连接连上,buffer不为空,直接发送数据成功
if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError)

View File

@ -15,27 +15,16 @@
*/
#include "util/tc_epoll_server.h"
#include "util/tc_http.h"
#include "servant/AppProtocol.h"
#include "servant/Transceiver.h"
#include "servant/TarsLogger.h"
#include "tup/Tars.h"
#include <iostream>
#if TARS_HTTP2
#include "util/tc_nghttp2.h"
#include "util/tc_http2clientmgr.h"
#define MAKE_NV(NAME, VALUE, VALUELEN) \
{ \
(uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, VALUELEN, \
NGHTTP2_NV_FLAG_NONE \
}
#define MAKE_NV2(NAME, VALUE) \
{ \
(uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \
NGHTTP2_NV_FLAG_NONE \
}
#define MAKE_STRING_NV(NAME, VALUE) {(uint8_t*)(NAME.data()), (uint8_t*)(VALUE.data()), NAME.size(), VALUE.size(), NGHTTP2_NV_FLAG_NONE};
// #include "util/tc_http2clientmgr.h"
#endif
namespace tars
@ -47,7 +36,7 @@ TC_NetWorkBuffer::PACKET_TYPE AppProtocol::parseAdmin(TC_NetWorkBuffer &in, sha
return parse(in, out->getBuffer());
}
vector<char> ProxyProtocol::tarsRequest(const RequestPacket& request)
vector<char> ProxyProtocol::tarsRequest(RequestPacket& request, Transceiver *)
{
TarsOutputStream<BufferWriterVector> os;
@ -73,6 +62,21 @@ vector<char> ProxyProtocol::tarsRequest(const RequestPacket& request)
////////////////////////////////////////////////////////////////////////////////////
#if TARS_HTTP2
#define MAKE_NV(NAME, VALUE, VALUELEN) \
{ \
(uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, VALUELEN, \
NGHTTP2_NV_FLAG_NONE \
}
#define MAKE_NV2(NAME, VALUE) \
{ \
(uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \
NGHTTP2_NV_FLAG_NONE \
}
#define MAKE_STRING_NV(NAME, VALUE) {(uint8_t*)(NAME.data()), (uint8_t*)(VALUE.data()), NAME.size(), VALUE.size(), NGHTTP2_NV_FLAG_NONE};
// nghttp2读取请求包体准备发送
static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length,
@ -97,14 +101,29 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id
return len;
}
TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, tars::ResponsePacket& rsp)
vector<char> ProxyProtocol::http1Request(tars::RequestPacket& request, Transceiver *)
{
TC_HttpRequest httpRequest;
httpRequest.setRequest(request.sFuncName, request.sServantName, string(request.sBuffer.data(), request.sBuffer.size()), true);
vector<char> buffer;
httpRequest.encode(buffer);
return buffer;
}
TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
TC_NetWorkBuffer::PACKET_TYPE flag = in.checkHttp();
if(flag == TC_NetWorkBuffer::PACKET_FULL)
{
tars::TC_HttpResponse httpRsp;
httpRsp.decode(in.getBuffers());
TC_HttpResponse httpRsp;
vector<char> buffer = in.getBuffers();
httpRsp.decode(buffer.data(), buffer.size());
// ResponsePacket rsp;
rsp.status["status"] = httpRsp.getResponseHeaderLine();
@ -123,8 +142,87 @@ TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, tars::Response
// return httpRsp.getHeadLength() + httpRsp.getContentLength();
}
vector<char> encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
// vector<char> encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
// {
// std::vector<nghttp2_nv> nva;
// const std::string method(":method");
// nghttp2_nv nv1 = MAKE_STRING_NV(method, request.sFuncName);
// if (!request.sFuncName.empty())
// nva.push_back(nv1);
// const std::string path(":path");
// nghttp2_nv nv2 = MAKE_STRING_NV(path, request.sServantName);
// if (!request.sServantName.empty())
// nva.push_back(nv2);
// for (std::map<std::string, std::string>::const_iterator
// it(request.context.begin());
// it != request.context.end();
// ++ it)
// {
// nghttp2_nv nv = MAKE_STRING_NV(it->first, it->second);
// nva.push_back(nv);
// }
// nghttp2_data_provider* pData = NULL;
// nghttp2_data_provider data;
// if (!request.sBuffer.empty())
// {
// pData = &data;
// data.source.ptr = (void*)&request.sBuffer;
// data.read_callback = reqbody_read_callback;
// }
// int32_t sid = nghttp2_submit_request(session->session(),
// NULL,
// &nva[0],
// nva.size(),
// pData,
// NULL);
// if (sid < 0)
// {
// TLOGERROR("encodeHttp2::Fatal error: nghttp2_submit_request return: " << sid << endl);
// return vector<char>();
// }
// request.iRequestId = sid;
// nghttp2_session_send(session->session());
// // 交给tars发送
// // std::string out;
// // out.swap(session->sendBuffer());
// // return out;
// vector<char> out;
// out.assign(session->sendBuffer().begin(), session->sendBuffer().end());
// return out;
// }
// ENCODE function, called by network thread
vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans)
{
cout << "http2Request" << endl;
// TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId);
TC_NgHttp2* session = trans->getHttp2Session();
assert(session != NULL);
cout << "http2Request:" << session << endl;
if (session->getState() == TC_NgHttp2::None)
{
session->Init();
session->settings();
}
assert (session->getState() == TC_NgHttp2::Http2);
// return encodeHttp2(request, session);
cout << "http2Request1" << endl;
std::vector<nghttp2_nv> nva;
const std::string method(":method");
@ -146,6 +244,7 @@ vector<char> encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
nva.push_back(nv);
}
cout << "http2Request2" << endl;
nghttp2_data_provider* pData = NULL;
nghttp2_data_provider data;
if (!request.sBuffer.empty())
@ -163,10 +262,11 @@ vector<char> encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
NULL);
if (sid < 0)
{
cerr << "Fatal error: nghttp2_submit_request return " << sid << endl;
return "";
TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_submit_request return: " << sid << endl);
return vector<char>();
}
cout << "http2Request3" << endl;
request.iRequestId = sid;
nghttp2_session_send(session->session());
@ -179,50 +279,39 @@ vector<char> encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
out.assign(session->sendBuffer().begin(), session->sendBuffer().end());
cout << "http2Request4:" << out.data() << endl;
return out;
}
// ENCODE function, called by network thread
vector<char> http2Request(const RequestPacket& request)
// TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, list<ResponsePacket>& done, void* userptr)
TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId);
if (session->getState() == TC_NgHttp2::None)
{
session->Init();
session->settings();
}
TC_NgHttp2* session = ((Transceiver*)(in.getConnection()))->getHttp2Session();
assert (session->getState() == TC_NgHttp2::Http2);
return encodeHttp2(request, session);
}
auto it = session->doneResponses().begin();
// TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, list<ResponsePacket>& done, void* userptr)
TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done)
{
auto it = session->_doneResponses.begin();
if(it == session->_doneResponses.end())
if(it == session->doneResponses().end())
{
vector<char> buffer = in.getBuffers();
in.clearBuffers();
Transceiver* userptr = ((Transceiver*))in->getConnection();
int sessionId = userptr->getAdapterProxy()->getId();
TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(sessionId);
assert (session->getState() == TC_NgHttp2::Http2);
// Transceiver* userptr = ((Transceiver*))in->getConnection();
// int sessionId = userptr->getAdapterProxy()->getId();
// TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(sessionId);
int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.length());
int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.size());
if (readlen < 0)
{
// throw std::runtime_error("nghttp2_session_mem_recv return error");
return TC_NetWorkBuffer::PACKET_ERROR;
return TC_NetWorkBuffer::PACKET_ERR;
}
}
it = session->_doneResponses.begin();
if(it == session->_doneResponses.end())
it = session->doneResponses().begin();
if(it == session->doneResponses().end())
{
return TC_NetWorkBuffer::PACKET_LESS;
}
@ -231,7 +320,7 @@ TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket
rsp.status = it->second.headers;
rsp.sBuffer.assign(it->second.body.begin(), it->second.body.end());
session->_doneResponses.erase(it);
session->doneResponses().erase(it);
// std::map<int, Http2Response>::const_iterator it(session->_doneResponses.begin());
// for (; it != session->_doneResponses.end(); ++ it)

View File

@ -28,7 +28,7 @@
#if TARS_HTTP2
#include "util/tc_nghttp2.h"
#include "util/tc_http2clientmgr.h"
// #include "util/tc_http2clientmgr.h"
#endif
namespace tars
{
@ -163,6 +163,7 @@ void Transceiver::setConnected()
_onConnect();
TLOGTARS("[TARS][tcp setConnected, " << _adapterProxy->getObjProxy()->name() << ",fd:" << _fd << "]" << endl);
}
@ -248,7 +249,7 @@ bool Transceiver::sendAuthData(const BasicAuthInfo& info)
request.sBuffer.assign(out.begin(), out.end());
// vector<char> toSend;
_sendBuffer->addBuffer(objPrx->getProxyProtocol().requestFunc(request));
_sendBuffer->addBuffer(objPrx->getProxyProtocol().requestFunc(request, this));
// _sendBuffer.addBuffer(toSend);
@ -276,7 +277,9 @@ void Transceiver::close()
#endif
#if TARS_HTTP2
Http2ClientSessionManager::getInstance()->delSession(_adapterProxy->getId());
// Http2ClientSessionManager::getInstance()->delSession(_adapterProxy->getId());
nghttp2_session_del(_http2Session->session());
_http2Session = NULL;
#endif
_adapterProxy->getObjProxy()->getCommunicatorEpoll()->delFd(_fd,&_fdInfo,EPOLLIN|EPOLLOUT);
@ -300,6 +303,24 @@ void Transceiver::close()
TLOGTARS("[TARS][trans close:"<< _adapterProxy->getObjProxy()->name()<< "," << _ep.desc() << "]" << endl);
}
#if TARS_HTTP2
TC_NgHttp2* Transceiver::getHttp2Session()
{
if(_http2Session == NULL)
{
_http2Session = new TC_NgHttp2(false);
// if (_http2Session->getState() == TC_NgHttp2::None)
// {
// _http2Session->Init();
// _http2Session->settings();
// }
}
return _http2Session;
}
#endif
// int Transceiver::doRequest()
// {
// if(!isValid())

View File

@ -33,6 +33,8 @@ using namespace tup;
namespace tars
{
class Transceiver;
#define TARS_NET_MIN_PACKAGE_SIZE 5
#define TARS_NET_MAX_PACKAGE_SIZE 1024*1024*10
@ -131,13 +133,8 @@ public:
}
};
typedef std::function<vector<char>(const RequestPacket&)> request_protocol;
/**
* ,
*
*/
typedef std::function<vector<char>(RequestPacket&, Transceiver *)> request_protocol;
typedef std::function<TC_NetWorkBuffer::PACKET_TYPE(TC_NetWorkBuffer&, ResponsePacket&)> response_protocol;
// typedef std::function<TC_NetWorkBuffer::PACKET_TYPE(TC_NetWorkBuffer&, ResponsePacket&)> response_ex_protocol;
//////////////////////////////////////////////////////////////////////
/**
@ -151,12 +148,23 @@ public:
*/
ProxyProtocol() : requestFunc(streamRequest) {}
#if TARS_HTTP2
static vector<char> http1Request(tars::RequestPacket& request, Transceiver *);
static TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, ResponsePacket& done);
// ENCODE function, called by network thread
static vector<char> http2Request(tars::RequestPacket& request, Transceiver *);
// DECODE function, called by network thread
static TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done);
#endif
/**
*
* @param request
* @param buff
*/
static const vector<char> &streamRequest(const RequestPacket& request)
static vector<char> streamRequest(RequestPacket& request, Transceiver *)
{
return request.sBuffer;
}
@ -245,12 +253,6 @@ public:
return streamResponse<offset, T, netorder, idOffset, K, idNetorder, TARS_NET_MAX_PACKAGE_SIZE>(in, done);
}
/**
* tup响应包(tup的响应会放在ResponsePacket的buffer中)
* @param request
* @param buff
*/
/**
* wup响应包(wup的响应会放在ResponsePacket的buffer中)
* @param request
@ -294,8 +296,6 @@ public:
is.setBuffer(buffer.c_str() + sizeof(tars::Int32), head);
// is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), head);
//tup回来是requestpackage
RequestPacket rsp;
@ -363,7 +363,7 @@ public:
* @param request
* @param buff
*/
static vector<char> tarsRequest(const RequestPacket& request);
static vector<char> tarsRequest(RequestPacket& request, Transceiver *);
/**
* tars响应包解析
@ -471,19 +471,8 @@ public:
request_protocol requestFunc;
response_protocol responseFunc;
// response_ex_protocol responseExFunc;
};
vector<char> http1Request(const tars::RequestPacket& request);
TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, ResponsePacket& done);
// ENCODE function, called by network thread
vector<char> http2Request(const tars::RequestPacket& request);
// DECODE function, called by network thread
TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done);
//////////////////////////////////////////////////////////////////////
}

View File

@ -33,6 +33,10 @@ namespace tars
class TC_OpenSSL;
#endif
#if TARS_HTTP2
class TC_NgHttp2;
#endif
class AdapterProxy;
//////////////////////////////////////////////////////////
@ -221,6 +225,10 @@ public:
*
*/
bool sendAuthData(const BasicAuthInfo& );
#if TARS_HTTP2
TC_NgHttp2* getHttp2Session();
#endif
protected:
/**
**
@ -272,6 +280,9 @@ protected:
std::unique_ptr<TC_OpenSSL> _openssl;
#endif
#if TARS_HTTP2
TC_NgHttp2* _http2Session = NULL;
#endif
/*
* buffer
*/

View File

@ -26,19 +26,6 @@ using namespace tars;
namespace tars
{
struct node_T_new
{
/**
*hash值
*/
int32_t iHashCode;
/**
*
*/
unsigned int iIndex;
};
enum TC_HashAlgorithmType
{
E_TC_CONHASH_KETAMAHASH = 0,
@ -55,10 +42,7 @@ public:
virtual TC_HashAlgorithmType getHashType() = 0;
protected:
int32_t subTo32Bit(int32_t hash)
{
return (hash & 0xFFFFFFFFL);
}
int32_t subTo32Bit(int32_t hash) { return (hash & 0xFFFFFFFFL); }
};
@ -70,23 +54,8 @@ typedef TC_AutoPtr<TC_HashAlgorithm> TC_HashAlgorithmPtr;
class TC_KetamaHashAlg : public TC_HashAlgorithm
{
public:
virtual int32_t hash(const string & sKey)
{
string sMd5 = TC_MD5::md5bin(sKey);
const char *p = (const char *) sMd5.c_str();
int32_t hash = ((int32_t)(p[3] & 0xFF) << 24)
| ((int32_t)(p[2] & 0xFF) << 16)
| ((int32_t)(p[1] & 0xFF) << 8)
| ((int32_t)(p[0] & 0xFF));
return subTo32Bit(hash);
}
virtual TC_HashAlgorithmType getHashType()
{
return E_TC_CONHASH_KETAMAHASH;
}
virtual int32_t hash(const string & sKey);
virtual TC_HashAlgorithmType getHashType();
};
/**
@ -95,20 +64,8 @@ public:
class TC_DefaultHashAlg : public TC_HashAlgorithm
{
public:
virtual int32_t hash(const string & sKey)
{
string sMd5 = TC_MD5::md5bin(sKey);
const char *p = (const char *) sMd5.c_str();
int32_t hash = (*(int*)(p)) ^ (*(int*)(p+4)) ^ (*(int*)(p+8)) ^ (*(int*)(p+12));
return subTo32Bit(hash);
}
virtual TC_HashAlgorithmType getHashType()
{
return E_TC_CONHASH_DEFAULTHASH;
}
virtual int32_t hash(const string & sKey);
virtual TC_HashAlgorithmType getHashType();
};
/**
@ -117,34 +74,7 @@ public:
class TC_HashAlgFactory
{
public:
static TC_HashAlgorithm *getHashAlg()
{
TC_HashAlgorithm *ptrHashAlg = new TC_DefaultHashAlg();
return ptrHashAlg;
}
static TC_HashAlgorithm *getHashAlg(TC_HashAlgorithmType hashType)
{
TC_HashAlgorithm *ptrHashAlg = NULL;
switch(hashType)
{
case E_TC_CONHASH_KETAMAHASH:
{
ptrHashAlg = new TC_KetamaHashAlg();
break;
}
case E_TC_CONHASH_DEFAULTHASH:
default:
{
ptrHashAlg = new TC_DefaultHashAlg();
break;
}
}
return ptrHashAlg;
}
static TC_HashAlgorithm *getHashAlg(TC_HashAlgorithmType hashType);
};
/**
@ -154,93 +84,42 @@ class TC_ConsistentHashNew
{
public:
/**
* @brief
*/
TC_ConsistentHashNew()
struct node_T_new
{
_ptrHashAlg = TC_HashAlgFactory::getHashAlg();
}
/**
*hash值
*/
int32_t iHashCode;
/**
*
*/
unsigned int iIndex;
};
/**
* @brief
*/
TC_ConsistentHashNew(TC_HashAlgorithmType hashType)
{
_ptrHashAlg = TC_HashAlgFactory::getHashAlg(hashType);
}
TC_ConsistentHashNew();
/**
* @brief .
*
* @param m1 node_T_new类型的对象
* @param m2 node_T_new类型的对象
* @return less or not less返回turefalse
* @brief
*/
static bool less_hash(const node_T_new & m1, const node_T_new & m2)
{
return m1.iHashCode < m2.iHashCode;
}
TC_ConsistentHashNew(TC_HashAlgorithmType hashType);
/**
* @brief .
* @brief
*
* @param node
* @param index
* @return hash值
*/
int sortNode()
{
sort(_vHashList.begin(), _vHashList.end(), less_hash);
return 0;
}
void sortNode();
/**
* @brief
*
*/
void printNode()
{
map<unsigned int, unsigned int> mapNode;
size_t size = _vHashList.size();
for (size_t i = 0; i < size; i++)
{
if (i == 0)
{
unsigned int value = 0xFFFFFFFF - _vHashList[size - 1].iHashCode + _vHashList[0].iHashCode;
mapNode[_vHashList[0].iIndex] = value;
}
else
{
unsigned int value = _vHashList[i].iHashCode - _vHashList[i - 1].iHashCode;
if (mapNode.find(_vHashList[i].iIndex) != mapNode.end())
{
value += mapNode[_vHashList[i].iIndex];
}
mapNode[_vHashList[i].iIndex] = value;
}
cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl;
}
map<unsigned int, unsigned int>::iterator it = mapNode.begin();
double avg = 100;
double sum = 0;
while (it != mapNode.end())
{
double tmp = it->second;
cerr << "result: " << it->first << "|" << it->second << "|" << (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) << endl;
sum += (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) * (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg);
it++;
}
cerr << "variance: " << sum / mapNode.size() << ", size: " << _vHashList.size() << endl;
}
void printNode();
/**
* @brief .
@ -250,46 +129,7 @@ public:
* @param weight 1
* @return
*/
int addNode(const string & node, unsigned int index, int weight = 1)
{
if (_ptrHashAlg.get() == NULL)
{
return -1;
}
node_T_new stItem;
stItem.iIndex = index;
for (int j = 0; j < weight; j++)
{
string virtualNode = node + "_" + TC_Common::tostr<int>(j);
// TODO: 目前写了2 种hash 算法,可以根据需要选择一种,
// TODO: 其中KEMATA 为参考memcached client 的hash 算法default 为原有的hash 算法,测试结论在表格里有
if (_ptrHashAlg->getHashType() == E_TC_CONHASH_KETAMAHASH)
{
string sMd5 = TC_MD5::md5bin(virtualNode);
char *p = (char *) sMd5.c_str();
for (int i = 0; i < 4; i++)
{
stItem.iHashCode = ((int32_t)(p[i * 4 + 3] & 0xFF) << 24)
| ((int32_t)(p[i * 4 + 2] & 0xFF) << 16)
| ((int32_t)(p[i * 4 + 1] & 0xFF) << 8)
| ((int32_t)(p[i * 4 + 0] & 0xFF));
stItem.iIndex = index;
_vHashList.push_back(stItem);
}
}
else
{
stItem.iHashCode = _ptrHashAlg->hash(virtualNode);
_vHashList.push_back(stItem);
}
}
return 0;
}
int addNode(const string & node, unsigned int index, int weight = 1);
/**
* @brief key对应到的节点node的下标.
@ -298,18 +138,7 @@ public:
* @param iIndex
* @return 0: -1:
*/
int getIndex(const string & key, unsigned int & iIndex)
{
if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0)
{
iIndex = 0;
return -1;
}
int32_t iCode = _ptrHashAlg->hash(TC_MD5::md5bin(key));
return getIndex(iCode, iIndex);
}
int getIndex(const string & key, unsigned int & iIndex);
/**
* @brief hashcode对应到的节点node的下标.
@ -318,60 +147,20 @@ public:
* @param iIndex
* @return 0: -1:
*/
int getIndex(int32_t hashcode, unsigned int & iIndex)
{
if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0)
{
iIndex = 0;
return -1;
}
// 只保留32位
size_t iCode = (hashcode & 0xFFFFFFFFL);
int low = 0;
size_t high = _vHashList.size();
if(iCode <= _vHashList[0].iHashCode || iCode > _vHashList[high-1].iHashCode)
{
iIndex = _vHashList[0].iIndex;
return 0;
}
while (low < high - 1)
{
int mid = (low + high) / 2;
if (_vHashList[mid].iHashCode > iCode)
{
high = mid;
}
else
{
low = mid;
}
}
iIndex = _vHashList[low+1].iIndex;
return 0;
}
int getIndex(int32_t hashcode, unsigned int & iIndex);
/**
* @brief hash列表的长度.
*
* @return
*/
size_t size()
{
return _vHashList.size();
}
size_t size() { return _vHashList.size(); }
/**
* @brief hash列表.
*
*/
void clear()
{
_vHashList.clear();
}
void clear() { _vHashList.clear(); }
protected:
vector<node_T_new> _vHashList;

View File

@ -27,8 +27,8 @@
namespace tars
{
struct RequestPacket;
struct ResponsePacket;
// struct RequestPacket;
// struct ResponsePacket;
enum ResponseState
{
@ -154,6 +154,10 @@ public:
*/
nghttp2_session* session() const;
/**
* @brief
*/
std::map<int, Http2Response> &doneResponses() { return _doneResponses; }
private:
/**
* session

View File

@ -0,0 +1,241 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include "util/tc_consistent_hash_new.h"
// #include "util/tc_autoptr.h"
// #include "util/tc_hash_fun.h"
using namespace tars;
namespace tars
{
int32_t TC_KetamaHashAlg::hash(const string & sKey)
{
string sMd5 = TC_MD5::md5bin(sKey);
const char *p = (const char *) sMd5.c_str();
int32_t hash = ((int32_t)(p[3] & 0xFF) << 24)
| ((int32_t)(p[2] & 0xFF) << 16)
| ((int32_t)(p[1] & 0xFF) << 8)
| ((int32_t)(p[0] & 0xFF));
return subTo32Bit(hash);
}
TC_HashAlgorithmType TC_KetamaHashAlg::getHashType()
{
return E_TC_CONHASH_KETAMAHASH;
}
int32_t TC_DefaultHashAlg::hash(const string & sKey)
{
string sMd5 = TC_MD5::md5bin(sKey);
const char *p = (const char *) sMd5.c_str();
int32_t hash = (*(int*)(p)) ^ (*(int*)(p+4)) ^ (*(int*)(p+8)) ^ (*(int*)(p+12));
return subTo32Bit(hash);
}
TC_HashAlgorithmType TC_DefaultHashAlg::getHashType()
{
return E_TC_CONHASH_DEFAULTHASH;
}
TC_HashAlgorithm *TC_HashAlgFactory::getHashAlg(TC_HashAlgorithmType hashType)
{
TC_HashAlgorithm *ptrHashAlg = NULL;
switch(hashType)
{
case E_TC_CONHASH_KETAMAHASH:
{
ptrHashAlg = new TC_KetamaHashAlg();
break;
}
case E_TC_CONHASH_DEFAULTHASH:
default:
{
ptrHashAlg = new TC_DefaultHashAlg();
break;
}
}
return ptrHashAlg;
}
TC_ConsistentHashNew::TC_ConsistentHashNew()
{
_ptrHashAlg = TC_HashAlgFactory::getHashAlg(E_TC_CONHASH_DEFAULTHASH);
}
TC_ConsistentHashNew::TC_ConsistentHashNew(TC_HashAlgorithmType hashType)
{
_ptrHashAlg = TC_HashAlgFactory::getHashAlg(hashType);
}
/**
* @brief .
*
* @param m1 node_T_new类型的对象
* @param m2 node_T_new类型的对象
* @return less or not less返回turefalse
*/
static bool less_hash(const TC_ConsistentHashNew::node_T_new & m1, const TC_ConsistentHashNew::node_T_new & m2)
{
return m1.iHashCode < m2.iHashCode;
}
void TC_ConsistentHashNew::sortNode()
{
sort(_vHashList.begin(), _vHashList.end(), less_hash);
}
void TC_ConsistentHashNew::printNode()
{
map<unsigned int, unsigned int> mapNode;
size_t size = _vHashList.size();
for (size_t i = 0; i < size; i++)
{
if (i == 0)
{
unsigned int value = 0xFFFFFFFF - _vHashList[size - 1].iHashCode + _vHashList[0].iHashCode;
mapNode[_vHashList[0].iIndex] = value;
}
else
{
unsigned int value = _vHashList[i].iHashCode - _vHashList[i - 1].iHashCode;
if (mapNode.find(_vHashList[i].iIndex) != mapNode.end())
{
value += mapNode[_vHashList[i].iIndex];
}
mapNode[_vHashList[i].iIndex] = value;
}
cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl;
}
map<unsigned int, unsigned int>::iterator it = mapNode.begin();
double avg = 100;
double sum = 0;
while (it != mapNode.end())
{
double tmp = it->second;
cerr << "result: " << it->first << "|" << it->second << "|" << (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) << endl;
sum += (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg) * (tmp * 100 * mapNode.size() / 0xFFFFFFFF - avg);
it++;
}
cerr << "variance: " << sum / mapNode.size() << ", size: " << _vHashList.size() << endl;
}
int TC_ConsistentHashNew::addNode(const string & node, unsigned int index, int weight)
{
if (_ptrHashAlg.get() == NULL)
{
return -1;
}
node_T_new stItem;
stItem.iIndex = index;
for (int j = 0; j < weight; j++)
{
string virtualNode = node + "_" + TC_Common::tostr<int>(j);
// TODO: 目前写了2 种hash 算法,可以根据需要选择一种,
// TODO: 其中KEMATA 为参考memcached client 的hash 算法default 为原有的hash 算法,测试结论在表格里有
if (_ptrHashAlg->getHashType() == E_TC_CONHASH_KETAMAHASH)
{
string sMd5 = TC_MD5::md5bin(virtualNode);
char *p = (char *) sMd5.c_str();
for (int i = 0; i < 4; i++)
{
stItem.iHashCode = ((int32_t)(p[i * 4 + 3] & 0xFF) << 24)
| ((int32_t)(p[i * 4 + 2] & 0xFF) << 16)
| ((int32_t)(p[i * 4 + 1] & 0xFF) << 8)
| ((int32_t)(p[i * 4 + 0] & 0xFF));
stItem.iIndex = index;
_vHashList.push_back(stItem);
}
}
else
{
stItem.iHashCode = _ptrHashAlg->hash(virtualNode);
_vHashList.push_back(stItem);
}
}
return 0;
}
int TC_ConsistentHashNew::getIndex(const string & key, unsigned int & iIndex)
{
if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0)
{
iIndex = 0;
return -1;
}
int32_t iCode = _ptrHashAlg->hash(TC_MD5::md5bin(key));
return getIndex(iCode, iIndex);
}
int TC_ConsistentHashNew::getIndex(int32_t hashcode, unsigned int & iIndex)
{
if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0)
{
iIndex = 0;
return -1;
}
// 只保留32位
size_t iCode = (hashcode & 0xFFFFFFFFL);
int low = 0;
size_t high = _vHashList.size();
if(iCode <= _vHashList[0].iHashCode || iCode > _vHashList[high-1].iHashCode)
{
iIndex = _vHashList[0].iIndex;
return 0;
}
while (low < high - 1)
{
int mid = (low + high) / 2;
if (_vHashList[mid].iHashCode > iCode)
{
high = mid;
}
else
{
low = mid;
}
}
iIndex = _vHashList[low+1].iIndex;
return 0;
}
}

View File

@ -1532,20 +1532,20 @@ void TC_HttpRequest::encode(int iRequestType, ostream &os)
void TC_HttpRequest::setRequest(const string& method, const string &sUrl, const std::string& body, bool bNewCreateHost)
{
std::string lowMethod(method);
std::transform(method.begin(), method.end(), lowMethod.begin(), ::tolower);
// std::string lowMethod(method);
// std::transform(method.begin(), method.end(), lowMethod.begin(), ::tolower);
if (lowMethod == "get")
if (TC_Port::strncasecmp(method.c_str(), "GET", 3) == 0)
setGetRequest(sUrl, bNewCreateHost);
else if (lowMethod == "head")
else if (TC_Port::strncasecmp(method.c_str(), "HEAD", 4) == 0)
setHeadRequest(sUrl, bNewCreateHost);
else if (lowMethod == "post")
else if (TC_Port::strncasecmp(method.c_str(), "POST", 4) == 0)
setPostRequest(sUrl, body, bNewCreateHost);
else if (lowMethod == "put")
else if (TC_Port::strncasecmp(method.c_str(), "PUT", 3) == 0)
setPutRequest(sUrl, body, bNewCreateHost);
else if (lowMethod == "delete")
else if (TC_Port::strncasecmp(method.c_str(), "DELETE", 6) == 0)
setDeleteRequest(sUrl, body, bNewCreateHost);
else if (lowMethod == "patch")
else if (TC_Port::strncasecmp(method.c_str(), "PATH", 5) == 0)
setPatchRequest(sUrl, body, bNewCreateHost);
}

View File

@ -18,10 +18,11 @@
#include <string>
#include <algorithm>
#include <iostream>
#include "nghttp2/nghttp2.h"
#include "util/tc_nghttp2.h"
#include "util/tc_http2clientmgr.h"
// #include "util/tc_http2clientmgr.h"
#include "util/tc_base64.h"