fix http2 multi thread bug

This commit is contained in:
jarodruan 2020-02-17 18:52:11 +08:00
parent 8f3c4cdd7b
commit 0107665f77
8 changed files with 683 additions and 415 deletions

View File

@ -21,7 +21,7 @@ using namespace std;
TC_SpinLock Http2Imp::_mutex;
unordered_map<int32_t, TC_Http2Server*> Http2Imp::_http2;
unordered_map<int32_t, shared_ptr<TC_Http2Server>> Http2Imp::_http2;
//////////////////////////////////////////////////////
void Http2Imp::initialize()
@ -36,52 +36,77 @@ void Http2Imp::destroy()
//destroy servant here:
//...
}
void doRequestFunc(const TC_Http2Server::Req_Type reqtype, const string &requri, const TC_Http::http_header_type &reqHeader, const string &reqBody, TC_Http2Server::Http2Response &rsp)
{
rsp.status = 200;
rsp.about = "OK";
rsp.body = "response helloworld 2";
}
//
//void doRequestFunc(const TC_Http2Server::Req_Type reqtype, const string &requri, const TC_Http::http_header_type &reqHeader, const string &reqBody, TC_Http2Server::Http2Response &rsp)
//{
// rsp.status = 200;
// rsp.about = "OK";
// rsp.body = "response helloworld 2";
//}
int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
{
shared_ptr<TC_Http2Server> session = getHttp2(current->getUId());
TC_Http2Server* session = getHttp2(current->getUId());
// cout << "doRequest:" << session << ", buffer size:" << current->getRequestBuffer().size() << endl;
static bool flag = true;
if(flag)
{
//method 1:
vector<int32_t> vtReqid;
TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid);
vector<TC_Http2Server::Http2Context> contexts;
// cout << "doRequest size:" << vtReqid.size() << endl;
session->decodeRequest(contexts);
TC_Http2Server::Http2Response rsp;
rsp.status = 200;
rsp.about = "OK";
rsp.body = "response helloworld 1";
// cout << "doRequest context size:" << contexts.size() << endl;
for(size_t i = 0; i < vtReqid.size(); i++)
{
string rbody;
session->getBody(vtReqid[i], rbody);
for(size_t i = 0; i< contexts.size(); ++i)
{
TC_Http2Server::Http2Context & context = contexts[i];
vector<char> data;
// cout << vtReqid[i] << ", " << rbody << endl;
context.response.setHeader("X-Header", "TARS");
context.response.setResponse(200, "OK", context.request.getContent());
vector<char> data;
session->doResponse(vtReqid[i], rsp, data);
buffer.insert(buffer.end(), data.begin(), data.end());
int ret = session->encodeResponse(context, data);
if(ret != 0)
{
cout << "encodeResponse error:" << session->getErrMsg() << endl;
}
buffer.insert(buffer.end(), data.begin(), data.end());
}
// cout << "doRequest buffer size:" << buffer.size() << endl;
}
}
else
{
//method 2:
session->doRequest(current->getRequestBuffer(), doRequestFunc, buffer);
}
// static bool flag = true;
// if(flag)
// {
// //method 1:
// vector<int32_t> vtReqid;
// TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid);
//
// // cout << "doRequest size:" << vtReqid.size() << endl;
//
// TC_Http2Server::Http2Response rsp;
// rsp.status = 200;
// rsp.about = "OK";
// rsp.body = "response helloworld 1";
//
// for(size_t i = 0; i < vtReqid.size(); i++)
// {
// string rbody;
// session->getBody(vtReqid[i], rbody);
//
//// cout << vtReqid[i] << ", " << rbody << endl;
//
// vector<char> data;
// session->doResponse(vtReqid[i], rsp, data);
// buffer.insert(buffer.end(), data.begin(), data.end());
//
//
// }
// }
// else
// {
// //method 2:
// session->doRequest(current->getRequestBuffer(), doRequestFunc, buffer);
// }
// flag = !flag;
@ -90,6 +115,7 @@ int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
int Http2Imp::doClose(TarsCurrentPtr current)
{
cout << "doClose" << endl;
delHttp2(current->getUId());
return 0;

View File

@ -57,7 +57,7 @@ public:
*/
int doClose(TarsCurrentPtr current);
static TC_Http2Server *getHttp2(uint32_t uid)
static shared_ptr<TC_Http2Server> getHttp2(uint32_t uid)
{
TC_LockT<TC_SpinLock> lock(_mutex);
@ -70,7 +70,7 @@ public:
return NULL;
}
static void addHttp2(uint32_t uid, TC_Http2Server* ptr)
static void addHttp2(uint32_t uid, const shared_ptr<TC_Http2Server> &ptr)
{
TC_LockT<TC_SpinLock> lock(_mutex);
@ -93,7 +93,7 @@ protected:
static TC_SpinLock _mutex;
static unordered_map<int32_t, TC_Http2Server*> _http2;
static unordered_map<int32_t, shared_ptr<TC_Http2Server>> _http2;
};
/////////////////////////////////////////////////////
#endif

View File

@ -24,18 +24,27 @@ HttpServer g_app;
TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector<char> &out)
{
TC_Http2Server*session = (TC_Http2Server*)(in.getContextData());
TC_Http2Server*sessionPtr = (TC_Http2Server*)(in.getContextData());
if(session == NULL)
if(sessionPtr == NULL)
{
session = new TC_Http2Server();
in.setContextData(session, [=]{delete session;});
shared_ptr<TC_Http2Server> session(new TC_Http2Server());
// in.setContextData(session, [=]{delete session;});
in.setContextData(session.get());
session->settings(3000);
TC_EpollServer::Connection *connection = (TC_EpollServer::Connection *)in.getConnection();
Http2Imp::addHttp2(connection->getId(), session);
sessionPtr = session.get();
}
return session->parse(in, out);
TC_NetWorkBuffer::PACKET_TYPE flag = sessionPtr->parse(in, out);
// cout << "parseHttp2:" << session << ", out size:" << out.size() << endl;
return flag;
}

View File

@ -21,16 +21,16 @@
#合并回调线程和网络线程(以网络线程个数为准)
mergenetasync = 0
#模块名称
modulename = TestApp.HttpServer
modulename = TestApp.Http2Server
</client>
#定义所有绑定的IP
<server>
closecout = 0
#应用名称
app = TestApp
#服务名称
server = HttpServer
server = Http2Server
#服务的数据目录,可执行文件,配置文件等
basepath = ./
datapath = ./
@ -54,7 +54,7 @@
allow =
maxconns = 4096
threads = 1
servant = TestApp.HttpServer.Http2Obj
servant = TestApp.Http2Server.Http2Obj
queuecap = 1000000
protocol = not-tars
</Http2Adapter>

View File

@ -88,6 +88,9 @@ struct TC_HttpRequest_Exception : public TC_Http_Exception
~TC_HttpRequest_Exception() throw(){};
};
class TC_HttpRequest;
class TC_HttpResponse;
/**
* @brief URL解析类.
*
@ -279,6 +282,8 @@ public:
void specialize();
protected:
friend class TC_HttpRequest;
/**
* @brief URL.
*
@ -524,6 +529,12 @@ public:
const string &getContent() const { return _content; }
/**
* @brief get body
* @return http body
*/
string &getContent() { return _content; }
/**
* @brief http body(content-length).
*
* @param content http body内容
@ -1156,10 +1167,40 @@ public:
int doRequest(TC_HttpResponse &stHttpRep, int iTimeout = 3000);
/**
* @brief .
* @brief get request type
*/
int requestType() const { return _requestType ; }
/**
* @brief set request type
*/
void setRequestType(int requestType) { _requestType = requestType ; }
/**
* set method
* @param
* @return method invalid, throw exception
*/
void setMethod(const char * sMethod);
/**
* set method
* @param
*/
void setPath(const char * sPath);
/**
* set domain
* @param
*/
void setDomain(const char * sDomain);
/**
* set schema
* @param
*/
void setScheme(const char * sScheme);
/**
* @brief GET请求.
*

View File

@ -13,10 +13,25 @@ namespace tars
class TC_Http2
{
public:
/**
* constructor
*/
TC_Http2();
/**
* deconstructor
*/
virtual ~TC_Http2();
struct DataPack
{
DataPack(const char *data, size_t length) : _dataBuf(data), _length(length) {}
const char* _dataBuf;
size_t _length;
size_t _readPos = 0;
};
/**
* @brief setting
*/
@ -39,11 +54,23 @@ public:
*/
void swap(vector<char> &buff) { _buff.swap(buff); }
/**
* insert buff
* @param buff
*/
void insertBuff(const char *buff, size_t length) { _buff.insert(_buff.end(), buff, buff + length); }
/**
* @brief session
*/
nghttp2_session* session() const { return _session; }
/**
*
* @return
*/
const char *getErrMsg();
protected:
/**
* error code
@ -61,41 +88,42 @@ protected:
vector<char> _buff;
};
class TC_Http2Server
class TC_Http2Server : public TC_Http2
{
public:
enum Req_Type
{
REQUEST_GET,
REQUEST_POST,
REQUEST_OPTIONS,
REQUEST_HEAD,
REQUEST_PUT,
REQUEST_DELETE
} ;
struct Http2Response
{
int status;
string about;
TC_Http::http_header_type header;
string body;
};
typedef std::function<void(const Req_Type reqtype, const string &requri, const TC_Http::http_header_type &reqHeader, const string &reqBody, Http2Response &rsp)> RequestFunc;
TC_Http2Server();
~TC_Http2Server();
/**
* get all http2 request id
* @param in
* @param out
* @return
*/
static int doRequest(const vector<char> &request, vector<int32_t>& vtReqid);
struct Http2Context
{
// Http2Context(int32_t id) : reqId(id) {}
int32_t reqId;
bool bFinish = false;
TC_HttpRequest request;
TC_HttpResponse response;
};
/**
* parse all request
* @param request
* @param unordered_map<int32_t, std::shared_ptr<TC_HttpRequest>>
* @return
*/
void decodeRequest(vector<Http2Context> &contexts);
/**
*
* @param reqid
* @param response
* @param out
* @return
*/
int encodeResponse(const Http2Context &context, vector<char> &out);
/**
* http2
@ -105,61 +133,37 @@ public:
*/
TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer&in, vector<char> &out);
int doResponse(int32_t reqid, const Http2Response &response, vector<char>& out);
int doRequest(const vector<char> &request, RequestFunc func, vector<char>& response);
int getMethod(int32_t reqid, Req_Type &method);
int getUri(int32_t reqid, string &uri);
int getHeader(int32_t reqid, TC_Http::http_header_type &header);
int getBody(int32_t reqid, string &body);
void appendResponseBuf(const char *buff, size_t length);
void onHeaderCallback(int32_t streamId);
void onHeaderCallback(int32_t streamId, const string &skey, const string &svalue);
void onFrameRecvCallback(int32_t streamId);
void onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len);
void onStreamCloseCallback(int32_t streamId);
struct DataPack
{
DataPack(){}
protected:
DataPack(const string &data, int pos):dataBuf(data), readPos(pos){}
string dataBuf;
unsigned int readPos;
};
Http2Context &getContext(int32_t streamId);
void deleteContext(int32_t streamId);
protected:
struct RequestPack
{
RequestPack():streamId(0), bFinish(false){}
Req_Type method;
string uri;
TC_Http::http_header_type header;
string body;
int32_t streamId;
bool bFinish;
};
// TC_SpinLock _contextLock;
// TC_ThreadMutex _contextLock;
TC_SpinLock _responseBufLock;
vector<char> _responseBuf;
// TC_SpinLock reqLock_;
// unordered_map<int32_t, RequestPack> _mReq;
TC_SpinLock reqLock_;
unordered_map<int32_t, RequestPack> _mReq;
// unordered_map<int32_t, RequestPack> _mReq;
// unordered_map<int32_t, std::shared_ptr<Http2Context>> _context;
unordered_map<int32_t, Http2Context> _context;
vector<Http2Context> _contextFinished;
vector<char> _reqout;
nghttp2_session *_session;
bool _bNewCon;
TC_SpinLock _nghttpLock;
TC_ThreadMutex _nghttpLock;
};
/////////////////////////////////////////////////////////////////////////////////

View File

@ -2093,6 +2093,58 @@ void TC_HttpRequest::getHostPort(string &sDomain, uint32_t &iPort)
iPort = TC_Common::strto<uint32_t>(_httpURL.getPort());
}
void TC_HttpRequest::setMethod(const char * sMethod)
{
//解析请求类型
if(TC_Port::strcasecmp(sMethod, "GET") ==0) //if(sMethod == "GET")
{
_requestType = REQUEST_GET;
}
else if(TC_Port::strcasecmp(sMethod, "POST") ==0) //else if(sMethod == "POST")
{
_requestType = REQUEST_POST;
}
else if(TC_Port::strcasecmp(sMethod, "PUT") ==0)
{
_requestType = REQUEST_PUT;
}
else if(TC_Port::strcasecmp(sMethod, "PATCH") ==0)
{
_requestType = REQUEST_PATCH;
}
else if(TC_Port::strcasecmp(sMethod, "OPTIONS") ==0) //else if(sMethod == "OPTIONS")
{
_requestType = REQUEST_OPTIONS;
}
else if(TC_Port::strcasecmp(sMethod, "HEAD") == 0)
{
_requestType = REQUEST_HEAD;
}
else if(TC_Port::strcasecmp(sMethod, "DELETE") == 0)
{
_requestType = REQUEST_DELETE;
}
else
{
throw TC_HttpRequest_Exception("[TC_HttpRequest::setMethod] http request command error: " + string(sMethod));
}
}
void TC_HttpRequest::setPath(const char *path)
{
_httpURL._sPath = path;
}
void TC_HttpRequest::setDomain(const char * sDomain)
{
_httpURL._sDomain = sDomain;
}
void TC_HttpRequest::setScheme(const char * sScheme)
{
_httpURL._sScheme = sScheme;
}
int TC_HttpRequest::doRequest(TC_HttpResponse &stHttpRsp, int iTimeout)
{
//只支持短连接模式

View File

@ -51,8 +51,14 @@ int TC_Http2::settings(unsigned int maxCurrentStreams)
NGHTTP2_FLAG_NONE,
iv,
sizeof(iv)/sizeof(iv[0]));
int rv = nghttp2_session_send(_session);
return rv;
_err = nghttp2_session_send(_session);
return _err;
}
const char *TC_Http2::getErrMsg()
{
return nghttp2_strerror(_err);
}
///////////////////////////////////////////////////////////////////////////////////
@ -65,19 +71,19 @@ static ssize_t str_read_callback(nghttp2_session *session, int32_t stream_id,
nghttp2_data_source *source,
void *user_data)
{
TC_Http2Server::DataPack *dataPack = (TC_Http2Server::DataPack*)(source->ptr);
if(dataPack->readPos == dataPack->dataBuf.size())
TC_Http2::DataPack *dataPack = (TC_Http2::DataPack*)(source->ptr);
if(dataPack->_readPos == dataPack->_length)
{
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
}
size_t size = std::min(dataPack->dataBuf.size() - dataPack->readPos, length);
size_t size = std::min(dataPack->_length - dataPack->_readPos, length);
memcpy(buf, dataPack->dataBuf.c_str() + dataPack->readPos, size);
memcpy(buf, dataPack->_dataBuf + dataPack->_readPos, size);
dataPack->readPos += size;
dataPack->_readPos += size;
if(dataPack->readPos == dataPack->dataBuf.size())
if(dataPack->_readPos == dataPack->_length)
{
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
@ -89,7 +95,7 @@ static ssize_t send_callback(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
TC_Http2Server *ptr = (TC_Http2Server*)user_data;
ptr->appendResponseBuf((const char*)data, length);
ptr->insertBuff((const char*)data, length);
return (ssize_t)length;
}
@ -97,7 +103,7 @@ static ssize_t send_callback(nghttp2_session *session, const uint8_t *data,
static int on_header_callback(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
size_t valuelen, uint8_t flags, void *user_data)
{
TC_Http2Server *ptr = (TC_Http2Server*)user_data;
ptr->onHeaderCallback(frame->hd.stream_id, string((char*)name, namelen), string((char*)value, valuelen));
@ -106,7 +112,7 @@ static int on_header_callback(nghttp2_session *session,
static int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
void *user_data)
{
TC_Http2Server *ptr = (TC_Http2Server*)user_data;
@ -118,16 +124,16 @@ static int on_begin_headers_callback(nghttp2_session *session,
return 0;
}
static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
{
TC_Http2Server *ptr = (TC_Http2Server*)user_data;
switch (frame->hd.type)
switch (frame->hd.type)
{
case NGHTTP2_DATA:
case NGHTTP2_HEADERS:
/* Check that the client request has finished */
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
{
ptr->onFrameRecvCallback(frame->hd.stream_id);
return 0;
@ -141,7 +147,7 @@ static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame
static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t len, void *user_data)
size_t len, void *user_data)
{
TC_Http2Server *ptr = (TC_Http2Server*)user_data;
@ -150,7 +156,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
return 0;
}
static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data)
static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data)
{
TC_Http2Server *ptr = (TC_Http2Server*)user_data;
ptr->onStreamCloseCallback(stream_id);
@ -160,93 +166,97 @@ static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
}
void TC_Http2Server::appendResponseBuf(const char *buff, size_t length)
{
TC_LockT<TC_SpinLock> lock(_responseBufLock);
_responseBuf.insert(_responseBuf.end(), buff, buff + length);
}
void TC_Http2Server::onHeaderCallback(int streamId)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
_mReq[streamId].streamId = streamId;
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
_context[streamId].reqId = streamId;
}
void TC_Http2Server::onHeaderCallback(int32_t streamId, const string &skey, const string &svalue)
{
TC_LockT<TC_SpinLock>lock(reqLock_);
auto it = _mReq.find(streamId);
if (it != _mReq.end())
{
it->second.header.insert(std::pair<string, string>(skey, svalue));
TC_Http2Server::Http2Context & context = getContext(streamId);
if (TC_Port::strcasecmp(skey.c_str(), ":method") == 0)
{
// string sMethod = TC_Common::upper(TC_Common::trim(svalue));
if (TC_Port::strcasecmp(svalue.c_str(), "GET") == 0)
it->second.method = REQUEST_GET;
else if (TC_Port::strcasecmp(svalue.c_str(), "POST") == 0)
it->second.method = REQUEST_POST;
else if (TC_Port::strcasecmp(svalue.c_str(), "OPTIONS") == 0)
it->second.method = REQUEST_OPTIONS;
else if (TC_Port::strcasecmp(svalue.c_str(), "HEAD") == 0)
it->second.method = REQUEST_HEAD;
else if (TC_Port::strcasecmp(svalue.c_str(), "PUT") == 0)
it->second.method = REQUEST_PUT;
else if (TC_Port::strcasecmp(svalue.c_str(), "DELETE") == 0)
it->second.method = REQUEST_DELETE;
}
else if (TC_Port::strcasecmp(skey.c_str(), ":path") == 0)
{
it->second.uri = svalue;
}
if (TC_Port::strcasecmp(skey.c_str(), ":method") == 0)
{
context.request.setMethod(svalue.c_str());
}
else if (TC_Port::strcasecmp(skey.c_str(), ":path") == 0)
{
context.request.setPath(svalue.c_str());
}
else if (TC_Port::strcasecmp(skey.c_str(), ":scheme") == 0)
{
context.request.setScheme(svalue.c_str());
}
else if (TC_Port::strcasecmp(skey.c_str(), ":authority") == 0)
{
context.request.setDomain(svalue.c_str());
}
context.request.setHeader(skey, svalue);
}
void TC_Http2Server::onFrameRecvCallback(int32_t streamId)
void TC_Http2Server::onFrameRecvCallback(int32_t streamId)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(streamId);
if (it != _mReq.end())
{
it->second.bFinish = true;
TC_Http2Server::Http2Context& context = getContext(streamId);
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(streamId);
// if (it != _mReq.end())
// {
// it->second.bFinish = true;
if(context.request.getHeaders().find(":method") != context.request.getHeaders().end() ||
context.request.getHeaders().find(":path") != context.request.getHeaders().end() ||
context.request.getHeaders().find(":scheme") != context.request.getHeaders().end())
{
context.bFinish = true;
_contextFinished.push_back(context);
_reqout.insert(_reqout.end(), (char*)&streamId, (char*)&streamId + sizeof(int32_t));
deleteContext(streamId);
if(it->second.header.find(":method") != it->second.header.end() ||
it->second.header.find(":path") != it->second.header.end() ||
it->second.header.find(":scheme") != it->second.header.end())
{
char *tmpptr = (char*)&(it->second);
_reqout.insert(_reqout.end(), (char*)&tmpptr, (char*)&tmpptr + sizeof(TC_Http2Server::RequestPack *));
}
}
// }
}
void TC_Http2Server::onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len)
void TC_Http2Server::onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(streamId);
if (it != _mReq.end())
{
it->second.body.append(data, len);
}
TC_Http2Server::Http2Context &context = getContext(streamId);
context.request.getContent().append(data, len);
//
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(streamId);
// if (it != _mReq.end())
// {
// it->second.body.append(data, len);
// }
}
void TC_Http2Server::onStreamCloseCallback(int32_t streamId)
void TC_Http2Server::onStreamCloseCallback(int32_t streamId)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(streamId);
if (it != _mReq.end())
{
if (it->second.bFinish != true)
{
_mReq.erase(streamId);
}
}
// TC_Http2Server::Http2Context & context = getContext(streamId);
// {
deleteContext(streamId);
// }
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(streamId);
// if (it != _mReq.end())
// {
// if (it->second.bFinish != true)
// {
// _mReq.erase(streamId);
// }
// }
}
TC_Http2Server::TC_Http2Server(): _session(NULL), _bNewCon(true)
TC_Http2Server::TC_Http2Server(): _bNewCon(true)
{
nghttp2_session_callbacks *callbacks;
@ -266,42 +276,32 @@ TC_Http2Server::TC_Http2Server(): _session(NULL), _bNewCon(true)
nghttp2_session_server_new(&_session, callbacks, ((void*)this));
// *(int32_t*)((char*)_session + 2380) = 100000000;
//TLOGDEBUG("window size:" << nghttp2_session_get_remote_window_size(_session) << endl);
nghttp2_session_callbacks_del(callbacks);
}
TC_Http2Server::~TC_Http2Server()
{
nghttp2_session_del(_session);
}
TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<char> &out)
{
if(_bNewCon)
{
_bNewCon = false;
// cout << "parse:" << in.getBufferLength() << endl;
nghttp2_settings_entry iv[2] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 3000},
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 100*1024*1024}};
nghttp2_submit_settings(_session, NGHTTP2_FLAG_NONE, iv, sizeof(iv)/sizeof(nghttp2_settings_entry));
nghttp2_session_send(_session);
}
// if(_bNewCon)
// {
// _bNewCon = false;
//
// }
in.mergeBuffers();
auto buff = in.getBufferPointer();
int readlen;
// int readlen;
{
TC_LockT<TC_SpinLock> lock2(_nghttpLock);
TC_LockT<TC_ThreadMutex> lock2(_nghttpLock);
readlen = nghttp2_session_mem_recv(_session, (uint8_t *) buff.first, buff.second);
}
int readlen = nghttp2_session_mem_recv(_session, (uint8_t *) buff.first, buff.second);
if(readlen < 0)
{
@ -311,7 +311,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<
{
in.moveHeader(readlen);
TC_LockT<TC_SpinLock> lock1(reqLock_);
// TC_LockT<TC_ThreadMutex> lock1(reqLock_);
if (_reqout.empty())
{
@ -325,233 +325,369 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<
return TC_NetWorkBuffer::PACKET_FULL;
}
int TC_Http2Server::doRequest(const vector<char> &request, vector<int32_t>& vtReqid)
//void TC_Http2Server::createReq(int32_t streamId)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
//
// _context[streamId] = std::make_shared<TC_HttpRequest>();
//}
TC_Http2Server::Http2Context& TC_Http2Server::getContext(int32_t streamId)
{
vtReqid.clear();
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
auto it = _context.find(streamId);
if (it != _context.end())
{
return it->second;
}
for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Server::RequestPack *))
{
RequestPack *ptr;
memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *));
assert(false);
vtReqid.push_back(ptr->streamId);
}
return 0;
}
int TC_Http2Server::doResponse(int32_t reqid, const Http2Response &response, vector<char>& out)
void TC_Http2Server::deleteContext(int32_t streamId)
{
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(reqid);
if (it == _mReq.end())
return -1;
}
string sstatus = TC_Common::tostr(response.status);
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
const char* strstatus = ":status";
nghttp2_nv *hdrs = new nghttp2_nv[response.header.size() + 1];
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[0].name = (uint8_t*)strstatus;
hdrs[0].namelen = 7;
hdrs[0].value = (uint8_t*)sstatus.c_str();
hdrs[0].valuelen = sstatus.size();
TC_Http::http_header_type::const_iterator it = response.header.begin();
for (int n = 1; it != response.header.end(); n++, it++)
{
hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[n].name = (uint8_t*)it->first.c_str();
hdrs[n].namelen = it->first.size();
hdrs[n].value = (uint8_t*)it->second.c_str();
hdrs[n].valuelen = it->second.size();
}
DataPack dataPack;
dataPack.readPos = 0;
dataPack.dataBuf = response.body;
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void*)&dataPack;
data_prd.read_callback = server::str_read_callback;
int ret ;
{
TC_LockT<TC_SpinLock> lock(_nghttpLock);
ret = nghttp2_submit_response(_session, reqid, hdrs, response.header.size()+1, &data_prd);
if (ret != 0) {
cout << "nghttp2_submit_response error" << endl;
return -1;
}
while (nghttp2_session_want_write(_session)) {
ret = nghttp2_session_send(_session);
if (ret != 0) {
cout << "nghttp2_session_send error" << endl;
return -1;
}
}
}
delete [] hdrs;
{
TC_LockT<TC_SpinLock> lock(_responseBufLock);
out.swap(_responseBuf);
_responseBuf.clear();
}
{
TC_LockT<TC_SpinLock> lock(reqLock_);
_mReq.erase(reqid);
}
return 0;
_context.erase(streamId);
}
int TC_Http2Server::doRequest(const vector<char> &request, TC_Http2Server::RequestFunc requestFunc, vector<char>& response)
void TC_Http2Server::decodeRequest(vector<Http2Context> &contexts)
{
for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Server::RequestPack *))
{
Http2Response rsp;
TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
RequestPack *ptr;
memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *));
_contextFinished.swap(contexts);
Req_Type qstatus;
// return context;
}
//
//vector<int32_t> TC_Http2Server::parseReqId(const vector<char> &request)
//{
// vector<int32_t> vtReqid;
// for (unsigned int i = 0; i < request.size(); i += sizeof(int32_t))
// {
// int32_t reqId;
// memcpy(&reqId, &request[i], sizeof(int32_t));
// vtReqid.push_back(reqId);
// }
//
// return vtReqid;
//}
const char* sMethod = ptr->header.find(":method")->second.c_str(); //TC_Common::upper(TC_Common::trim(ptr->header.find(":method")->second));
if (TC_Port::strcasecmp(sMethod, "GET") == 0)
qstatus = REQUEST_GET;
else if (TC_Port::strcasecmp(sMethod, "POST") == 0)
qstatus = REQUEST_POST;
else if (TC_Port::strcasecmp(sMethod, "OPTIONS") == 0)
qstatus = REQUEST_OPTIONS;
else if (TC_Port::strcasecmp(sMethod, "HEAD") == 0)
qstatus = REQUEST_HEAD;
else if (TC_Port::strcasecmp(sMethod, "PUT") == 0)
qstatus = REQUEST_PUT;
else if (TC_Port::strcasecmp(sMethod, "DELETE") == 0)
qstatus = REQUEST_DELETE;
else
{
continue;
}
string sstatus = ptr->header.find(":path")->second;
int TC_Http2Server::encodeResponse(const TC_Http2Server::Http2Context &context, vector<char> &out)
{
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it == _mReq.end())
// return -1;
// }
requestFunc(qstatus, sstatus, ptr->header, ptr->body, rsp);
// deleteContext(context->reqId);
DataPack dataPack;
dataPack.readPos = 0;
dataPack.dataBuf = rsp.body;
string sstatus = TC_Common::tostr(context.response.getStatus());
sstatus = TC_Common::tostr(rsp.status);
const char* strstatus = ":status";
const char* strstatus = ":status";
nghttp2_nv *hdrs = new nghttp2_nv[rsp.header.size() + 1];
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[0].name = (uint8_t*)strstatus;
hdrs[0].namelen = 7;
hdrs[0].value = (uint8_t*)sstatus.c_str();
hdrs[0].valuelen = sstatus.size();
TC_Http::http_header_type::iterator it = rsp.header.begin();
for (int n = 1; it != rsp.header.end(); n++, it++)
{
hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[n].name = (uint8_t*)it->first.c_str();
hdrs[n].namelen = it->first.size();
hdrs[n].value = (uint8_t*)it->second.c_str();
hdrs[n].valuelen = it->second.size();
}
nghttp2_nv *hdrs = new nghttp2_nv[context.response.getHeaders().size() + 1];
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[0].name = (uint8_t*)strstatus;
hdrs[0].namelen = 7;
hdrs[0].value = (uint8_t*)sstatus.c_str();
hdrs[0].valuelen = sstatus.size();
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void*)&dataPack;
data_prd.read_callback = server::str_read_callback;
{
TC_LockT<TC_SpinLock> lock(_nghttpLock);
TC_Http::http_header_type::const_iterator it = context.response.getHeaders().begin();
for (int n = 1; it != context.response.getHeaders().end(); n++, it++)
{
hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[n].name = (uint8_t*)it->first.c_str();
hdrs[n].namelen = it->first.size();
hdrs[n].value = (uint8_t*)it->second.c_str();
hdrs[n].valuelen = it->second.size();
}
int ret = nghttp2_submit_response(_session, ptr->streamId, hdrs, rsp.header.size()+1, &data_prd);
if (ret != 0)
{
cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
}
;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
const string &body = context.response.getContent();
while (nghttp2_session_want_write(_session)) {
ret = nghttp2_session_send(_session);
if (ret != 0)
{
cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
}
;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
}
}
DataPack dataPack(body.c_str(), body.size());
// dataPack.readPos = 0;
// dataPack.dataBuf = response.body;
{
TC_LockT<TC_SpinLock> lock(_responseBufLock);
response.insert(response.begin(), _responseBuf.begin(), _responseBuf.end());
_responseBuf.clear();
}
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void*)&dataPack;
data_prd.read_callback = server::str_read_callback;
{
TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
delete [] hdrs;
{
TC_LockT<TC_SpinLock> lock(reqLock_);
_mReq.erase(ptr->streamId);
}
_err = nghttp2_submit_response(_session, context.reqId, hdrs, context.response.getHeaders().size()+1, &data_prd);
if (_err != 0 ) {
delete [] hdrs;
}
return _err;
}
return 0;
while (nghttp2_session_want_write(_session)) {
_err = nghttp2_session_send(_session);
if (_err != 0) {
delete [] hdrs;
return _err;
}
}
this->swap(out);
}
delete [] hdrs;
return 0;
// {
// TC_LockT<TC_ThreadMutex> lock(_responseBufLock);
// out.swap(_responseBuf);
// _responseBuf.clear();
// }
//
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// _mReq.erase(reqid);
// }
}
int TC_Http2Server::getMethod(int32_t reqid, Req_Type &method)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(reqid);
if (it != _mReq.end())
method = it->second.method;
else
return -1;
return 0;
}
int TC_Http2Server::getUri(int32_t reqid, string &uri)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(reqid);
if (it != _mReq.end())
uri = it->second.uri;
else
return -1;
return 0;
}
int TC_Http2Server::getHeader(int32_t reqid, TC_Http::http_header_type &header)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(reqid);
if (it != _mReq.end())
header = it->second.header;
else
return -1;
return 0;
}
int TC_Http2Server::getBody(int32_t reqid, string &body)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
auto it = _mReq.find(reqid);
if (it != _mReq.end())
body = it->second.body;
else
return -1;
return 0;
}
//
//int TC_Http2Server::doRequest(const vector<char> &request, vector<int32_t>& vtReqid)
//{
// vtReqid.clear();
//
// for (unsigned int i = 0; i < request.size(); i += sizeof(int32_t))
// {
//// RequestPack *ptr;
//// memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *));
//
// int32_t reqId;
// memcpy(&reqId, &request[i], sizeof(int32_t));
// vtReqid.push_back(reqId);
// }
//
// return 0;
//}
//
//int TC_Http2Server::doResponse(int32_t reqid, const Http2Response &response, vector<char>& out)
//{
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it == _mReq.end())
// return -1;
// }
// string sstatus = TC_Common::tostr(response.status);
//
// const char* strstatus = ":status";
// nghttp2_nv *hdrs = new nghttp2_nv[response.header.size() + 1];
// hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[0].name = (uint8_t*)strstatus;
// hdrs[0].namelen = 7;
// hdrs[0].value = (uint8_t*)sstatus.c_str();
// hdrs[0].valuelen = sstatus.size();
// TC_Http::http_header_type::const_iterator it = response.header.begin();
// for (int n = 1; it != response.header.end(); n++, it++)
// {
// hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[n].name = (uint8_t*)it->first.c_str();
// hdrs[n].namelen = it->first.size();
// hdrs[n].value = (uint8_t*)it->second.c_str();
// hdrs[n].valuelen = it->second.size();
// }
//
// DataPack dataPack;
// dataPack.readPos = 0;
// dataPack.dataBuf = response.body;
//
// nghttp2_data_provider data_prd;
// data_prd.source.ptr = (void*)&dataPack;
// data_prd.read_callback = server::str_read_callback;
// int ret ;
// {
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
//
// ret = nghttp2_submit_response(_session, reqid, hdrs, response.header.size()+1, &data_prd);
// if (ret != 0) {
// cout << "nghttp2_submit_response error" << endl;
// return -1;
// }
//
// while (nghttp2_session_want_write(_session)) {
// ret = nghttp2_session_send(_session);
// if (ret != 0) {
// cout << "nghttp2_session_send error" << endl;
// return -1;
// }
// }
// }
//
// delete [] hdrs;
//
// this->swap(out);
//// {
//// TC_LockT<TC_ThreadMutex> lock(_responseBufLock);
//// out.swap(_responseBuf);
//// _responseBuf.clear();
//// }
//
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// _mReq.erase(reqid);
// }
//
// return 0;
//}
//
//int TC_Http2Server::doRequest(const vector<char> &request, TC_Http2Server::RequestFunc requestFunc, vector<char>& response)
//{
// for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Server::RequestPack *))
// {
// Http2Response rsp;
//
// RequestPack *ptr;
// memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *));
//
// Req_Type qstatus;
//
// const char* sMethod = ptr->header.find(":method")->second.c_str(); //TC_Common::upper(TC_Common::trim(ptr->header.find(":method")->second));
// if (TC_Port::strcasecmp(sMethod, "GET") == 0)
// qstatus = REQUEST_GET;
// else if (TC_Port::strcasecmp(sMethod, "POST") == 0)
// qstatus = REQUEST_POST;
// else if (TC_Port::strcasecmp(sMethod, "OPTIONS") == 0)
// qstatus = REQUEST_OPTIONS;
// else if (TC_Port::strcasecmp(sMethod, "HEAD") == 0)
// qstatus = REQUEST_HEAD;
// else if (TC_Port::strcasecmp(sMethod, "PUT") == 0)
// qstatus = REQUEST_PUT;
// else if (TC_Port::strcasecmp(sMethod, "DELETE") == 0)
// qstatus = REQUEST_DELETE;
// else
// {
// continue;
// }
// string sstatus = ptr->header.find(":path")->second;
//
// requestFunc(qstatus, sstatus, ptr->header, ptr->body, rsp);
//
// DataPack dataPack;
// dataPack.readPos = 0;
// dataPack.dataBuf = rsp.body;
//
// sstatus = TC_Common::tostr(rsp.status);
//
// const char* strstatus = ":status";
// nghttp2_nv *hdrs = new nghttp2_nv[rsp.header.size() + 1];
// hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[0].name = (uint8_t*)strstatus;
// hdrs[0].namelen = 7;
// hdrs[0].value = (uint8_t*)sstatus.c_str();
// hdrs[0].valuelen = sstatus.size();
// TC_Http::http_header_type::iterator it = rsp.header.begin();
// for (int n = 1; it != rsp.header.end(); n++, it++)
// {
// hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[n].name = (uint8_t*)it->first.c_str();
// hdrs[n].namelen = it->first.size();
// hdrs[n].value = (uint8_t*)it->second.c_str();
// hdrs[n].valuelen = it->second.size();
// }
//
// nghttp2_data_provider data_prd;
// data_prd.source.ptr = (void*)&dataPack;
// data_prd.read_callback = server::str_read_callback;
//
// {
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
//
// int ret = nghttp2_submit_response(_session, ptr->streamId, hdrs, rsp.header.size()+1, &data_prd);
// if (ret != 0)
// {
// cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
// }
// ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
//
// while (nghttp2_session_want_write(_session)) {
// ret = nghttp2_session_send(_session);
// if (ret != 0)
// {
// cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
// }
// ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
// }
// }
//
// vector<char> out;
// swap(out);
// response.insert(response.begin(), out.begin(), out.end());
////
//// {
//// TC_LockT<TC_ThreadMutex> lock(_responseBufLock);
//// response.insert(response.begin(), _responseBuf.begin(), _responseBuf.end());
//// _responseBuf.clear();
//// }
//
// delete [] hdrs;
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// _mReq.erase(ptr->streamId);
// }
//
// }
//
// return 0;
//}
//
//int TC_Http2Server::getMethod(int32_t reqid, Req_Type &method)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// method = it->second.method;
// else
// return -1;
//
// return 0;
//}
//
//int TC_Http2Server::getUri(int32_t reqid, string &uri)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// uri = it->second.uri;
// else
// return -1;
//
// return 0;
//}
//
//int TC_Http2Server::getHeader(int32_t reqid, TC_Http::http_header_type &header)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// header = it->second.header;
// else
// return -1;
//
// return 0;
//}
//
//int TC_Http2Server::getBody(int32_t reqid, string &body)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// body = it->second.body;
// else
// return -1;
//
// return 0;
//}
////////////////////////////////////////////////////////////////////////////////////////