change tars::ProxyProtocol::requestFunc

This commit is contained in:
jarodruan 2020-02-09 14:31:30 +08:00
parent cb4ecb9d3e
commit 30723faaf5
20 changed files with 436 additions and 707 deletions

View File

@ -22,7 +22,7 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
set(TARS_VERSION "1.4.0") set(TARS_VERSION "2.0.0")
add_definitions(-DTARS_VERSION="${TARS_VERSION}") add_definitions(-DTARS_VERSION="${TARS_VERSION}")
set(TARS_SSL 0) set(TARS_SSL 0)
add_definitions(-DTARS_SSL=${TARS_SSL}) add_definitions(-DTARS_SSL=${TARS_SSL})
@ -52,7 +52,7 @@ ExternalProject_Add(thirdparty
# This is the upstream source code repackaged in a .tar.gz for # This is the upstream source code repackaged in a .tar.gz for
# compatibility with older CMake. Also the tests/ directory # compatibility with older CMake. Also the tests/ directory
# removed to save space. # removed to save space.
URL http://cdn.tarsyun.com/src/20200208130936.tgz URL http://cdn.tarsyun.com/src/ThirdParty.tgz
# URL /Users/jarod/centos/TarsCpp/build/thirdparty/src/20200208092837.tgz # URL /Users/jarod/centos/TarsCpp/build/thirdparty/src/20200208092837.tgz
PREFIX ${CMAKE_BINARY_DIR}/thirdparty PREFIX ${CMAKE_BINARY_DIR}/thirdparty
INSTALL_DIR ${CMAKE_SOURCE_DIR} INSTALL_DIR ${CMAKE_SOURCE_DIR}
@ -61,22 +61,30 @@ ExternalProject_Add(thirdparty
set(THIRDPARTY_PATH "${CMAKE_BINARY_DIR}/thirdparty") set(THIRDPARTY_PATH "${CMAKE_BINARY_DIR}/thirdparty")
set(MYSQL_DIR_INC "${THIRDPARTY_PATH}/include/mysql") set(MYSQL_DIR_INC "${THIRDPARTY_PATH}/include/mysql")
set(THIRDPARTY_LIB_PATH "${THIRDPARTY_PATH}/lib")
set(LIB_MYSQL)
IF (WIN32) IF (WIN32)
set(MYSQL_DIR_LIB "${THIRDPARTY_PATH}/lib/win")
set(LIB_MYSQL "libmysql") set(LIB_MYSQL "libmysql")
if(TARS_HTTP2)
link_libraries(libnghttp2_static)
endif()
ELSE() ELSE()
IF(APPLE) link_libraries(pthread dl)
set(MYSQL_DIR_LIB "${THIRDPARTY_PATH}/lib/mac")
ELSE(LINUX) set(LIB_MYSQL "mysqlclient")
set(MYSQL_DIR_LIB "${THIRDPARTY_PATH}/lib/linux")
link_libraries(pthread dl) if(TARS_HTTP2)
ENDIF() link_libraries(nghttp2)
set(LIB_MYSQL "libmysqlclient") endif()
ENDIF() ENDIF()
include_directories(${THIRDPARTY_PATH}/include)
include_directories(${MYSQL_DIR_INC}) include_directories(${MYSQL_DIR_INC})
link_directories(${THIRDPARTY_LIB_PATH})
link_libraries(${mysqlclient})
#------------------------------------------------------------- #-------------------------------------------------------------
IF (APPLE) IF (APPLE)
link_libraries(iconv) link_libraries(iconv)

View File

@ -55,7 +55,7 @@ static TC_NetWorkBuffer::PACKET_TYPE customResponse(TC_NetWorkBuffer &in, Respon
/* /*
Whole package length (4 bytes) + irequestid (4 bytes) + package content Whole package length (4 bytes) + irequestid (4 bytes) + package content
*/ */
static void customRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffer::SendBuffer>& sbuff) static vector<char> customRequest(const RequestPacket& request)
{ {
unsigned int net_bufflength = htonl(request.sBuffer.size()+8); unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength); unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
@ -71,7 +71,7 @@ static void customRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuf
memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int)); memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int));
memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size()); memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size());
sbuff->addBuffer(buffer); return buffer;
} }
class CustomCallBack : public ServantProxyCallback class CustomCallBack : public ServantProxyCallback

View File

@ -16,69 +16,73 @@
#include <iostream> #include <iostream>
#include "util/tc_http.h" #include "util/tc_http.h"
#include "util/tc_option.h"
#include "util/tc_common.h" #include "util/tc_common.h"
#include "util/tc_clientsocket.h" #include "util/tc_clientsocket.h"
#include "util/tc_thread_pool.h" #include "util/tc_thread_pool.h"
#include "tup/Tars.h" #include "tup/Tars.h"
#include "tup/tup.h" #include "tup/tup.h"
#include "util/tc_timeprovider.h" #include "util/tc_timeprovider.h"
#include "servant/Application.h"
using namespace std; using namespace std;
using namespace tars; using namespace tars;
using namespace tup; using namespace tup;
int doRequest(TC_HttpRequest& stHttp,TC_TCPClient&tcpClient, TC_HttpResponse &stHttpRsp, int iTimeout) // int main(int argc,char ** argv)
// {
// if(argc != 3)
// {
// cout << "usage: " << argv[0] << " ThreadNum CallTimes" << endl;
// return -1;
// }
// try
// {
// tars::Int32 threads = TC_Common::strto<tars::Int32>(string(argv[1]));
// TC_ThreadPool tp;
// tp.init(threads);
// tp.start();
// cout << "init tp succ" << endl;
// tars::Int32 times = TC_Common::strto<tars::Int32>(string(argv[2]));
// for(int i = 0; i<threads; i++)
// {
// tp.exec(std::bind(httpClient, times));
// }
// tp.waitForAllDone(1000);
// }catch(exception &e)
// {
// cout<<e.what()<<endl;
// }
// catch(...)
// {
// }
// return 0;
// }
Communicator* _comm;
static string httpObj = "Test.HttpServer.httpObj@tcp -h 127.0.0.1 -p 8081";
struct Param
{ {
string sSendBuffer = stHttp.encode(); int count;
string call;
int thread;
int iRet = tcpClient.send(sSendBuffer.c_str(), sSendBuffer.length()); ServantPrx servantPrx;
if(iRet != TC_ClientSocket::EM_SUCCESS) };
{
return iRet;
}
stHttpRsp.reset(); Param param;
std::atomic<int> callback_count(0);
string sBuffer;
char *sTmpBuffer = new char[10240]; void httpCall(int excut_num)
size_t iRecvLen = 10240;
while(true)
{
iRecvLen = 10240;
iRet = tcpClient.recv(sTmpBuffer, iRecvLen);
if(iRet == TC_ClientSocket::EM_SUCCESS)
sBuffer.append(sTmpBuffer, iRecvLen);
switch(iRet)
{
case TC_ClientSocket::EM_SUCCESS:
if(stHttpRsp.incrementDecode(sBuffer))
{
delete []sTmpBuffer;
return TC_ClientSocket::EM_SUCCESS;
}
continue;
case TC_ClientSocket::EM_CLOSE:
delete []sTmpBuffer;
stHttpRsp.incrementDecode(sBuffer);
return TC_ClientSocket::EM_SUCCESS;
default:
delete []sTmpBuffer;
return iRet;
}
}
assert(true);
return 0;
}
void th_dohandle(int excut_num)
{ {
unsigned long sum = 0;
int64_t _iTime = TC_TimeProvider::getInstance()->getNowMs(); int64_t _iTime = TC_TimeProvider::getInstance()->getNowMs();
string sServer1("http://127.0.0.1:8081/"); string sServer1("http://127.0.0.1:8081/");
@ -87,76 +91,212 @@ void th_dohandle(int excut_num)
stHttpReq.setCacheControl("no-cache"); stHttpReq.setCacheControl("no-cache");
stHttpReq.setGetRequest(sServer1); stHttpReq.setGetRequest(sServer1);
TC_TCPClient tcpClient1; // TC_TCPClient tcpClient1;
tcpClient1.init("127.0.0.1", 8081, 3000); // tcpClient1.init("127.0.0.1", 8081, 3000);
int iRet = 0; int iRet = 0;
for (int i = 0; i<excut_num; i++) for (int i = 0; i<excut_num; i++)
{ {
TC_HttpResponse stHttpRsp;
// iRet = doRequest(stHttpReq, tcpClient1, stHttpRsp, 3000);
iRet = stHttpReq.doRequest(stHttpRsp, 3000);
if (iRet != 0)
{
cout <<"pthread id: " << TC_Thread::CURRENT_THREADID() << ", iRet:" << iRet <<endl;
}
++callback_count;
}
cout << "pthread id: " << TC_Thread::CURRENT_THREADID() << ", succ:" << param.count << "/" << excut_num << ", " << TC_TimeProvider::getInstance()->getNowMs() - _iTime <<"(ms)"<<endl;
}
struct TestHttpCallback : public HttpCallback
{
TestHttpCallback(int64_t t, int i, int c) : start(t), cur(i), count(c)
{
}
virtual int onHttpResponse(const std::map<std::string, std::string>& requestHeaders ,
const std::map<std::string, std::string>& responseHeaders ,
const std::vector<char>& rspBody)
{
callback_count++;
if(cur == count-1)
{
int64_t cost = TC_Common::now2us() - start;
cout << "onHttpResponse count:" << count << ", " << cost << " us, avg:" << 1.*cost/count << "us" << endl;
}
return 0;
}
virtual int onHttpResponseException(const std::map<std::string, std::string>& requestHeaders,
int expCode)
{
callback_count++;
return 0;
}
int64_t start;
int cur;
int count;
};
void syncRpc(int c)
{
int64_t t = TC_Common::now2us();
std::map<std::string, std::string> header;
header["X-Test"] = "YYYY";
std::map<std::string, std::string> rheader;
//发起远程调用
for (int i = 0; i < c; ++i)
{
string rbody;
try try
{ {
TC_HttpResponse stHttpRsp;
param.servantPrx->http_call("GET", "http://127.0.0.1:8081", header, "helloworld", rheader, rbody);
iRet = doRequest(stHttpReq, tcpClient1, stHttpRsp, 3000);
// iRet = stHttpReq.doRequest(stHttpRsp, 3000);
if (iRet != 0)
{
cout <<"pthread id: " << TC_Thread::CURRENT_THREADID() << ", iRet:" << iRet <<endl;
exit(-1);
}
else
{
sum++;
}
} }
catch(TC_Exception &e) catch(exception& e)
{ {
cout << "pthread id: " << TC_Thread::CURRENT_THREADID() << " id: " << i << " exception: " << e.what() << endl; cout << "exception:" << e.what() << endl;
exit(-1);
}
catch(...)
{
cout << "pthread id: " << TC_Thread::CURRENT_THREADID() << " id: " << i << " unknown exception." << endl;
exit(-1);
} }
++callback_count;
} }
cout << "pthread id: " << TC_Thread::CURRENT_THREADID() << ", succ:" << sum << "/" << excut_num << ", " << TC_TimeProvider::getInstance()->getNowMs() - _iTime <<"(ms)"<<endl;
int64_t cost = TC_Common::now2us() - t;
cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
} }
int main(int argc,char ** argv) // void http_call_async(const std::map<std::string, std::string>& headers,
{ // const std::string& body,
if(argc != 3) // HttpCallback* cb);
{
cout << "usage: " << argv[0] << " ThreadNum CallTimes" << endl;
return -1;
}
void asyncRpc(int c)
{
int64_t t = TC_Common::now2us();
std::map<std::string, std::string> header;
header["X-Test"] = "YYYY";
//发起远程调用
for (int i = 0; i < c; ++i)
{
HttpCallbackPtr p = new TestHttpCallback(t, i, c);
try
{
param.servantPrx->http_call_async(header, "helloworld", p);
}
catch(exception& e)
{
cout << "exception:" << e.what() << endl;
}
}
int64_t cost = TC_Common::now2us() - t;
cout << "asyncCall send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
}
int main(int argc, char *argv[])
{
try try
{ {
tars::Int32 threads = TC_Common::strto<tars::Int32>(string(argv[1])); if (argc < 4)
TC_ThreadPool tp;
tp.init(threads);
tp.start();
cout << "init tp succ" << endl;
tars::Int32 times = TC_Common::strto<tars::Int32>(string(argv[2]));
for(int i = 0; i<threads; i++)
{ {
tp.exec(std::bind(th_dohandle, times)); cout << "Usage:" << argv[0] << "--count=1000 --call=[basehttp|synchttp|asynchttp] --thread=1" << endl;
return 0;
} }
tp.waitForAllDone(1000); TC_Option option;
}catch(exception &e) option.decode(argc, argv);
{
cout<<e.what()<<endl;
}
catch(...)
{
param.count = TC_Common::strto<int>(option.getValue("count"));
if(param.count <= 0) param.count = 1000;
param.call = option.getValue("call");
if(param.call.empty()) param.call = "sync";
param.thread = TC_Common::strto<int>(option.getValue("thread"));
if(param.thread <= 0) param.thread = 1;
_comm = new Communicator();
_comm->setProperty("sendqueuelimit", "1000000");
_comm->setProperty("asyncqueuecap", "1000000");
param.servantPrx = _comm->stringToProxy<ServantPrx>(httpObj);
param.servantPrx->tars_connect_timeout(5000);
param.servantPrx->tars_async_timeout(60*1000);
ProxyProtocol proto;
proto.requestFunc = tars::http2Request;
proto.responseFunc = tars::http2Response;
int64_t start = TC_Common::now2us();
std::function<void(int)> func;
if (param.call == "basehttp")
{
func = httpCall;
}
else if (param.call == "synchttp")
{
func = syncRpc;
}
else if(param.call == "asynchttp")
{
func = asyncRpc;
}
else
{
cout << "no func, exits" << endl;
exit(0);
}
vector<std::thread*> vt;
for(int i = 0 ; i< param.thread; i++)
{
vt.push_back(new std::thread(func, param.count));
}
std::thread print([&]{while(callback_count != param.count * param.thread) {
cout << param.call << ": ----------finish count:" << callback_count << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
};});
for(size_t i = 0 ; i< vt.size(); i++)
{
vt[i]->join();
delete vt[i];
}
cout << "(pid:" << std::this_thread::get_id() << ")"
<< "(count:" << param.count << ")"
<< "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
<< endl;
while(callback_count != param.count * param.thread) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
print.join();
cout << "----------finish count:" << callback_count << endl;
} }
catch(exception &ex)
{
cout << ex.what() << endl;
}
cout << "main return." << endl;
return 0; return 0;
} }

View File

@ -21,42 +21,32 @@ using namespace std;
HttpServer g_app; HttpServer g_app;
// /////////////////////////////////////////////////////////////////
// struct HttpProtocol
// {
// /**
// * http协议解析
// * @param in
// * @param out
// *
// * @return int
// */
// static int parseHttp(TC_NetWorkBuffer &in, vector<char> &out)
// {
// try
// {
// bool b = in.checkRequest(in.length());
// if(b)
// {
// out = in;
// in.clearBuffers();
// //TLOGDEBUG("out size: " << out.size() << endl);
// return TC_EpollServer::PACKET_FULL;
// }
// else
// {
// return TC_EpollServer::PACKET_LESS;
// }
// }
// catch(exception &ex)
// {
// return TC_EpollServer::PACKET_ERR;
// }
// return TC_EpollServer::PACKET_LESS; //<2F><>ʾ<EFBFBD>յ<EFBFBD><D5B5>İ<EFBFBD><C4B0><EFBFBD><EFBFBD><EFBFBD>ȫ static TC_NetWorkBuffer::PACKET_TYPE parseHttp(TC_NetWorkBuffer &in, vector<char> &out)
// } {
vector<char> buffer = in.getBuffers();
cout << "parseHttp:" << buffer.data() << endl;
try
{
bool b = TC_HttpRequest::checkRequest(buffer.data(), buffer.size());
if(b)
{
out.swap(buffer);
in.clearBuffers();
return TC_NetWorkBuffer::PACKET_FULL;
}
else
{
return TC_NetWorkBuffer::PACKET_LESS;
}
}
catch(exception &ex)
{
return TC_NetWorkBuffer::PACKET_ERR;
}
// }; return TC_NetWorkBuffer::PACKET_LESS;
}
void void
HttpServer::initialize() HttpServer::initialize()
@ -65,7 +55,8 @@ HttpServer::initialize()
//... //...
addServant<HttpImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj"); addServant<HttpImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj");
addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj",&TC_NetWorkBuffer::parseHttp); // addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj",&TC_NetWorkBuffer::parseHttp);
addServantProtocol(ServerConfig::Application + "." + ServerConfig::ServerName + ".HttpObj",parseHttp);
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
void void

View File

@ -30,7 +30,7 @@
#应用名称 #应用名称
app = Test app = Test
#服务名称 #服务名称
server = HelloServer server = HttpServer
#服务的数据目录,可执行文件,配置文件等 #服务的数据目录,可执行文件,配置文件等
basepath = ./ basepath = ./
datapath = ./ datapath = ./
@ -60,7 +60,7 @@
#当前线程个数 #当前线程个数
threads = 5 threads = 5
#处理对象 #处理对象
servant = Test.HelloServer.HttpObj servant = Test.HttpServer.HttpObj
#队列最大包个数 #队列最大包个数
queuecap = 1000000 queuecap = 1000000
protocol = not-tars protocol = not-tars

View File

@ -55,7 +55,7 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response
4+iRequestId4+ 4+iRequestId4+
*/ */
static void pushRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffer::SendBuffer>& sbuff) static vector<char> pushRequest(const RequestPacket& request)
{ {
unsigned int net_bufflength = htonl(request.sBuffer.size()+8); unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength); unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
@ -71,7 +71,8 @@ static void pushRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffe
memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int)); memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int));
memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size()); memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size());
sbuff->addBuffer(buffer); return buffer;
// sbuff->addBuffer(buffer);
} }
static void printResult(int iRequestId, const string &sResponseStr) static void printResult(int iRequestId, const string &sResponseStr)

View File

@ -278,6 +278,7 @@ int main(int argc, char *argv[])
TC_Option option; TC_Option option;
option.decode(argc, argv); option.decode(argc, argv);
param.count = TC_Common::strto<int>(option.getValue("count"));
if(param.count <= 0) param.count = 1000; if(param.count <= 0) param.count = 1000;
param.buffersize = TC_Common::strto<int>(option.getValue("buffersize")); param.buffersize = TC_Common::strto<int>(option.getValue("buffersize"));
if(param.buffersize <= 0) param.buffersize = 1000; if(param.buffersize <= 0) param.buffersize = 1000;

View File

@ -13,7 +13,9 @@ sleep 1
echo "client: ./bin/HttpClient" echo "client: ./bin/HttpClient"
./bin/HttpClient 2 10000 ./bin/HttpClient --count=10000 --thread=2 --call=basehttp
./bin/HttpClient --count=10000 --thread=2 --call=synchttp
./bin/HttpClient --count=10000 --thread=2 --call=asynchttp
sleep 1 sleep 1

View File

@ -169,7 +169,7 @@ int AdapterProxy::invoke(ReqMessage * msg)
startTrack(msg); startTrack(msg);
#endif #endif
_objectProxy->getProxyProtocol().requestFunc(msg->request, msg->sReqData); msg->sReqData->addBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request));
//交给连接发送数据,连接连上,buffer不为空,直接发送数据成功 //交给连接发送数据,连接连上,buffer不为空,直接发送数据成功
if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError) if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError)

View File

@ -19,7 +19,6 @@
#include "tup/Tars.h" #include "tup/Tars.h"
#include <iostream> #include <iostream>
#if TARS_HTTP2 #if TARS_HTTP2
#include "util/tc_nghttp2.h" #include "util/tc_nghttp2.h"
#include "util/tc_http2clientmgr.h" #include "util/tc_http2clientmgr.h"
@ -42,36 +41,13 @@
namespace tars namespace tars
{ {
// //TARSServer的协议解析器
// int AppProtocol::parseAdmin(string &in, string &out)
// {
// return parse(in, out);
// }
// void ProxyProtocol::tarsRequest(const RequestPacket& request, string& buff)
// {
// TarsOutputStream<BufferWriter> os;
// request.writeTo(os);
// tars::Int32 iHeaderLen = htonl(sizeof(tars::Int32) + os.getLength());
// buff.clear();
// buff.reserve(sizeof(tars::Int32) + os.getLength());
// buff.append((const char*)&iHeaderLen, sizeof(tars::Int32));
// buff.append(os.getBuffer(), os.getLength());
// }
//TAFServer的协议解析器 //TAFServer的协议解析器
TC_NetWorkBuffer::PACKET_TYPE AppProtocol::parseAdmin(TC_NetWorkBuffer &in, shared_ptr<TC_NetWorkBuffer::SendBuffer> &out) TC_NetWorkBuffer::PACKET_TYPE AppProtocol::parseAdmin(TC_NetWorkBuffer &in, shared_ptr<TC_NetWorkBuffer::SendBuffer> &out)
{ {
return parse(in, out->getBuffer()); return parse(in, out->getBuffer());
} }
void ProxyProtocol::tarsRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffer::SendBuffer>& buff) vector<char> ProxyProtocol::tarsRequest(const RequestPacket& request)
{ {
TarsOutputStream<BufferWriterVector> os; TarsOutputStream<BufferWriterVector> os;
@ -82,13 +58,17 @@ void ProxyProtocol::tarsRequest(const RequestPacket& request, shared_ptr<TC_NetW
request.writeTo(os); request.writeTo(os);
buff->swap(os.getByteBuffer()); vector<char> buff;
assert(buff->length() >= 4); buff.swap(os.getByteBuffer());
iHeaderLen = htonl((int)(buff->length())); assert(buff.size() >= 4);
memcpy((void*)buff->buffer(), (const char *)&iHeaderLen, sizeof(iHeaderLen)); iHeaderLen = htonl((int)(buff.size()));
memcpy((void*)buff.data(), (const char *)&iHeaderLen, sizeof(iHeaderLen));
return buff;
} }
//////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////
@ -117,28 +97,33 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id
return len; return len;
} }
size_t http1Response(const char* recvBuffer, size_t length, std::list<tars::ResponsePacket>& done) TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, tars::ResponsePacket& rsp)
{ {
tars::TC_HttpResponse httpRsp; TC_NetWorkBuffer::PACKET_TYPE flag = in.checkHttp();
bool ok = httpRsp.decode(std::string(recvBuffer, length));
if(!ok)
return 0;
ResponsePacket rsp; if(flag == TC_NetWorkBuffer::PACKET_FULL)
rsp.status["status"] = httpRsp.getResponseHeaderLine();
for (const auto& kv : httpRsp.getHeaders())
{ {
// 响应的头部 tars::TC_HttpResponse httpRsp;
rsp.status[kv.first] = kv.second; httpRsp.decode(in.getBuffers());
}
std::string content(httpRsp.getContent()); // ResponsePacket rsp;
rsp.sBuffer.assign(content.begin(), content.end()); rsp.status["status"] = httpRsp.getResponseHeaderLine();
done.push_back(rsp); for (const auto& kv : httpRsp.getHeaders())
return httpRsp.getHeadLength() + httpRsp.getContentLength(); {
// 响应的头部
rsp.status[kv.first] = kv.second;
}
rsp.sBuffer.assign(httpRsp.getContent().begin(), httpRsp.getContent().end());
}
return flag;
// done.push_back(rsp);
// return httpRsp.getHeadLength() + httpRsp.getContentLength();
} }
std::string encodeHttp2(RequestPacket& request, TC_NgHttp2* session) vector<char> encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
{ {
std::vector<nghttp2_nv> nva; std::vector<nghttp2_nv> nva;
@ -186,13 +171,19 @@ std::string encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
nghttp2_session_send(session->session()); nghttp2_session_send(session->session());
// 交给tars发送 // 交给tars发送
std::string out; // std::string out;
out.swap(session->sendBuffer()); // out.swap(session->sendBuffer());
// return out;
vector<char> out;
out.assign(session->sendBuffer().begin(), session->sendBuffer().end());
return out; return out;
} }
// ENCODE function, called by network thread // ENCODE function, called by network thread
void http2Request(const RequestPacket& request, std::string& out) vector<char> http2Request(const RequestPacket& request)
{ {
TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId); TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId);
if (session->getState() == TC_NgHttp2::None) if (session->getState() == TC_NgHttp2::None)
@ -203,39 +194,59 @@ void http2Request(const RequestPacket& request, std::string& out)
assert (session->getState() == TC_NgHttp2::Http2); assert (session->getState() == TC_NgHttp2::Http2);
out = encodeHttp2(const_cast<RequestPacket&>(request), session); return encodeHttp2(request, session);
} }
size_t http2Response(const char* recvBuffer, size_t length, list<ResponsePacket>& done, void* userptr) // TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, list<ResponsePacket>& done, void* userptr)
TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done)
{ {
const int sessionId = *(int*)&userptr; auto it = session->_doneResponses.begin();
TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(sessionId);
assert (session->getState() == TC_NgHttp2::Http2);
int readlen = nghttp2_session_mem_recv(session->session(), if(it == session->_doneResponses.end())
(const uint8_t*)recvBuffer,
length);
if (readlen < 0)
{ {
throw std::runtime_error("nghttp2_session_mem_recv return error"); vector<char> buffer = in.getBuffers();
return 0; in.clearBuffers();
Transceiver* userptr = ((Transceiver*))in->getConnection();
int sessionId = userptr->getAdapterProxy()->getId();
TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(sessionId);
assert (session->getState() == TC_NgHttp2::Http2);
int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.length());
if (readlen < 0)
{
// throw std::runtime_error("nghttp2_session_mem_recv return error");
return TC_NetWorkBuffer::PACKET_ERROR;
}
} }
std::map<int, Http2Response>::const_iterator it(session->_doneResponses.begin()); it = session->_doneResponses.begin();
for (; it != session->_doneResponses.end(); ++ it) if(it == session->_doneResponses.end())
{ {
ResponsePacket rsp; return TC_NetWorkBuffer::PACKET_LESS;
}
rsp.iRequestId = it->second.streamId;
rsp.status = it->second.headers;
rsp.sBuffer.assign(it->second.body.begin(), it->second.body.end());
session->_doneResponses.erase(it);
// std::map<int, Http2Response>::const_iterator it(session->_doneResponses.begin());
// for (; it != session->_doneResponses.end(); ++ it)
// {
// ResponsePacket rsp;
rsp.iRequestId = it->second.streamId; // rsp.iRequestId = it->second.streamId;
rsp.status = it->second.headers; // rsp.status = it->second.headers;
rsp.sBuffer.assign(it->second.body.begin(), it->second.body.end()); // rsp.sBuffer.assign(it->second.body.begin(), it->second.body.end());
done.push_back(rsp); // done.push_back(rsp);
} // }
session->_doneResponses.clear(); // session->_doneResponses.clear();
return readlen; return TC_NetWorkBuffer::PACKET_FULL;
} }
#endif #endif

View File

@ -205,10 +205,9 @@ int ServantProxyCallback::onDispatchException(const RequestPacket &req, const Re
// } // }
// } // }
HttpServantProxyCallback::HttpServantProxyCallback(HttpCallback* cb) : HttpServantProxyCallback::HttpServantProxyCallback(const HttpCallbackPtr& cb) :
_httpCb(cb) _httpCb(cb)
{ {
assert(_httpCb);
} }
int HttpServantProxyCallback::onDispatchException(const RequestPacket &request, const ResponsePacket &response) int HttpServantProxyCallback::onDispatchException(const RequestPacket &request, const ResponsePacket &response)
@ -705,7 +704,7 @@ void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
//判断eStatus来判断状态 //判断eStatus来判断状态
assert(msg->eStatus != ReqMessage::REQ_REQ); assert(msg->eStatus != ReqMessage::REQ_REQ);
TLOGTARS("[TARS]ServantProxy::invoke line: " << __LINE__ << " status: " << msg->eStatus << " ret: " <<msg->response->iRet << endl); TLOGTARS("[TARS]ServantProxy::invoke line: " << __LINE__ << " status: " << msg->eStatus << ", ret: " <<msg->response->iRet << endl);
if(msg->eStatus == ReqMessage::REQ_RSP && msg->response->iRet == TARSSERVERSUCCESS) if(msg->eStatus == ReqMessage::REQ_RSP && msg->response->iRet == TARSSERVERSUCCESS)
{ {
@ -729,7 +728,7 @@ void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
if(msg->adapter) if(msg->adapter)
{ {
os << ",adapter" << msg->adapter->endpoint().desc(); os << ",adapter:" << msg->adapter->endpoint().desc();
} }
os << ",reqid:" << msg->request.iRequestId << "]"; os << ",reqid:" << msg->request.iRequestId << "]";
@ -917,7 +916,7 @@ void ServantProxy::http_call(const std::string& method,
void ServantProxy::http_call_async(const std::map<std::string, std::string>& headers, void ServantProxy::http_call_async(const std::map<std::string, std::string>& headers,
const std::string& body, const std::string& body,
HttpCallback* cb) const HttpCallbackPtr &cb)
{ {
ReqMessage * msg = new ReqMessage(); ReqMessage * msg = new ReqMessage();

View File

@ -248,7 +248,7 @@ bool Transceiver::sendAuthData(const BasicAuthInfo& info)
request.sBuffer.assign(out.begin(), out.end()); request.sBuffer.assign(out.begin(), out.end());
// vector<char> toSend; // vector<char> toSend;
objPrx->getProxyProtocol().requestFunc(request, _sendBuffer); _sendBuffer->addBuffer(objPrx->getProxyProtocol().requestFunc(request));
// _sendBuffer.addBuffer(toSend); // _sendBuffer.addBuffer(toSend);

View File

@ -27,10 +27,6 @@
#include "util/tc_epoll_server.h" #include "util/tc_epoll_server.h"
#include "util/tc_network_buffer.h" #include "util/tc_network_buffer.h"
#if TARS_HTTP2
#include "nghttp2/nghttp2.h"
#endif
using namespace std; using namespace std;
using namespace tup; using namespace tup;
@ -59,18 +55,6 @@ T net2host(T len)
class AppProtocol class AppProtocol
{ {
public: public:
// /**
// * 解析协议
// * @param in, 目前的buffer
// * @param out, 一个完整的包
// *
// * @return int, 0表示没有接收完全, 1表示收到一个完整包
// */
// static int parse(string &in, string &out)
// {
// return parseLen<10000000>(in,out);
// }
/** /**
* *
* @param in, buffer * @param in, buffer
@ -145,106 +129,15 @@ public:
return TC_NetWorkBuffer::PACKET_FULL; return TC_NetWorkBuffer::PACKET_FULL;
} }
// template<tars::Int32 iMaxLength>
// static int parseLen(string &in, string &out)
// {
// if(in.length() < sizeof(tars::Int32))
// {
// return TC_EpollServer::PACKET_LESS;
// }
// tars::Int32 iHeaderLen;
// memcpy(&iHeaderLen, in.c_str(), sizeof(tars::Int32));
// iHeaderLen = ntohl(iHeaderLen);
// if(iHeaderLen < tars::Int32(sizeof(tars::Int32))|| iHeaderLen > iMaxLength)
// {
// return TC_EpollServer::PACKET_ERR;
// }
// if((int)in.length() < iHeaderLen)
// {
// return TC_EpollServer::PACKET_LESS;
// }
// out = in.substr(sizeof(tars::Int32), iHeaderLen - sizeof(tars::Int32));
// in = in.substr(iHeaderLen);
// return TC_EpollServer::PACKET_FULL;
// }
// /**
// * 解析协议
// * @param in, 目前的buffer
// * @param out, 一个完整的包
// *
// * @return int, 0表示没有接收完全, 1表示收到一个完整包
// */
// // static int parseAdmin(string &in, string &out);
// /**
// *
// * @param T
// * @param offset
// * @param netorder
// * @param in
// * @param out
// * @return int
// */
// template<size_t offset, typename T, bool netorder>
// static int parseStream(string& in, string& out)
// {
// if(in.length() < offset + sizeof(T))
// {
// return TC_EpollServer::PACKET_LESS;
// }
// T iHeaderLen = 0;
// ::memcpy(&iHeaderLen, in.c_str() + offset, sizeof(T));
// if (netorder)
// {
// iHeaderLen = net2host<T>(iHeaderLen);
// }
// if (iHeaderLen < (T)(offset + sizeof(T)) || (uint32_t)iHeaderLen > 100000000)
// {
// return TC_EpollServer::PACKET_ERR;
// }
// if (in.length() < (uint32_t)iHeaderLen)
// {
// return TC_EpollServer::PACKET_LESS;
// }
// out = in.substr(0, iHeaderLen);
// in = in.substr(iHeaderLen);
// return TC_EpollServer::PACKET_FULL;
// }
}; };
typedef std::function<void(const RequestPacket&, shared_ptr<TC_NetWorkBuffer::SendBuffer>&)> request_protocol; typedef std::function<vector<char>(const RequestPacket&)> request_protocol;
/** /**
* , * ,
* *
*/ */
typedef std::function<TC_NetWorkBuffer::PACKET_TYPE(TC_NetWorkBuffer&, ResponsePacket&)> response_protocol; typedef std::function<TC_NetWorkBuffer::PACKET_TYPE(TC_NetWorkBuffer&, ResponsePacket&)> response_protocol;
typedef std::function<TC_NetWorkBuffer::PACKET_TYPE(TC_NetWorkBuffer&, ResponsePacket&)> response_ex_protocol; // typedef std::function<TC_NetWorkBuffer::PACKET_TYPE(TC_NetWorkBuffer&, ResponsePacket&)> response_ex_protocol;
// using request_protocol = std::function<void (const RequestPacket& , string& )>;
// /**
// * 接收协议处理, 返回值表示解析了多少字节
// * 框架层会自动对处理了包做处理
// */
// using response_protocol = std::function<size_t (const char *, size_t, list<ResponsePacket>&)>;
// using response_ex_protocol = std::function<size_t (const char *, size_t, list<ResponsePacket>&, void*)>;
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
/** /**
@ -263,112 +156,11 @@ public:
* @param request * @param request
* @param buff * @param buff
*/ */
static void streamRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffer::SendBuffer>& buff) static const vector<char> &streamRequest(const RequestPacket& request)
{ {
buff->setBuffer(request.sBuffer); return request.sBuffer;
} }
// /**
// * 普通二进制请求包
// * @param request
// * @param buff
// */
// static void streamRequest(const RequestPacket& request, string& buff)
// {
// buff.assign((const char*)(&request.sBuffer[0]), request.sBuffer.size());
// }
// /**
// * 普通二进制 tars 请求包
// * @param request
// * @param buff
// */
// template
// <
// size_t offset,
// typename T,
// bool netorder,
// size_t idOffset,
// typename K,
// bool idNetorder,
// size_t packetMaxSize
// >
// static size_t streamResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
// {
// size_t pos = 0;
// while (pos < length)
// {
// uint32_t len = length - pos;
// if (len < offset + sizeof(T))
// {
// break;
// }
// T iHeaderLen = 0;
// ::memcpy(&iHeaderLen, recvBuffer + pos + offset, sizeof(T));
// if (netorder)
// {
// iHeaderLen = net2host<T>(iHeaderLen);
// }
// //做一下保护,长度大于10M
// size_t sizeHeaderLen = static_cast<size_t>(iHeaderLen);
// if (sizeHeaderLen > packetMaxSize || iHeaderLen == 0)
// {
// throw TarsDecodeException("packet length too long or zero,len:(" + TC_Common::tostr(packetMaxSize) + ")" + TC_Common::tostr(iHeaderLen));
// }
// //包没有接收全
// if (len < iHeaderLen)
// {
// break;
// }
// ResponsePacket rsp;
// rsp.sBuffer.reserve(iHeaderLen);
// rsp.sBuffer.resize(iHeaderLen);
// ::memcpy(&rsp.sBuffer[0], recvBuffer + pos, iHeaderLen);
// K requestId;
// ::memcpy(&requestId, recvBuffer + pos + idOffset, sizeof(K));
// if (idNetorder)
// {
// requestId = net2host<K>(requestId);
// }
// rsp.iRequestId = static_cast<uint32_t>(requestId);
// done.push_back(rsp);
// pos += iHeaderLen;
// }
// return pos;
// }
// template
// <
// size_t offset,
// typename T,
// bool netorder,
// size_t idOffset,
// typename K,
// bool idNetorder
// >
// static size_t streamResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
// {
// return streamResponse<offset, T, netorder, idOffset, K, idNetorder, 10000000>(recvBuffer, length, done);
// }
/** /**
* taf请求包 * taf请求包
* @param request * @param request
@ -469,12 +261,6 @@ public:
return tupResponseLen<TARS_NET_MIN_PACKAGE_SIZE, TARS_NET_MAX_PACKAGE_SIZE>(in, done); return tupResponseLen<TARS_NET_MIN_PACKAGE_SIZE, TARS_NET_MAX_PACKAGE_SIZE>(in, done);
} }
// static size_t tupResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
// {
// return tupResponseLen<10000000>(recvBuffer,length,done);
// }
template<uint32_t iMinLength, uint32_t iMaxLength> template<uint32_t iMinLength, uint32_t iMaxLength>
static TC_NetWorkBuffer::PACKET_TYPE tupResponseLen(TC_NetWorkBuffer &in, ResponsePacket& rsp) static TC_NetWorkBuffer::PACKET_TYPE tupResponseLen(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{ {
@ -571,115 +357,13 @@ public:
return TC_NetWorkBuffer::PACKET_FULL; return TC_NetWorkBuffer::PACKET_FULL;
} }
// template<uint32_t iMaxLength>
// static size_t tupResponseLen(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
// {
// size_t pos = 0;
// while (pos < length)
// {
// uint32_t len = length - pos;
// if(len < sizeof(tars::Int32))
// {
// break;
// }
// uint32_t iHeaderLen = ntohl(*(uint32_t*)(recvBuffer + pos));
// //做一下保护,长度大于10M
// if (iHeaderLen > iMaxLength || iHeaderLen < sizeof(tars::Int32))
// {
// throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
// }
// //包没有接收全
// if (len < iHeaderLen)
// {
// //看看包头是否正确
// static const uint32_t head = 20;
// if(len >= head)
// {
// TarsInputStream<BufferReader> is;
// is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), head);
// //tup回来是requestpackage
// RequestPacket rsp;
// is.read(rsp.iVersion, 1, true);
// if (rsp.iVersion != TUPVERSION)
// {
// throw TarsDecodeException("version not correct, version:" + TC_Common::tostr(rsp.iVersion));
// }
// is.read(rsp.cPacketType, 2, true);
// if (rsp.cPacketType != TARSNORMAL)
// {
// throw TarsDecodeException("packettype not correct, packettype:" + TC_Common::tostr((int)rsp.cPacketType));
// }
// is.read(rsp.iMessageType, 3, true);
// is.read(rsp.iRequestId, 4, true);
// }
// break;
// }
// else
// {
// TarsInputStream<BufferReader> is;
// //buffer包括4个字节长度
// vector<char> buffer;
// buffer.reserve(iHeaderLen);
// buffer.resize(iHeaderLen);
// memcpy(&(buffer[0]), recvBuffer + pos, iHeaderLen);
// is.setBuffer(&(buffer[0]) + sizeof(tars::Int32), buffer.size() - sizeof(tars::Int32));
// pos += iHeaderLen;
// //TUP的响应包其实也是返回包
// RequestPacket req;
// req.readFrom(is);
// if (req.iVersion != TUPVERSION )
// {
// throw TarsDecodeException("version not correct, version:" + TC_Common::tostr(req.iVersion));
// }
// if (req.cPacketType != TARSNORMAL)
// {
// throw TarsDecodeException("packettype not correct, packettype:" + TC_Common::tostr((int)req.cPacketType));
// }
// ResponsePacket rsp;
// rsp.cPacketType = req.cPacketType;
// rsp.iMessageType = req.iMessageType;
// rsp.iRequestId = req.iRequestId;
// rsp.iVersion = req.iVersion;
// rsp.context = req.context;
// //tup的响应包直接放入到sBuffer里面
// rsp.sBuffer = buffer;
// done.push_back(rsp);
// }
// }
// return pos;
// }
public: public:
/** /**
* tars请求包 * tars请求包
* @param request * @param request
* @param buff * @param buff
*/ */
static void tarsRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffer::SendBuffer>& buff); static vector<char> tarsRequest(const RequestPacket& request);
// static void tarsRequest(const RequestPacket& request, string& buff);
/** /**
* tars响应包解析 * tars响应包解析
@ -690,11 +374,6 @@ public:
{ {
return tarsResponseLen<TARS_NET_MIN_PACKAGE_SIZE, TARS_NET_MAX_PACKAGE_SIZE>(in, done); return tarsResponseLen<TARS_NET_MIN_PACKAGE_SIZE, TARS_NET_MAX_PACKAGE_SIZE>(in, done);
} }
// static size_t tarsResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
// {
// return tarsResponseLen<10000000>(recvBuffer,length,done);
// }
template<uint32_t iMinLength, uint32_t iMaxLength> template<uint32_t iMinLength, uint32_t iMaxLength>
static TC_NetWorkBuffer::PACKET_TYPE tarsResponseLen(TC_NetWorkBuffer &in, ResponsePacket& rsp) static TC_NetWorkBuffer::PACKET_TYPE tarsResponseLen(TC_NetWorkBuffer &in, ResponsePacket& rsp)
@ -767,9 +446,6 @@ public:
TarsInputStream<BufferReader> is; TarsInputStream<BufferReader> is;
is.setBuffer(buffer.data(), buffer.size()); is.setBuffer(buffer.data(), buffer.size());
// pos += iHeaderLen;
// ResponsePacket rsp;
rsp.readFrom(is); rsp.readFrom(is);
if (rsp.iVersion != TARSVERSION) if (rsp.iVersion != TARSVERSION)
@ -791,113 +467,22 @@ public:
return TC_NetWorkBuffer::PACKET_FULL; return TC_NetWorkBuffer::PACKET_FULL;
} }
// template<uint32_t iMaxLength>
// static size_t tarsResponseLen(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
// {
// size_t pos = 0;
// while (pos < length)
// {
// uint32_t len = length - pos;
// if(len < sizeof(tars::Int32))
// {
// break;
// }
// uint32_t iHeaderLen = ntohl(*(uint32_t*)(recvBuffer + pos));
// //做一下保护,长度大于10M
// if (iHeaderLen > iMaxLength || iHeaderLen < sizeof(tars::Int32))
// {
// throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
// }
// //包没有接收全
// if (len < iHeaderLen)
// {
// //看看包头是否正确
// static const uint32_t head = 20;
// if(len >= head)
// {
// TarsInputStream<BufferReader> is;
// is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), head);
// ResponsePacket rsp;
// is.read(rsp.iVersion, 1, false);
// if (rsp.iVersion != TARSVERSION)
// {
// throw TarsDecodeException("version not correct, version:" + TC_Common::tostr(rsp.iVersion));
// }
// is.read(rsp.cPacketType, 2, false);
// if (rsp.cPacketType != TARSNORMAL)
// {
// throw TarsDecodeException("packettype not correct, packettype:" + TC_Common::tostr((int)rsp.cPacketType));
// }
// is.read(rsp.iRequestId, 3, false);
// is.read(rsp.iMessageType, 4, false);
// is.read(rsp.iRet, 5, false);
// if (rsp.iRet < TARSSERVERUNKNOWNERR)
// {
// throw TarsDecodeException("response value not correct, value:" + TC_Common::tostr(rsp.iRet));
// }
// }
// break;
// }
// else
// {
// TarsInputStream<BufferReader> is;
// is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), iHeaderLen - sizeof(tars::Int32));
// pos += iHeaderLen;
// ResponsePacket rsp;
// rsp.readFrom(is);
// if (rsp.iVersion != TARSVERSION)
// {
// throw TarsDecodeException("version not correct, version:" + TC_Common::tostr(rsp.iVersion));
// }
// if (rsp.cPacketType != TARSNORMAL)
// {
// throw TarsDecodeException("packettype not correct, packettype:" + TC_Common::tostr((int)rsp.cPacketType));
// }
// if (rsp.iRet < TARSSERVERUNKNOWNERR)
// {
// throw TarsDecodeException("response value not correct, value:" + TC_Common::tostr(rsp.iRet));
// }
// done.push_back(rsp);
// }
// }
// return pos;
// }
public: public:
request_protocol requestFunc; request_protocol requestFunc;
response_protocol responseFunc; response_protocol responseFunc;
response_ex_protocol responseExFunc; // response_ex_protocol responseExFunc;
}; };
class TC_NgHttp2; vector<char> http1Request(const tars::RequestPacket& request);
TC_NetWorkBuffer::PACKET_TYPE http1Response(TC_NetWorkBuffer &in, ResponsePacket& done);
void http1Request(const tars::RequestPacket& request, std::string& out);
size_t http1Response(const char* recvBuffer, size_t length, std::list<tars::ResponsePacket>& done);
std::string encodeHttp2(RequestPacket& request, TC_NgHttp2* session);
// ENCODE function, called by network thread // ENCODE function, called by network thread
void http2Request(const tars::RequestPacket& request, std::string& out); vector<char> http2Request(const tars::RequestPacket& request);
// DECODE function, called by network thread // DECODE function, called by network thread
size_t http2Response(const char* recvBuffer, size_t length, std::list<tars::ResponsePacket>& done, void* userData); TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done);
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////

View File

@ -380,10 +380,12 @@ public:
int expCode) = 0; int expCode) = 0;
}; };
typedef TC_AutoPtr<HttpCallback> HttpCallbackPtr;
class HttpServantProxyCallback : virtual public ServantProxyCallback class HttpServantProxyCallback : virtual public ServantProxyCallback
{ {
public: public:
explicit HttpServantProxyCallback(HttpCallback* cb); explicit HttpServantProxyCallback(const HttpCallbackPtr& cb);
/** /**
* *
@ -401,7 +403,7 @@ public:
private: private:
TC_AutoPtr<HttpCallback> _httpCb; HttpCallbackPtr _httpCb;
}; };
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
@ -675,7 +677,7 @@ public:
*/ */
void http_call_async(const std::map<std::string, std::string>& headers, void http_call_async(const std::map<std::string, std::string>& headers,
const std::string& body, const std::string& body,
HttpCallback* cb); const HttpCallbackPtr &cb);
/** /**
* RequestPacket中的context设置主调信息标识 * RequestPacket中的context设置主调信息标识

View File

@ -521,7 +521,7 @@ public:
* *
* @return http内容串 * @return http内容串
*/ */
string getContent() const { return _content; } const string &getContent() const { return _content; }
/** /**
* @brief http body(content-length). * @brief http body(content-length).

View File

@ -57,6 +57,8 @@ public:
method = it->second.method; method = it->second.method;
else else
return -1; return -1;
return 0;
} }
int getUri(int32_t reqid, string &uri) int getUri(int32_t reqid, string &uri)
@ -67,6 +69,8 @@ public:
uri = it->second.uri; uri = it->second.uri;
else else
return -1; return -1;
return 0;
} }
int getHeader(int32_t reqid, TC_Http::http_header_type &header) int getHeader(int32_t reqid, TC_Http::http_header_type &header)
@ -77,6 +81,8 @@ public:
header = it->second.header; header = it->second.header;
else else
return -1; return -1;
return 0;
} }
int getBody(int32_t reqid, string &body) int getBody(int32_t reqid, string &body)
@ -87,6 +93,8 @@ public:
body = it->second.body; body = it->second.body;
else else
return -1; return -1;
return 0;
} }
int doRequest(const vector<char> &request, vector<char>& response); int doRequest(const vector<char> &request, vector<char>& response);
@ -147,8 +155,8 @@ private:
TC_ThreadLock nghttpLock; TC_ThreadLock nghttpLock;
bool bOldVersion_; // bool bOldVersion_;
bool bUpgrade_; // bool bUpgrade_;
}; };
typedef TC_AutoPtr<TC_Http2Session> TC_Http2SessionPtr; typedef TC_AutoPtr<TC_Http2Session> TC_Http2SessionPtr;

View File

@ -219,6 +219,12 @@ public:
*/ */
uint32_t getValueOf4() const; uint32_t getValueOf4() const;
/**
* http协议判读
* @return
*/
TC_NetWorkBuffer::PACKET_TYPE checkHttp();
/** /**
* 1, (, ) * 1, (, )
* : buffer只返回包体, 1 * : buffer只返回包体, 1
@ -303,13 +309,6 @@ public:
static TC_NetWorkBuffer::PACKET_TYPE parseEcho(TC_NetWorkBuffer&in, vector<char> &out); static TC_NetWorkBuffer::PACKET_TYPE parseEcho(TC_NetWorkBuffer&in, vector<char> &out);
protected: protected:
/**
* http请求包是否齐全
* @param buffer, buffer中
* @return, true: http包完整了, false: http还不完整
*/
// bool checkHttp(std::string &buffer) const;
template<typename T> template<typename T>
T getValue() const T getValue() const
{ {
@ -365,6 +364,7 @@ protected:
moveHeader(length - sizeof(T)); moveHeader(length - sizeof(T));
return PACKET_FULL; return PACKET_FULL;
} }
protected: protected:
/** /**
* () * ()

View File

@ -58,15 +58,15 @@ struct Http2Response
/** /**
*@brief taf client网络线程使用的打包函数 *@brief taf client网络线程使用的打包函数
*/ */
void http2Request(const RequestPacket& , std::string& ); // void http2Request(const RequestPacket& , std::string& );
/** /**
*@brief NGHTTP2封装 *@brief NGHTTP2封装
*/ */
class TC_NgHttp2 class TC_NgHttp2
{ {
friend void http2Request(const RequestPacket& , std::string& ); // friend const vector<char> & http2Request(const RequestPacket&);
friend size_t http2Response(const char* , size_t , std::list<tars::ResponsePacket>& , void*); // friend size_t http2Response(const char* , size_t , std::list<tars::ResponsePacket>& , void*);
public: public:
/** /**
*@brief ng session的send回调 *@brief ng session的send回调

View File

@ -240,7 +240,6 @@ int TC_Http2Session::parse(string &in, string &out)
{ {
if(bNewCon_) if(bNewCon_)
{ {
//<2F><><EFBFBD>ӵ<EFBFBD><D3B5>׸<EFBFBD><D7B8><EFBFBD>
bNewCon_ = false; bNewCon_ = false;
nghttp2_settings_entry iv[2] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}, nghttp2_settings_entry iv[2] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100},
@ -251,7 +250,7 @@ int TC_Http2Session::parse(string &in, string &out)
int readlen = nghttp2_session_mem_recv(session_, (uint8_t *)in.c_str(), in.size()); int readlen = nghttp2_session_mem_recv(session_, (uint8_t *)in.c_str(), in.size());
if(readlen < 0) if(readlen < 0)
{ {
return TC_EpollServer::PACKET_ERR; return TC_NetWorkBuffer::PACKET_ERR;
} }
else else
{ {
@ -260,9 +259,9 @@ int TC_Http2Session::parse(string &in, string &out)
out.clear(); out.clear();
out.swap(reqout_); out.swap(reqout_);
if (out.empty()) if (out.empty())
return TC_EpollServer::PACKET_LESS; return TC_NetWorkBuffer::PACKET_LESS;
else else
return TC_EpollServer::PACKET_FULL; return TC_NetWorkBuffer::PACKET_FULL;
} }
} }

View File

@ -268,53 +268,22 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseBufferOf4(vector<char> &buf
// return PACKET_LESS; // return PACKET_LESS;
//} //}
TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, vector<char> &out) TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::checkHttp()
{ {
try try
{ {
if(in._bufferList.empty()) const static int headerLen = 10;
if(_bufferList.empty() || getBufferLength() <= headerLen)
{ {
return PACKET_LESS; return PACKET_LESS;
} }
vector<char> buffer; vector<char> buffer;
getHeader(headerLen, buffer);
bool b; bool b = TC_HttpRequest::checkRequest(buffer.data(), buffer.size());
if(in._bufferList.empty())
{
b = false;
}
else
{
//不用size来判读只有一个元素, list的size会比较慢!!!
auto it = in._bufferList.begin();
auto pre = it;
++it;
if (it == in._bufferList.end())
{
//只有一个buffer
b = TC_HttpRequest::checkRequest(pre->data(), pre->size());
if (b)
{
buffer = in.getBuffers();
}
}
else
{
//todo 性能还需要优化
buffer = in.getBuffers();
b = TC_HttpRequest::checkRequest(buffer.data(), buffer.size());
}
}
if (b) if (b)
{ {
out.swap(buffer);
in.clearBuffers();
return PACKET_FULL; return PACKET_FULL;
} }
else else
@ -330,6 +299,19 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, v
return PACKET_LESS; return PACKET_LESS;
} }
TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, vector<char> &out)
{
TC_NetWorkBuffer::PACKET_TYPE b = in.checkHttp();
if (b == PACKET_FULL)
{
out = in.getBuffers();
in.clearBuffers();
}
return b;
}
TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseEcho(TC_NetWorkBuffer&in, vector<char> &out) TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseEcho(TC_NetWorkBuffer&in, vector<char> &out)
{ {
try try