http2 sync succ

This commit is contained in:
jarodruan 2020-02-17 09:06:10 +08:00
parent 6154acda2a
commit c8c2800c09
10 changed files with 123 additions and 144 deletions

View File

@ -113,7 +113,7 @@ void asyncRpc2(int c)
std::map<std::string, std::string> header;
header[":path"] = "/";
header[":method"] = "GET";
header[":method"] = "POST";
header[":authority"] = "domain.com";
header[":scheme"] = "http";
@ -131,12 +131,10 @@ void asyncRpc2(int c)
cout << "exception:" << e.what() << endl;
}
TC_Common::msleep(10);
// while(i-callback_count > 0 )
// {
// TC_Common::msleep(100);
// }
if(i % 500 == 0)
{
TC_Common::msleep(100);
}
}
int64_t cost = TC_Common::now2us() - t;

View File

@ -65,9 +65,16 @@ int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
for(size_t i = 0; i < vtReqid.size(); i++)
{
string rbody;
session->getBody(vtReqid[i], rbody);
// cout << vtReqid[i] << ", " << rbody << endl;
vector<char> data;
session->doResponse(vtReqid[i], rsp, data);
buffer.insert(buffer.end(), data.begin(), data.end());
}
}
else

View File

@ -53,7 +53,7 @@
endpoint = tcp -h 0.0.0.0 -p 8082 -t 10000
allow =
maxconns = 4096
threads = 5
threads = 1
servant = TestApp.HttpServer.Http2Obj
queuecap = 1000000
protocol = not-tars

View File

@ -165,10 +165,21 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id
return len;
}
//std::mutex m;
// ENCODE function, called by network thread
vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans)
{
TC_Http2Client* session = trans->getHttp2Client();
// std::lock_guard<std::mutex> l(m);
TC_Http2Client* session = (TC_Http2Client*)trans->getSendBuffer()->getContextData();
if(session == NULL)
{
session = new TC_Http2Client();
trans->getSendBuffer()->setContextData(session, [=]{delete session;});
session->settings();
}
std::vector<nghttp2_nv> nva;
@ -212,18 +223,19 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
request.iRequestId = sid;
// cout << "http2Request id:" << request.iRequestId << endl;
int rv = nghttp2_session_send(session->session());
if (rv != 0) {
TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_session_send return: " << nghttp2_strerror(rv) << endl);
TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_session_send return: " << nghttp2_strerror(rv) << ", " << std::this_thread::get_id() << ", session:" << session << endl);
return vector<char>();
}
// else
// {
// cout << "send" << endl;
// }
// cout << "nghttp2_session_send, id:" << request.iRequestId << ", buff size:" << session->_buffer().size() << endl;
// if(session->_buffer().empty())
// {
// exit(0);
// }
// get data to send
vector<char> out;
out.swap(session->sendBuffer());
@ -232,9 +244,19 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
// cout << "http2Response" << endl;
// std::lock_guard<std::mutex> l(m);
TC_Http2Client* session = ((Transceiver*)(in.getConnection()))->getHttp2Client();
TC_Http2Client* session = (TC_Http2Client*)((Transceiver*)(in.getConnection()))->getSendBuffer()->getContextData();
if(session == NULL)
{
session = new TC_Http2Client();
((Transceiver*)(in.getConnection()))->getSendBuffer()->setContextData(session, [=]{delete session;});
session->settings();
}
// cout << "http2Response:" << std::this_thread::get_id() << ", " << session << ", " << in.getBufferLength() << endl;
auto it = session->doneResponses().begin();
@ -249,12 +271,8 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in,
pair<const char*, size_t> buffer = in.getBufferPointer();
// cout << "size:" << buffer.second << endl;
int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.first, buffer.second);
// vector<char> buffer = in.getBuffers();
// int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.size());
if (readlen < 0)
{
TLOGERROR("[TARS]http2Response::Fatal error: nghttp2_session_mem_recv error:" << nghttp2_strerror((int)readlen) << endl);
@ -266,6 +284,7 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in,
if(session->doneResponses().empty())
{
// cout << "doneResponses empty" << endl;
return TC_NetWorkBuffer::PACKET_LESS;
}
@ -274,6 +293,8 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in,
rsp.status = it->second.headers;
rsp.sBuffer.swap(it->second.body);
// cout << "http2Response id:" << it->second.streamId << endl;
session->doneResponses().erase(it);
return TC_NetWorkBuffer::PACKET_FULL;

View File

@ -14,25 +14,20 @@
* specific language governing permissions and limitations under the License.
*/
//#include <sys/uio.h>
#include "servant/Transceiver.h"
#include "servant/AdapterProxy.h"
#include "servant/Application.h"
#include "servant/TarsLogger.h"
#include "servant/AuthLogic.h"
//#include "servant/Auth.h"
#if TARS_SSL
#include "util/tc_openssl.h"
#endif
#if TARS_HTTP2
#include "util/tc_http2.h"
#endif
namespace tars
{
static const int BUFFER_SIZE = 10 * 1024;
static const int BUFFER_SIZE = 16 * 1024;
///////////////////////////////////////////////////////////////////////
Transceiver::Transceiver(AdapterProxy * pAdapterProxy,const EndpointInfo &ep)
@ -317,14 +312,6 @@ void Transceiver::close()
}
#endif
#if TARS_HTTP2
if(_http2Client)
{
delete _http2Client;
_http2Client = NULL;
}
#endif
_adapterProxy->getObjProxy()->getCommunicatorEpoll()->delFd(_fd,&_fdInfo,EPOLLIN|EPOLLOUT);
NetworkUtil::closeSocketNoThrow(_fd);
@ -342,21 +329,6 @@ void Transceiver::close()
TLOGTARS("[TARS][trans close:"<< _adapterProxy->getObjProxy()->name()<< "," << _ep.desc() << "]" << endl);
}
#if TARS_HTTP2
TC_Http2Client* Transceiver::getHttp2Client()
{
if(_http2Client == NULL)
{
_http2Client = new TC_Http2Client();
_http2Client->settings();
}
return _http2Client;
}
#endif
int Transceiver::doRequest()
{
if(!isValid()) return -1;
@ -470,6 +442,11 @@ int Transceiver::sendRequest(const shared_ptr<TC_NetWorkBuffer::Buffer> &buff)
return eRetError;
}
static std::atomic<int> totalSend{0};
totalSend += iRet;
// cout << "totalSend:" << totalSend << endl;
//没有全部发送完,写buffer 返回成功
if(iRet < (int)buff->length())
{
@ -524,12 +501,18 @@ int TcpTransceiver::doResponse()
int recvCount = 0;
static std::atomic<int> totalRecv{0};
do
{
char buff[BUFFER_SIZE] = {0x00};
if ((iRet = this->recv(buff, BUFFER_SIZE, 0)) > 0)
{
totalRecv += iRet;
// cout << "totalRecv:" << totalRecv << endl;
TC_NetWorkBuffer *rbuf = &_recvBuffer;
#if TARS_SSL
if (isSSL())

View File

@ -21,9 +21,7 @@
#include "servant/NetworkUtil.h"
#include "servant/CommunicatorEpoll.h"
#include "servant/AuthLogic.h"
// #include "util/tc_buffer.h"
#include <list>
//#include <sys/uio.h>
using namespace std;
@ -33,10 +31,6 @@ namespace tars
class TC_OpenSSL;
#endif
#if TARS_HTTP2
class TC_Http2Client;
#endif
class AdapterProxy;
//////////////////////////////////////////////////////////
@ -121,6 +115,18 @@ public:
*/
int sendRequest(const shared_ptr<TC_NetWorkBuffer::Buffer> &pData);
/**
* send buffer
* @return
*/
TC_NetWorkBuffer *getSendBuffer() { return &_sendBuffer; }
/**
* recv buffer
* @return
*/
TC_NetWorkBuffer *getRecvBuffer() { return &_recvBuffer; }
/*
* Send BufferCache是否有完整的包
* @return int
@ -227,9 +233,6 @@ public:
*/
bool sendAuthData(const BasicAuthInfo& );
#if TARS_HTTP2
TC_Http2Client* getHttp2Client();
#endif
protected:
/**
**
@ -281,9 +284,6 @@ protected:
std::shared_ptr<TC_OpenSSL> _openssl;
#endif
#if TARS_HTTP2
TC_Http2Client* _http2Client = NULL;
#endif
/*
* buffer
*/

View File

@ -145,7 +145,7 @@ public:
/**
* @brief HTTP2握手+setting
*/
int settings(unsigned int maxCurrentStreams = 1000);
int settings(unsigned int maxCurrentStreams = 2000);
/**
* @brief
*/

View File

@ -877,6 +877,8 @@ int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf)
int TC_EpollServer::Connection::recvTcp()
{
static std::atomic<int> totalRecv{0};
int recvCount = 0;
TC_NetWorkBuffer *rbuf = &_recvBuffer;
@ -909,6 +911,8 @@ int TC_EpollServer::Connection::recvTcp()
}
else
{
// cout << "totalRecv:" << totalRecv << endl;
#if TARS_SSL
if (_pBindAdapter->getEndpoint().isSSL())
{
@ -1024,6 +1028,7 @@ int TC_EpollServer::Connection::recv()
int TC_EpollServer::Connection::sendBuffer()
{
static std::atomic<int> totalSend{0};
while(!_sendBuffer.empty())
{
pair<const char*, size_t> data = _sendBuffer.getBufferPointer();
@ -1045,6 +1050,9 @@ int TC_EpollServer::Connection::sendBuffer()
}
}
totalSend += iBytesSent;
// cout << "totalSend:" << totalSend << endl;
if(iBytesSent > 0)
{
_sendBuffer.moveHeader(iBytesSent);

View File

@ -35,6 +35,11 @@ static ssize_t str_read_callback(nghttp2_session *session, int32_t stream_id,
void *user_data)
{
TC_Http2Server::DataPack *dataPack = (TC_Http2Server::DataPack*)(source->ptr);
if(dataPack->readPos == dataPack->dataBuf.size())
{
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
}
size_t size = std::min(dataPack->dataBuf.size() - dataPack->readPos, length);
memcpy(buf, dataPack->dataBuf.c_str() + dataPack->readPos, size);
@ -248,7 +253,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<
{
_bNewCon = false;
nghttp2_settings_entry iv[2] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 1000},
nghttp2_settings_entry iv[2] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 2000},
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 100*1024*1024}};
nghttp2_submit_settings(_session, NGHTTP2_FLAG_NONE, iv, sizeof(iv)/sizeof(nghttp2_settings_entry));
@ -259,11 +264,13 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<
auto buff = in.getBufferPointer();
int readlen = nghttp2_session_mem_recv(_session, (uint8_t *)buff.first, buff.second);
int readlen;
// vector<char> buff = in.getBuffers();
{
TC_LockT<TC_SpinLock> lock2(_nghttpLock);
// int readlen = nghttp2_session_mem_recv(_session, (uint8_t *)buff.data(), buff.size());
readlen = nghttp2_session_mem_recv(_session, (uint8_t *) buff.first, buff.second);
}
if(readlen < 0)
{
@ -273,12 +280,13 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<
{
in.moveHeader(readlen);
TC_LockT<TC_SpinLock> lock1(reqLock_);
if (_reqout.empty())
{
return TC_NetWorkBuffer::PACKET_LESS;
}
// out.insert(out.end(), _reqout.begin(), _reqout.end());
out.swap(_reqout);
_reqout.clear();
}
@ -340,15 +348,19 @@ int TC_Http2Server::doResponse(int32_t reqid, const Http2Response &response, vec
TC_LockT<TC_SpinLock> lock(_nghttpLock);
ret = nghttp2_submit_response(_session, reqid, hdrs, response.header.size()+1, &data_prd);
if (ret != 0)
if (ret != 0) {
cout << "nghttp2_submit_response error" << endl;
return -1;
}
while (nghttp2_session_want_write(_session)) {
ret = nghttp2_session_send(_session);
if (ret != 0)
if (ret != 0) {
cout << "nghttp2_session_send error" << endl;
return -1;
}
}
}
delete [] hdrs;
@ -430,18 +442,24 @@ int TC_Http2Server::doRequest(const vector<char> &request, TC_Http2Server::Reque
int ret = nghttp2_submit_response(_session, ptr->streamId, hdrs, rsp.header.size()+1, &data_prd);
if (ret != 0)
{
cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
}
;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
while (nghttp2_session_want_write(_session)) {
ret = nghttp2_session_send(_session);
if (ret != 0)
{
cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
}
;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
}
}
{
TC_LockT<TC_SpinLock> lock(_responseBufLock);
response.swap(_responseBuf);
response.insert(response.begin(), _responseBuf.begin(), _responseBuf.end());
_responseBuf.clear();
}
@ -659,15 +677,15 @@ int TC_Http2Client::settings(unsigned int maxCurrentStreams)
{
nghttp2_settings_entry iv[2] = {
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, maxCurrentStreams},
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 1024 * 1024 * 1024},
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 100 * 1024 * 1024},
};
/* 24 bytes magic string also will be sent*/
int rv = nghttp2_submit_settings(_session,
nghttp2_submit_settings(_session,
NGHTTP2_FLAG_NONE,
iv,
sizeof(iv)/sizeof(iv[0]));
rv = nghttp2_session_send(_session);
int rv = nghttp2_session_send(_session);
return rv;
}

View File

@ -9,14 +9,6 @@ using namespace std;
namespace tars
{
//void TC_NetWorkBuffer::addSwapBuffer(vector<char>& buff)
//{
// _length += buff.size();
//
// _bufferList.push_back(std::make_shared<Buffervector<char>());
// _bufferList.back().swap(buff);
//}
void TC_NetWorkBuffer::addBuffer(const shared_ptr<TC_NetWorkBuffer::Buffer> & buff)
{
_bufferList.push_back(buff);
@ -145,30 +137,6 @@ bool TC_NetWorkBuffer::getHeader(size_t len, std::string &buffer) const
buffer.resize(len);
getBuffers(&buffer[0], len);
//
// auto it = _bufferList.begin();
//
// size_t left = len;
//
// while(it != _bufferList.end())
// {
// if((*it)->length() >= left)
// {
// //当前buffer足够
// buffer.append((*it)->buffer(), left);
// return true;
// }
// else
// {
// //当前buffer不够
// buffer.append((*it)->buffer(), (*it)->length());
// left = left - (*it)->length();
// }
//
// ++it;
// }
// assert(buffer.length() == len);
return true;
}
@ -188,30 +156,6 @@ bool TC_NetWorkBuffer::getHeader(size_t len, std::vector<char> &buffer) const
buffer.resize(len);
getBuffers(&buffer[0], len);
//
// auto it = _bufferList.begin();
//
// size_t left = len;
//
// while(it != _bufferList.end())
// {
// if((*it)->length() >= left)
// {
// //当前buffer足够
// buffer.insert(buffer.end(), (*it)->buffer(), (*it)->buffer() + left);
// return true;
// }
// else
// {
// //当前buffer不够
// buffer.insert(buffer.end(), (*it)->buffer(), (*it)->buffer() + (*it)->length());
// left = left - (*it)->length();
// }
//
// ++it;
// }
//
// assert(buffer.size() == len);
return true;
}