http2 can only send slow

This commit is contained in:
jarodruan 2020-02-10 22:30:36 +08:00
parent 9f5da0f7bf
commit 877da51524
13 changed files with 154 additions and 1609 deletions

View File

@ -58,9 +58,6 @@ void httpCall(int excut_num)
stHttpReq.setCacheControl("no-cache");
stHttpReq.setGetRequest(sServer1);
// TC_TCPClient tcpClient1;
// tcpClient1.init("127.0.0.1", 8081, 3000);
int iRet = 0;
for (int i = 0; i<excut_num; i++)
@ -75,10 +72,9 @@ void httpCall(int excut_num)
cout <<"pthread id: " << TC_Thread::CURRENT_THREADID() << ", iRet:" << iRet <<endl;
}
++callback_count;
}
cout << "pthread id: " << TC_Thread::CURRENT_THREADID() << ", succ:" << param.count << "/" << excut_num << ", " << TC_TimeProvider::getInstance()->getNowMs() - _iTime <<"(ms)"<<endl;
cout << "httpCall, succ:" << param.count << "/" << excut_num << ", " << TC_TimeProvider::getInstance()->getNowMs() - _iTime <<"(ms)"<<endl;
}
struct TestHttpCallback : public HttpCallback
@ -94,7 +90,6 @@ struct TestHttpCallback : public HttpCallback
{
callback_count++;
cout << "onHttpResponse" << endl;
if(cur == count-1)
{
int64_t cost = TC_Common::now2us() - start;
@ -106,6 +101,8 @@ cout << "onHttpResponse" << endl;
virtual int onHttpResponseException(const std::map<std::string, std::string>& requestHeaders,
int expCode)
{
cout << "onHttpResponseException expCode:" << expCode << endl;
callback_count++;
return 0;
@ -121,8 +118,6 @@ void syncRpc(int c)
int64_t t = TC_Common::now2us();
std::map<std::string, std::string> header;
// header[":authority"] = "domain.com";
// header[":scheme"] = "http";
std::map<std::string, std::string> rheader;
//发起远程调用
@ -133,7 +128,7 @@ void syncRpc(int c)
try
{
param.servantPrx->http_call("GET", "http://127.0.0.1:8081", header, "helloworld", rheader, rbody);
param.servantPrx->http_call("GET", "/", header, "helloworld", rheader, rbody);
}
catch(exception& e)
{
@ -199,6 +194,13 @@ void asyncRpc2(int c)
{
cout << "exception:" << e.what() << endl;
}
TC_Common::msleep(10);
// while(i-callback_count > 0 )
// {
// TC_Common::msleep(100);
// }
}
int64_t cost = TC_Common::now2us() - t;
@ -239,6 +241,9 @@ int main(int argc, char *argv[])
param.servantPrx->tars_connect_timeout(5000);
param.servantPrx->tars_async_timeout(60*1000);
param.servant2Prx->tars_connect_timeout(5000);
param.servant2Prx->tars_async_timeout(60*1000);
ProxyProtocol proto;
proto.requestFunc = ProxyProtocol::http1Request;
proto.responseFunc = ProxyProtocol::http1Response;

View File

@ -46,16 +46,18 @@ void doRequestFunc(const TC_Http2Server::Req_Type reqtype, const string &requri,
int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
{
TC_Http2Server* session = getHttp2(current->getUId());
cout << "doRequest" << endl;
static bool flag = true;
if(flag)
{
//method 1:
vector<int32_t> vtReqid;
TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid);
// cout << "doRequest size:" << vtReqid.size() << endl;
TC_Http2Server::Http2Response rsp;
rsp.status = 200;
rsp.about = "OK";
@ -72,7 +74,7 @@ cout << "doRequest" << endl;
session->doRequest(current->getRequestBuffer(), doRequestFunc, buffer);
}
flag = !flag;
// flag = !flag;
return 0;
}

View File

@ -39,6 +39,7 @@ int HttpImp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
vector<char> v = current->getRequestBuffer();
string sBuf;
sBuf.assign(&v[0],v.size());
request.decode(sBuf);
TC_HttpResponse rsp;
string s="hello";

View File

@ -25,7 +25,6 @@ HttpServer g_app;
TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector<char> &out)
{
cout << "parseHttp2" << endl;
TC_Http2Server*session = (TC_Http2Server*)(in.getContextData());
if(session == NULL)

View File

@ -12,7 +12,9 @@ sleep 3
echo "client: .\\bin\\Release\\HttpClient.exe"
.\\bin\\Release\\HttpClient.exe 2 10000
.\\bin\\Release\\HttpClient.exe --count=10000 --thread=2 --call=basehttp
.\\bin\\Release\\HttpClient.exe --count=10000 --thread=2 --call=synchttp2
.\\bin\\Release\\HttpClient.exe --count=10000 --thread=2 --call=asynchttp2
sleep 1

View File

@ -14,8 +14,8 @@ sleep 1
echo "client: ./bin/HttpClient"
./bin/HttpClient --count=10000 --thread=2 --call=basehttp
./bin/HttpClient --count=10000 --thread=2 --call=synchttp
./bin/HttpClient --count=10000 --thread=2 --call=asynchttp
./bin/HttpClient --count=10000 --thread=2 --call=synchttp2
./bin/HttpClient --count=10000 --thread=2 --call=asynchttp2
sleep 1

View File

@ -162,7 +162,7 @@ int AdapterProxy::invoke(ReqMessage * msg)
startTrack(msg);
#endif
msg->sReqData->addBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get()));
//交给连接发送数据,连接连上,buffer不为空,直接发送数据成功
if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError)

View File

@ -86,6 +86,8 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id
void *user_data)
{
std::vector<char>* body = (std::vector<char>* )source->ptr;
// cout << "reqbody_read_callback:" << body->size() << endl;
if (body->empty())
{
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
@ -165,7 +167,6 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in,
// ENCODE function, called by network thread
vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans)
{
TC_Http2Client* session = trans->getHttp2Client();
std::vector<nghttp2_nv> nva;
@ -195,9 +196,10 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
data.read_callback = reqbody_read_callback;
}
// cout << "pData:" << pData << ", " << data.read_callback << endl;
int32_t sid = nghttp2_submit_request(session->session(),
NULL,
&nva[0],
nva.data(),
nva.size(),
pData,
NULL);
@ -208,28 +210,49 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
}
request.iRequestId = sid;
nghttp2_session_send(session->session());
int rv = nghttp2_session_send(session->session());
if (rv != 0) {
TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_session_send return: " << nghttp2_strerror(rv) << endl);
return vector<char>();
}
// cout << "nghttp2_session_send, id:" << request.iRequestId << ", buff size:" << session->sendBuffer().size() << endl;
// if(session->sendBuffer().empty())
// {
// exit(0);
// }
// get data to send
vector<char> out;
out.swap(session->sendBuffer());
cout << "http2Request out buffer size:" << out.size() << endl;
return out;
}
TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
// cout << "http2Response" << endl;
TC_Http2Client* session = ((Transceiver*)(in.getConnection()))->getHttp2Client();
auto it = session->doneResponses().begin();
if(it == session->doneResponses().end() && !in.empty())
{
vector<char> buffer = in.getBuffers();
if(in.empty())
{
return TC_NetWorkBuffer::PACKET_LESS;
}
//merge to one buffer
in.mergeBuffers();
int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.size());
pair<const char*, size_t> buffer = in.getBufferPointer();
int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.first, buffer.second);
// vector<char> buffer = in.getBuffers();
// int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.size());
if (readlen < 0)
{

View File

@ -982,13 +982,6 @@ public:
}
}
// /**
// * 初始化处理线程,线程将会启动
// */
// template<typename T> void setHandle()
// {
// _pEpollServer->setHandleGroup<T>(_handleGroupName, _iHandleNum, this);
// }
/**
*
* @param index
@ -1121,12 +1114,6 @@ public:
* adapter的名字
*/
string _name;
/**
* handle分组名称
*/
string _handleGroupName;
/**
* fd
*/
@ -1517,10 +1504,6 @@ public:
*/
bool _bClose;
/**
*
*/
int _iMaxTemQueueSize;
/**
*
*/
@ -1688,7 +1671,7 @@ public:
/**
*
*/
NetThread(TC_EpollServer *epollServer);
NetThread(TC_EpollServer *epollServer, int index);
/**
*
@ -1766,48 +1749,6 @@ public:
*/
vector<TC_EpollServer::ConnStatus> getConnStatus(int lfd);
// /**
// * 获取连接数
// *
// * @return size_t
// */
// size_t getConnectionCount() { return _list.size(); }
// /**
// * 获取监听socket信息
// *
// * @return map<int,ListenSocket>
// */
// map<int, BindAdapterPtr> getListenSocketInfo();
// /**
// * 根据名称获取BindAdapter
// * @param sName
// * @return BindAdapterPtr
// */
// BindAdapterPtr getBindAdapter(const string &sName);
// /**
// * 关闭连接
// * @param uid
// */
// void close(unsigned int uid);
// /**
// * 发送数据
// * @param uid
// * @param s
// */
// void send(unsigned int uid, const string &s, const string &ip, uint16_t port);
// /**
// * 获取某一监听端口的连接数
// * @param lfd
// *
// * @return vector<TC_EpollServer::ConnStatus>
// */
// vector<TC_EpollServer::ConnStatus> getConnStatus(int lfd);
/**
*
*
@ -1855,11 +1796,6 @@ public:
*/
void setUdpRecvBufferSize(size_t nSize=DEFAULT_RECV_BUFFERSIZE);
// /*
// *当网络线程中listeners没有监听socket时使用adapter中设置的最大连接数
// */
// void setListSize(size_t iSize) { _listSize += iSize; }
/**
*
* @return size_t
@ -1898,28 +1834,6 @@ public:
*/
void delConnection(Connection *cPtr, bool bEraseList = true,EM_CLOSE_T closeType=EM_CLIENT_CLOSE);
// /**
// * 发送数据
// * @param cPtr
// * @param buffer
// */
// int sendBuffer(Connection *cPtr, const string &buffer, const string &ip, uint16_t port);
// /**
// * 发送数据
// * @param cPtr
// * @return int
// */
// int sendBuffer(Connection *cPtr);
// /**
// * 接收buffer
// * @param cPtr
// * @param buffer
// * @return int
// */
// int recvBuffer(Connection *cPtr, recv_queue::queue_type &v);
/**
*
*/
@ -1930,24 +1844,6 @@ public:
*/
void processNet(const epoll_event &ev);
// /**
// * 停止线程
// */
// void stopThread();
// /**
// * 新连接建立
// * @param fd
// */
// bool accept(int fd, int domain = AF_INET);
// /**
// * 绑定端口
// * @param ep
// * @param s
// */
// void bind(const TC_Endpoint &ep, TC_Socket &s);
/**
*
*/
@ -1978,16 +1874,6 @@ public:
*/
int _threadIndex;
/**
* socket
*/
// map<int, BindAdapterPtr> _listeners;
/**
* socket的网络线程时使adapter信息
*/
// size_t _listSize;
/**
* epoll
*/
@ -1998,11 +1884,6 @@ public:
*/
bool _bTerminate;
/**
* epoll是否已经创建
*/
// bool _createEpoll;
/**
* handle是否已经启动
*/
@ -2013,14 +1894,6 @@ public:
*/
TC_Epoller::NotifyInfo _notify;
// /**
// * 管道(用于关闭服务)
// */
// TC_Socket _shutdown;
// //管道(用于通知有数据需要发送就)
// TC_Socket _notify;
/**
*
*/
@ -2031,11 +1904,6 @@ public:
*/
send_queue _sbuffer;
/**
* BindAdapter是否有udp监听
*/
// bool _hasUdp;
/**
*
*/
@ -2056,21 +1924,6 @@ public:
*
*/
bool _notifySignal = false;
/**
* 线,使
*/
// TC_BufferPool* _bufferPool;
// /**
// * 该网络线程的内存池所负责分配的最小字节和最大字节(2的幂向上取整)
// */
// size_t _poolMinBlockSize;
// size_t _poolMaxBlockSize;
/**
* 线hold的最大字节
*/
// size_t _poolMaxBytes;
};
////////////////////////////////////////////////////////////////////////////
public:

View File

@ -199,6 +199,12 @@ public:
*/
pair<const char*, size_t> getBufferPointer() const;
/**
* buffer拼接起来
* @return string
*/
void mergeBuffers();
/**
* buffer(buffer拼接起来, )
* @return string

View File

@ -301,6 +301,7 @@ int TC_TCPClient::checkSocket()
//设置非阻塞模式
_socket.setblock(false);
_socket.setNoCloseWait();
int iRet;
#if TARGET_PLATFORM_LINUX

File diff suppressed because it is too large Load Diff

View File

@ -58,6 +58,21 @@ pair<const char*, size_t> TC_NetWorkBuffer::getBufferPointer() const
return make_pair(it->data() + _pos, it->size() - _pos);
}
void TC_NetWorkBuffer::mergeBuffers()
{
//merge to one buffer
if(_bufferList.size() > 1)
{
vector<char> buffer = getBuffers();
_pos = 0;
_bufferList.clear();
_bufferList.push_back(buffer);
}
assert(_bufferList.size() <= 1);
}
string TC_NetWorkBuffer::getBuffersString() const
{
string buffer;
@ -86,8 +101,8 @@ string TC_NetWorkBuffer::getBuffersString() const
vector<char> TC_NetWorkBuffer::getBuffers() const
{
vector<char> buffer;
buffer.resize(_length);
vector<char> buffer;
buffer.resize(_length);
auto it = _bufferList.begin();
@ -96,17 +111,17 @@ vector<char> TC_NetWorkBuffer::getBuffers() const
{
if(it == _bufferList.begin())
{
memcpy(&buffer[pos], it->data() + _pos, it->size() - _pos);
pos += it->size() - _pos;
memcpy(&buffer[pos], it->data() + _pos, it->size() - _pos);
pos += it->size() - _pos;
}
else
{
memcpy(&buffer[pos], it->data(), it->size());
pos += it->size();
memcpy(&buffer[pos], it->data(), it->size());
pos += it->size();
}
++it;
}
return buffer;
}
@ -261,65 +276,25 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseBufferOf4(vector<char> &buf
return parseBuffer<uint32_t>(buffer, minLength, maxLength);
}
//bool TC_NetWorkBuffer::checkHttp(string &buffer) const
//{
// //这样处理性能是有问题的, 有提升的空间
// buffer = getBuffers();
//
// return TC_HttpRequest::checkRequest(buffer.c_str(), buffer.length());
//}
//
//TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, string &out)
//{
// try
// {
// string buffer;
// bool b = in.checkHttp(buffer);
// if (b)
// {
// out.swap(buffer);
// in.clearBuffers();;
// return PACKET_FULL;
// }
// else
// {
// return PACKET_LESS;
// }
// }
// catch (exception &ex)
// {
// return PACKET_ERR;
// }
//
// return PACKET_LESS;
//}
TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::checkHttp()
{
try
{
const static int headerLen = 10;
if(_bufferList.empty() || getBufferLength() <= headerLen)
mergeBuffers();
pair<const char*, size_t> buffer = getBufferPointer();
if(buffer.first == NULL || buffer.second == 0)
{
return PACKET_LESS;
}
vector<char> buffer;
getHeader(headerLen, buffer);
bool b = TC_HttpRequest::checkRequest(buffer.first, buffer.second);
bool b = TC_HttpRequest::checkRequest(buffer.data(), buffer.size());
if (b)
{
return PACKET_FULL;
}
else
{
return PACKET_LESS;
}
return b ? PACKET_FULL : PACKET_LESS;
}
catch (exception &ex)
{
cout << ex.what() << endl;
return PACKET_ERR;
}
@ -333,7 +308,6 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, v
if (b == PACKET_FULL)
{
out = in.getBuffers();
in.clearBuffers();
}
return b;
@ -348,9 +322,8 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseEcho(TC_NetWorkBuffer&in, v
{
return PACKET_LESS;
}
out = in.getBuffers();
in.clearBuffers();
return TC_NetWorkBuffer::PACKET_FULL;
}
catch (exception &ex)