http2 sync and async succ, modify async interface

This commit is contained in:
jarodruan 2020-02-17 20:16:18 +08:00
parent 0107665f77
commit 96c0a8bbd1
10 changed files with 138 additions and 533 deletions

View File

@ -112,8 +112,6 @@ void asyncRpc2(int c)
int64_t t = TC_Common::now2us(); int64_t t = TC_Common::now2us();
std::map<std::string, std::string> header; std::map<std::string, std::string> header;
header[":path"] = "/";
header[":method"] = "POST";
header[":authority"] = "domain.com"; header[":authority"] = "domain.com";
header[":scheme"] = "http"; header[":scheme"] = "http";
@ -124,7 +122,7 @@ void asyncRpc2(int c)
try try
{ {
param.servant2Prx->http_call_async(header, "helloworld", p); param.servant2Prx->http_call_async("POST", "/", header, "helloworld", p);
} }
catch(exception& e) catch(exception& e)
{ {

View File

@ -36,33 +36,21 @@ void Http2Imp::destroy()
//destroy servant here: //destroy servant here:
//... //...
} }
//
//void doRequestFunc(const TC_Http2Server::Req_Type reqtype, const string &requri, const TC_Http::http_header_type &reqHeader, const string &reqBody, TC_Http2Server::Http2Response &rsp)
//{
// rsp.status = 200;
// rsp.about = "OK";
// rsp.body = "response helloworld 2";
//}
int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer) int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
{ {
shared_ptr<TC_Http2Server> session = getHttp2(current->getUId()); shared_ptr<TC_Http2Server> session = getHttp2(current->getUId());
// cout << "doRequest:" << session << ", buffer size:" << current->getRequestBuffer().size() << endl; vector<shared_ptr<TC_Http2Server::Http2Context>> contexts = session->decodeRequest();
vector<TC_Http2Server::Http2Context> contexts;
session->decodeRequest(contexts);
// cout << "doRequest context size:" << contexts.size() << endl;
for(size_t i = 0; i< contexts.size(); ++i) for(size_t i = 0; i< contexts.size(); ++i)
{ {
TC_Http2Server::Http2Context & context = contexts[i]; shared_ptr<TC_Http2Server::Http2Context> context = contexts[i];
vector<char> data; vector<char> data;
context.response.setHeader("X-Header", "TARS"); context->response.setHeader("X-Header", "TARS");
context.response.setResponse(200, "OK", context.request.getContent()); context->response.setResponse(200, "OK", context->request.getContent());
int ret = session->encodeResponse(context, data); int ret = session->encodeResponse(context, data);
if(ret != 0) if(ret != 0)
@ -72,50 +60,11 @@ int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
buffer.insert(buffer.end(), data.begin(), data.end()); buffer.insert(buffer.end(), data.begin(), data.end());
} }
// cout << "doRequest buffer size:" << buffer.size() << endl;
// static bool flag = true;
// if(flag)
// {
// //method 1:
// vector<int32_t> vtReqid;
// TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid);
//
// // cout << "doRequest size:" << vtReqid.size() << endl;
//
// TC_Http2Server::Http2Response rsp;
// rsp.status = 200;
// rsp.about = "OK";
// rsp.body = "response helloworld 1";
//
// for(size_t i = 0; i < vtReqid.size(); i++)
// {
// string rbody;
// session->getBody(vtReqid[i], rbody);
//
//// cout << vtReqid[i] << ", " << rbody << endl;
//
// vector<char> data;
// session->doResponse(vtReqid[i], rsp, data);
// buffer.insert(buffer.end(), data.begin(), data.end());
//
//
// }
// }
// else
// {
// //method 2:
// session->doRequest(current->getRequestBuffer(), doRequestFunc, buffer);
// }
// flag = !flag;
return 0; return 0;
} }
int Http2Imp::doClose(TarsCurrentPtr current) int Http2Imp::doClose(TarsCurrentPtr current)
{ {
cout << "doClose" << endl;
delHttp2(current->getUId()); delHttp2(current->getUId());
return 0; return 0;

View File

@ -29,7 +29,6 @@ TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector<char> &out)
if(sessionPtr == NULL) if(sessionPtr == NULL)
{ {
shared_ptr<TC_Http2Server> session(new TC_Http2Server()); shared_ptr<TC_Http2Server> session(new TC_Http2Server());
// in.setContextData(session, [=]{delete session;});
in.setContextData(session.get()); in.setContextData(session.get());
session->settings(3000); session->settings(3000);
@ -40,11 +39,7 @@ TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector<char> &out)
sessionPtr = session.get(); sessionPtr = session.get();
} }
TC_NetWorkBuffer::PACKET_TYPE flag = sessionPtr->parse(in, out); return sessionPtr->parse(in, out);
// cout << "parseHttp2:" << session << ", out size:" << out.size() << endl;
return flag;
} }

View File

@ -169,8 +169,6 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id
// ENCODE function, called by network thread // ENCODE function, called by network thread
vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans) vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans)
{ {
// std::lock_guard<std::mutex> l(m);
TC_Http2Client* session = (TC_Http2Client*)trans->getSendBuffer()->getContextData(); TC_Http2Client* session = (TC_Http2Client*)trans->getSendBuffer()->getContextData();
if(session == NULL) if(session == NULL)
{ {
@ -208,7 +206,6 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
data.read_callback = reqbody_read_callback; data.read_callback = reqbody_read_callback;
} }
// cout << "pData:" << pData << ", " << data.read_callback << endl;
int32_t sid = nghttp2_submit_request(session->session(), int32_t sid = nghttp2_submit_request(session->session(),
NULL, NULL,
nva.data(), nva.data(),
@ -230,15 +227,8 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_session_send return: " << nghttp2_strerror(rv) << ", " << std::this_thread::get_id() << ", session:" << session << endl); TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_session_send return: " << nghttp2_strerror(rv) << ", " << std::this_thread::get_id() << ", session:" << session << endl);
return vector<char>(); return vector<char>();
} }
// else
// {
// cout << "send" << endl;
// }
// cout << "nghttp2_session_send, id:" << request.iRequestId << ", buff size:" << session->_buffer().size() << endl;
vector<char> out; vector<char> out;
// out.swap(session->buffer());
session->swap(out); session->swap(out);
return out; return out;
@ -246,8 +236,6 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp) TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{ {
// std::lock_guard<std::mutex> l(m);
TC_Http2Client* session = (TC_Http2Client*)((Transceiver*)(in.getConnection()))->getSendBuffer()->getContextData(); TC_Http2Client* session = (TC_Http2Client*)((Transceiver*)(in.getConnection()))->getSendBuffer()->getContextData();
if(session == NULL) if(session == NULL)
{ {
@ -273,7 +261,6 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in,
pair<const char*, size_t> buffer = in.getBufferPointer(); pair<const char*, size_t> buffer = in.getBufferPointer();
// cout << "size:" << buffer.second << endl;
int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.first, buffer.second); int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.first, buffer.second);
if (readlen < 0) if (readlen < 0)
{ {
@ -286,16 +273,13 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in,
if(session->doneResponses().empty()) if(session->doneResponses().empty())
{ {
// cout << "doneResponses empty" << endl;
return TC_NetWorkBuffer::PACKET_LESS; return TC_NetWorkBuffer::PACKET_LESS;
} }
it = session->doneResponses().begin(); it = session->doneResponses().begin();
rsp.iRequestId = it->second.streamId; rsp.iRequestId = it->first;
rsp.status = it->second.headers; it->second->getHeaders(rsp.status);
rsp.sBuffer.swap(it->second.body); rsp.sBuffer.assign(it->second->getContent().begin(), it->second->getContent().end());
// cout << "http2Response id:" << it->second.streamId << endl;
session->doneResponses().erase(it); session->doneResponses().erase(it);

View File

@ -893,7 +893,9 @@ void ServantProxy::http_call(const std::string& method,
msg = NULL; msg = NULL;
} }
void ServantProxy::http_call_async(const std::map<std::string, std::string>& headers, void ServantProxy::http_call_async(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body, const std::string& body,
const HttpCallbackPtr &cb) const HttpCallbackPtr &cb)
{ {
@ -902,6 +904,8 @@ void ServantProxy::http_call_async(const std::map<std::string, std::string>& hea
msg->init(ReqMessage::ASYNC_CALL, NULL, ""); msg->init(ReqMessage::ASYNC_CALL, NULL, "");
msg->bFromRpc = true; msg->bFromRpc = true;
msg->request.sServantName = uri;
msg->request.sFuncName = method;
// 使用下面两个字段保存头部和包体 // 使用下面两个字段保存头部和包体
msg->request.context = headers; msg->request.context = headers;
msg->request.sBuffer.assign(body.begin(), body.end()); msg->request.sBuffer.assign(body.begin(), body.end());

View File

@ -644,7 +644,9 @@ public:
/** /**
* http2协议异步远程调用 * http2协议异步远程调用
*/ */
void http_call_async(const std::map<std::string, std::string>& headers, void http_call_async(const std::string& method,
const std::string& uri,
const std::map<std::string, std::string>& headers,
const std::string& body, const std::string& body,
const HttpCallbackPtr &cb); const HttpCallbackPtr &cb);

View File

@ -534,6 +534,23 @@ public:
*/ */
string &getContent() { return _content; } string &getContent() { return _content; }
/**
* append content
* @param append
* @param bUpdateContentLength
*/
void appendContent(const char *buff, size_t len, bool bUpdateContentLength = false)
{
_content.append(buff, len);
if(bUpdateContentLength)
{
eraseHeader("Content-Length");
if(_content.length() > 0)
setContentLength(_content.length());
}
}
/** /**
* @brief http body(content-length). * @brief http body(content-length).
* *
@ -572,12 +589,18 @@ public:
* *
* @return http_header_type& * @return http_header_type&
*/ */
const http_header_type& getHeaders() const{return _headers;} const http_header_type& getHeaders() const{return _headers;}
/**
* get headers
* @param header
*/
void getHeaders(map<string, string> &header);
/** /**
* @brief * @brief
*/ */
void reset(); void reset();
/** /**
* @brief . * @brief .

View File

@ -23,6 +23,9 @@ public:
*/ */
virtual ~TC_Http2(); virtual ~TC_Http2();
/**
* data pack
*/
struct DataPack struct DataPack
{ {
DataPack(const char *data, size_t length) : _dataBuf(data), _length(length) {} DataPack(const char *data, size_t length) : _dataBuf(data), _length(length) {}
@ -94,16 +97,24 @@ class TC_Http2Server : public TC_Http2
{ {
public: public:
/**
* constructor
*/
TC_Http2Server(); TC_Http2Server();
/**
* deconstructor
*/
~TC_Http2Server(); ~TC_Http2Server();
/**
* context
*/
struct Http2Context struct Http2Context
{ {
// Http2Context(int32_t id) : reqId(id) {} Http2Context(int32_t id) : reqId(id) {}
int32_t reqId; int32_t reqId;
bool bFinish = false;
TC_HttpRequest request; TC_HttpRequest request;
TC_HttpResponse response; TC_HttpResponse response;
}; };
@ -111,10 +122,10 @@ public:
/** /**
* parse all request * parse all request
* @param request * @param request
* @param unordered_map<int32_t, std::shared_ptr<TC_HttpRequest>> * @param vector<std::shared_ptr<TC_HttpRequest>>
* @return * @return
*/ */
void decodeRequest(vector<Http2Context> &contexts); vector<shared_ptr<Http2Context>> decodeRequest();
/** /**
* *
@ -123,7 +134,7 @@ public:
* @param out * @param out
* @return * @return
*/ */
int encodeResponse(const Http2Context &context, vector<char> &out); int encodeResponse(const shared_ptr<Http2Context> &context, vector<char> &out);
/** /**
* http2 * http2
@ -133,7 +144,6 @@ public:
*/ */
TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer&in, vector<char> &out); TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer&in, vector<char> &out);
void onHeaderCallback(int32_t streamId); void onHeaderCallback(int32_t streamId);
void onHeaderCallback(int32_t streamId, const string &skey, const string &svalue); void onHeaderCallback(int32_t streamId, const string &skey, const string &svalue);
void onFrameRecvCallback(int32_t streamId); void onFrameRecvCallback(int32_t streamId);
@ -142,28 +152,20 @@ public:
protected: protected:
Http2Context &getContext(int32_t streamId); shared_ptr<Http2Context> getContext(int32_t streamId);
void deleteContext(int32_t streamId); void deleteContext(int32_t streamId);
protected: protected:
// TC_SpinLock _contextLock; TC_ThreadMutex _nghttpLock;
// TC_ThreadMutex _contextLock;
// TC_SpinLock reqLock_; unordered_map<int32_t, shared_ptr<Http2Context>> _context;
// unordered_map<int32_t, RequestPack> _mReq;
// unordered_map<int32_t, RequestPack> _mReq; vector<shared_ptr<Http2Context>> _contextFinished;
// unordered_map<int32_t, std::shared_ptr<Http2Context>> _context;
unordered_map<int32_t, Http2Context> _context;
vector<Http2Context> _contextFinished;
vector<char> _reqout; vector<char> _reqout;
bool _bNewCon;
TC_ThreadMutex _nghttpLock;
}; };
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
@ -172,72 +174,37 @@ class TC_Http2Client : public TC_Http2
{ {
public: public:
enum ResponseState /**
{ * constructor
ResponseNone, */
ResponseHeadersDone,
ResponseBodyDone,
};
struct Http2Response
{
int streamId;
std::map<std::string, std::string> headers;
vector<char> body;
ResponseState state;
void swap(Http2Response& other);
};
TC_Http2Client(); TC_Http2Client();
/**
* deconstructor
*/
~TC_Http2Client(); ~TC_Http2Client();
public:
// /**
// * @brief HTTP2握手+setting
// */
// int settings(unsigned int maxCurrentStreams = 2000);
// /**
// * @brief 当前缓冲区
// */
// vector<char>& sendBuffer() { return _sendBuf; }
//
// /**
// * @brief session
// */
// nghttp2_session* session() const { return _session; }
/** /**
* @brief response * @brief response
*/ */
std::unordered_map<int, Http2Response> &responses() { return _responses; } std::unordered_map<int, shared_ptr<TC_HttpResponse>> &responses() { return _responses; }
/** /**
* @brief response finished * @brief response finished
*/ */
std::unordered_map<int, Http2Response> &doneResponses() { return _doneResponses; } std::unordered_map<int, shared_ptr<TC_HttpResponse>> &doneResponses() { return _doneResponses; }
private: private:
// /**
// * session
// */
// nghttp2_session* _session;
//
// /**
// * 发送缓存区由send callback填充
// */
// vector<char> _sendBuf;
/** /**
* *
*/ */
std::unordered_map<int, Http2Response> _responses; std::unordered_map<int, shared_ptr<TC_HttpResponse>> _responses;
/** /**
* *
*/ */
std::unordered_map<int, Http2Response> _doneResponses; std::unordered_map<int, shared_ptr<TC_HttpResponse>> _doneResponses;
}; };
} }

View File

@ -708,6 +708,14 @@ void TC_Http::reset()
_bIsChunked = false; _bIsChunked = false;
} }
void TC_Http::getHeaders(map<string, string> &header)
{
for(auto it = _headers.begin(); it != _headers.end(); ++it)
{
header.insert(map<string, string>::value_type(it->first, it->second));
}
}
/********************* TC_HttpCookie ***********************/ /********************* TC_HttpCookie ***********************/
bool TC_HttpCookie::matchDomain(const string &sCookieDomain, const string &sDomain) bool TC_HttpCookie::matchDomain(const string &sCookieDomain, const string &sDomain)

View File

@ -168,95 +168,63 @@ static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
void TC_Http2Server::onHeaderCallback(int streamId) void TC_Http2Server::onHeaderCallback(int streamId)
{ {
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock); _context[streamId] = std::make_shared<Http2Context>(streamId);
_context[streamId].reqId = streamId;
} }
void TC_Http2Server::onHeaderCallback(int32_t streamId, const string &skey, const string &svalue) void TC_Http2Server::onHeaderCallback(int32_t streamId, const string &skey, const string &svalue)
{ {
TC_Http2Server::Http2Context & context = getContext(streamId); shared_ptr<Http2Context> context = getContext(streamId);
if (TC_Port::strcasecmp(skey.c_str(), ":method") == 0) if (TC_Port::strcasecmp(skey.c_str(), ":method") == 0)
{ {
context.request.setMethod(svalue.c_str()); context->request.setMethod(svalue.c_str());
} }
else if (TC_Port::strcasecmp(skey.c_str(), ":path") == 0) else if (TC_Port::strcasecmp(skey.c_str(), ":path") == 0)
{ {
context.request.setPath(svalue.c_str()); context->request.setPath(svalue.c_str());
} }
else if (TC_Port::strcasecmp(skey.c_str(), ":scheme") == 0) else if (TC_Port::strcasecmp(skey.c_str(), ":scheme") == 0)
{ {
context.request.setScheme(svalue.c_str()); context->request.setScheme(svalue.c_str());
} }
else if (TC_Port::strcasecmp(skey.c_str(), ":authority") == 0) else if (TC_Port::strcasecmp(skey.c_str(), ":authority") == 0)
{ {
context.request.setDomain(svalue.c_str()); context->request.setDomain(svalue.c_str());
} }
context.request.setHeader(skey, svalue); context->request.setHeader(skey, svalue);
} }
void TC_Http2Server::onFrameRecvCallback(int32_t streamId) void TC_Http2Server::onFrameRecvCallback(int32_t streamId)
{ {
TC_Http2Server::Http2Context& context = getContext(streamId); shared_ptr<Http2Context> context = getContext(streamId);
// TC_LockT<TC_ThreadMutex> lock(reqLock_); if(context->request.getHeaders().find(":method") != context->request.getHeaders().end() ||
// auto it = _mReq.find(streamId); context->request.getHeaders().find(":path") != context->request.getHeaders().end() ||
// if (it != _mReq.end()) context->request.getHeaders().find(":scheme") != context->request.getHeaders().end())
// {
// it->second.bFinish = true;
if(context.request.getHeaders().find(":method") != context.request.getHeaders().end() ||
context.request.getHeaders().find(":path") != context.request.getHeaders().end() ||
context.request.getHeaders().find(":scheme") != context.request.getHeaders().end())
{ {
context.bFinish = true;
_contextFinished.push_back(context); _contextFinished.push_back(context);
_reqout.insert(_reqout.end(), (char*)&streamId, (char*)&streamId + sizeof(int32_t)); _reqout.insert(_reqout.end(), (char*)&streamId, (char*)&streamId + sizeof(int32_t));
deleteContext(streamId); deleteContext(streamId);
} }
// }
} }
void TC_Http2Server::onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len) void TC_Http2Server::onDataChunkRecvCallback(int32_t streamId, const char *data, size_t len)
{ {
TC_Http2Server::Http2Context &context = getContext(streamId); shared_ptr<Http2Context> context = getContext(streamId);
context.request.getContent().append(data, len); context->request.getContent().append(data, len);
//
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(streamId);
// if (it != _mReq.end())
// {
// it->second.body.append(data, len);
// }
} }
void TC_Http2Server::onStreamCloseCallback(int32_t streamId) void TC_Http2Server::onStreamCloseCallback(int32_t streamId)
{ {
// TC_Http2Server::Http2Context & context = getContext(streamId);
// {
deleteContext(streamId); deleteContext(streamId);
// }
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(streamId);
// if (it != _mReq.end())
// {
// if (it->second.bFinish != true)
// {
// _mReq.erase(streamId);
// }
// }
} }
TC_Http2Server::TC_Http2Server()
TC_Http2Server::TC_Http2Server(): _bNewCon(true)
{ {
nghttp2_session_callbacks *callbacks; nghttp2_session_callbacks *callbacks;
@ -285,21 +253,11 @@ TC_Http2Server::~TC_Http2Server()
TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<char> &out) TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<char> &out)
{ {
// cout << "parse:" << in.getBufferLength() << endl;
// if(_bNewCon)
// {
// _bNewCon = false;
//
// }
in.mergeBuffers(); in.mergeBuffers();
auto buff = in.getBufferPointer(); auto buff = in.getBufferPointer();
// int readlen; TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
TC_LockT<TC_ThreadMutex> lock2(_nghttpLock);
int readlen = nghttp2_session_mem_recv(_session, (uint8_t *) buff.first, buff.second); int readlen = nghttp2_session_mem_recv(_session, (uint8_t *) buff.first, buff.second);
@ -311,8 +269,6 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<
{ {
in.moveHeader(readlen); in.moveHeader(readlen);
// TC_LockT<TC_ThreadMutex> lock1(reqLock_);
if (_reqout.empty()) if (_reqout.empty())
{ {
return TC_NetWorkBuffer::PACKET_LESS; return TC_NetWorkBuffer::PACKET_LESS;
@ -325,16 +281,8 @@ TC_NetWorkBuffer::PACKET_TYPE TC_Http2Server::parse(TC_NetWorkBuffer&in, vector<
return TC_NetWorkBuffer::PACKET_FULL; return TC_NetWorkBuffer::PACKET_FULL;
} }
//void TC_Http2Server::createReq(int32_t streamId) shared_ptr<TC_Http2Server::Http2Context> TC_Http2Server::getContext(int32_t streamId)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
//
// _context[streamId] = std::make_shared<TC_HttpRequest>();
//}
TC_Http2Server::Http2Context& TC_Http2Server::getContext(int32_t streamId)
{ {
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
auto it = _context.find(streamId); auto it = _context.find(streamId);
if (it != _context.end()) if (it != _context.end())
{ {
@ -342,62 +290,40 @@ TC_Http2Server::Http2Context& TC_Http2Server::getContext(int32_t streamId)
} }
assert(false); assert(false);
return NULL;
} }
void TC_Http2Server::deleteContext(int32_t streamId) void TC_Http2Server::deleteContext(int32_t streamId)
{ {
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
_context.erase(streamId); _context.erase(streamId);
} }
void TC_Http2Server::decodeRequest(vector<Http2Context> &contexts) vector<shared_ptr<TC_Http2Server::Http2Context>> TC_Http2Server::decodeRequest()
{ {
vector<shared_ptr<TC_Http2Server::Http2Context>> contexts;
TC_LockT<TC_ThreadMutex> lock(_nghttpLock); TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
_contextFinished.swap(contexts); _contextFinished.swap(contexts);
// return context; return contexts;
} }
//
//vector<int32_t> TC_Http2Server::parseReqId(const vector<char> &request)
//{
// vector<int32_t> vtReqid;
// for (unsigned int i = 0; i < request.size(); i += sizeof(int32_t))
// {
// int32_t reqId;
// memcpy(&reqId, &request[i], sizeof(int32_t));
// vtReqid.push_back(reqId);
// }
//
// return vtReqid;
//}
int TC_Http2Server::encodeResponse(const TC_Http2Server::Http2Context &context, vector<char> &out) int TC_Http2Server::encodeResponse(const shared_ptr<TC_Http2Server::Http2Context> &context, vector<char> &out)
{ {
// { string sstatus = TC_Common::tostr(context->response.getStatus());
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it == _mReq.end())
// return -1;
// }
// deleteContext(context->reqId);
string sstatus = TC_Common::tostr(context.response.getStatus());
const char* strstatus = ":status"; const char* strstatus = ":status";
nghttp2_nv *hdrs = new nghttp2_nv[context.response.getHeaders().size() + 1]; nghttp2_nv *hdrs = new nghttp2_nv[context->response.getHeaders().size() + 1];
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE; hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[0].name = (uint8_t*)strstatus; hdrs[0].name = (uint8_t*)strstatus;
hdrs[0].namelen = 7; hdrs[0].namelen = 7;
hdrs[0].value = (uint8_t*)sstatus.c_str(); hdrs[0].value = (uint8_t*)sstatus.c_str();
hdrs[0].valuelen = sstatus.size(); hdrs[0].valuelen = sstatus.size();
TC_Http::http_header_type::const_iterator it = context.response.getHeaders().begin(); TC_Http::http_header_type::const_iterator it = context->response.getHeaders().begin();
for (int n = 1; it != context.response.getHeaders().end(); n++, it++) for (int n = 1; it != context->response.getHeaders().end(); n++, it++)
{ {
hdrs[n].flags = NGHTTP2_NV_FLAG_NONE; hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[n].name = (uint8_t*)it->first.c_str(); hdrs[n].name = (uint8_t*)it->first.c_str();
@ -406,11 +332,7 @@ int TC_Http2Server::encodeResponse(const TC_Http2Server::Http2Context &context,
hdrs[n].valuelen = it->second.size(); hdrs[n].valuelen = it->second.size();
} }
const string &body = context.response.getContent(); DataPack dataPack(context->response.getContent().c_str(), context->response.getContent().size());
DataPack dataPack(body.c_str(), body.size());
// dataPack.readPos = 0;
// dataPack.dataBuf = response.body;
nghttp2_data_provider data_prd; nghttp2_data_provider data_prd;
data_prd.source.ptr = (void*)&dataPack; data_prd.source.ptr = (void*)&dataPack;
@ -418,7 +340,7 @@ int TC_Http2Server::encodeResponse(const TC_Http2Server::Http2Context &context,
{ {
TC_LockT<TC_ThreadMutex> lock(_nghttpLock); TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
_err = nghttp2_submit_response(_session, context.reqId, hdrs, context.response.getHeaders().size()+1, &data_prd); _err = nghttp2_submit_response(_session, context->reqId, hdrs, context->response.getHeaders().size()+1, &data_prd);
if (_err != 0 ) { if (_err != 0 ) {
delete [] hdrs; delete [] hdrs;
@ -440,267 +362,20 @@ int TC_Http2Server::encodeResponse(const TC_Http2Server::Http2Context &context,
delete [] hdrs; delete [] hdrs;
return 0; return 0;
// {
// TC_LockT<TC_ThreadMutex> lock(_responseBufLock);
// out.swap(_responseBuf);
// _responseBuf.clear();
// }
//
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// _mReq.erase(reqid);
// }
} }
//
//int TC_Http2Server::doRequest(const vector<char> &request, vector<int32_t>& vtReqid)
//{
// vtReqid.clear();
//
// for (unsigned int i = 0; i < request.size(); i += sizeof(int32_t))
// {
//// RequestPack *ptr;
//// memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *));
//
// int32_t reqId;
// memcpy(&reqId, &request[i], sizeof(int32_t));
// vtReqid.push_back(reqId);
// }
//
// return 0;
//}
//
//int TC_Http2Server::doResponse(int32_t reqid, const Http2Response &response, vector<char>& out)
//{
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it == _mReq.end())
// return -1;
// }
// string sstatus = TC_Common::tostr(response.status);
//
// const char* strstatus = ":status";
// nghttp2_nv *hdrs = new nghttp2_nv[response.header.size() + 1];
// hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[0].name = (uint8_t*)strstatus;
// hdrs[0].namelen = 7;
// hdrs[0].value = (uint8_t*)sstatus.c_str();
// hdrs[0].valuelen = sstatus.size();
// TC_Http::http_header_type::const_iterator it = response.header.begin();
// for (int n = 1; it != response.header.end(); n++, it++)
// {
// hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[n].name = (uint8_t*)it->first.c_str();
// hdrs[n].namelen = it->first.size();
// hdrs[n].value = (uint8_t*)it->second.c_str();
// hdrs[n].valuelen = it->second.size();
// }
//
// DataPack dataPack;
// dataPack.readPos = 0;
// dataPack.dataBuf = response.body;
//
// nghttp2_data_provider data_prd;
// data_prd.source.ptr = (void*)&dataPack;
// data_prd.read_callback = server::str_read_callback;
// int ret ;
// {
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
//
// ret = nghttp2_submit_response(_session, reqid, hdrs, response.header.size()+1, &data_prd);
// if (ret != 0) {
// cout << "nghttp2_submit_response error" << endl;
// return -1;
// }
//
// while (nghttp2_session_want_write(_session)) {
// ret = nghttp2_session_send(_session);
// if (ret != 0) {
// cout << "nghttp2_session_send error" << endl;
// return -1;
// }
// }
// }
//
// delete [] hdrs;
//
// this->swap(out);
//// {
//// TC_LockT<TC_ThreadMutex> lock(_responseBufLock);
//// out.swap(_responseBuf);
//// _responseBuf.clear();
//// }
//
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// _mReq.erase(reqid);
// }
//
// return 0;
//}
//
//int TC_Http2Server::doRequest(const vector<char> &request, TC_Http2Server::RequestFunc requestFunc, vector<char>& response)
//{
// for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Server::RequestPack *))
// {
// Http2Response rsp;
//
// RequestPack *ptr;
// memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Server::RequestPack *));
//
// Req_Type qstatus;
//
// const char* sMethod = ptr->header.find(":method")->second.c_str(); //TC_Common::upper(TC_Common::trim(ptr->header.find(":method")->second));
// if (TC_Port::strcasecmp(sMethod, "GET") == 0)
// qstatus = REQUEST_GET;
// else if (TC_Port::strcasecmp(sMethod, "POST") == 0)
// qstatus = REQUEST_POST;
// else if (TC_Port::strcasecmp(sMethod, "OPTIONS") == 0)
// qstatus = REQUEST_OPTIONS;
// else if (TC_Port::strcasecmp(sMethod, "HEAD") == 0)
// qstatus = REQUEST_HEAD;
// else if (TC_Port::strcasecmp(sMethod, "PUT") == 0)
// qstatus = REQUEST_PUT;
// else if (TC_Port::strcasecmp(sMethod, "DELETE") == 0)
// qstatus = REQUEST_DELETE;
// else
// {
// continue;
// }
// string sstatus = ptr->header.find(":path")->second;
//
// requestFunc(qstatus, sstatus, ptr->header, ptr->body, rsp);
//
// DataPack dataPack;
// dataPack.readPos = 0;
// dataPack.dataBuf = rsp.body;
//
// sstatus = TC_Common::tostr(rsp.status);
//
// const char* strstatus = ":status";
// nghttp2_nv *hdrs = new nghttp2_nv[rsp.header.size() + 1];
// hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[0].name = (uint8_t*)strstatus;
// hdrs[0].namelen = 7;
// hdrs[0].value = (uint8_t*)sstatus.c_str();
// hdrs[0].valuelen = sstatus.size();
// TC_Http::http_header_type::iterator it = rsp.header.begin();
// for (int n = 1; it != rsp.header.end(); n++, it++)
// {
// hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
// hdrs[n].name = (uint8_t*)it->first.c_str();
// hdrs[n].namelen = it->first.size();
// hdrs[n].value = (uint8_t*)it->second.c_str();
// hdrs[n].valuelen = it->second.size();
// }
//
// nghttp2_data_provider data_prd;
// data_prd.source.ptr = (void*)&dataPack;
// data_prd.read_callback = server::str_read_callback;
//
// {
// TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
//
// int ret = nghttp2_submit_response(_session, ptr->streamId, hdrs, rsp.header.size()+1, &data_prd);
// if (ret != 0)
// {
// cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
// }
// ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
//
// while (nghttp2_session_want_write(_session)) {
// ret = nghttp2_session_send(_session);
// if (ret != 0)
// {
// cout << "nghttp2_submit_response error:" << nghttp2_strerror(ret) << endl;
// }
// ;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
// }
// }
//
// vector<char> out;
// swap(out);
// response.insert(response.begin(), out.begin(), out.end());
////
//// {
//// TC_LockT<TC_ThreadMutex> lock(_responseBufLock);
//// response.insert(response.begin(), _responseBuf.begin(), _responseBuf.end());
//// _responseBuf.clear();
//// }
//
// delete [] hdrs;
// {
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// _mReq.erase(ptr->streamId);
// }
//
// }
//
// return 0;
//}
//
//int TC_Http2Server::getMethod(int32_t reqid, Req_Type &method)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// method = it->second.method;
// else
// return -1;
//
// return 0;
//}
//
//int TC_Http2Server::getUri(int32_t reqid, string &uri)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// uri = it->second.uri;
// else
// return -1;
//
// return 0;
//}
//
//int TC_Http2Server::getHeader(int32_t reqid, TC_Http::http_header_type &header)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// header = it->second.header;
// else
// return -1;
//
// return 0;
//}
//
//int TC_Http2Server::getBody(int32_t reqid, string &body)
//{
// TC_LockT<TC_ThreadMutex> lock(reqLock_);
// auto it = _mReq.find(reqid);
// if (it != _mReq.end())
// body = it->second.body;
// else
// return -1;
//
// return 0;
//}
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
//
void TC_Http2Client::Http2Response::swap(Http2Response& other) //void TC_Http2Client::Http2Response::swap(Http2Response& other)
{ //{
if (this == &other) // if (this == &other)
return; // return;
//
std::swap(streamId, other.streamId); // std::swap(streamId, other.streamId);
headers.swap(other.headers); // headers.swap(other.headers);
body.swap(other.body); // body.swap(other.body);
std::swap(state, other.state); // std::swap(state, other.state);
} //}
/////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////
@ -722,10 +397,10 @@ static int on_begin_headers_callback(nghttp2_session* session, const nghttp2_fra
{ {
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{ {
TC_Http2Client::Http2Response rsp; // TC_HttpResponse rsp;
rsp.streamId = frame->hd.stream_id; // rsp.streamId = frame->hd.stream_id;
rsp.state = TC_Http2Client::ResponseNone; // rsp.state = TC_Http2Client::ResponseNone;
nghttp2->responses()[rsp.streamId] = rsp; nghttp2->responses()[frame->hd.stream_id] = std::make_shared<TC_HttpResponse>();
} }
} }
@ -749,7 +424,7 @@ static int on_header_callback(nghttp2_session* session, const nghttp2_frame* fra
std::string n((const char*)name, namelen); std::string n((const char*)name, namelen);
std::string v((const char*)value, valuelen); std::string v((const char*)value, valuelen);
it->second.headers.insert(std::make_pair(n, v)); it->second->setHeader(n, v);
return 0; return 0;
} }
@ -773,7 +448,8 @@ static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame*
case NGHTTP2_HEADERS: case NGHTTP2_HEADERS:
if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS)
{ {
it->second.state = TC_Http2Client::ResponseHeadersDone; ;
// it->second.state = TC_Http2Client::ResponseHeadersDone;
} }
return 0; return 0;
@ -794,7 +470,7 @@ static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags,
return NGHTTP2_ERR_CALLBACK_FAILURE; return NGHTTP2_ERR_CALLBACK_FAILURE;
} }
it->second.body.insert(it->second.body.end(), (const char* )data, (const char* )data + len); it->second->appendContent((const char* )data, len);
return 0; return 0;
} }
@ -808,10 +484,10 @@ static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id,
return NGHTTP2_ERR_CALLBACK_FAILURE; return NGHTTP2_ERR_CALLBACK_FAILURE;
} }
it->second.state = TC_Http2Client::ResponseBodyDone; // it->second.state = TC_Http2Client::ResponseBodyDone;
nghttp2->doneResponses()[stream_id].swap(it->second); nghttp2->doneResponses()[stream_id] = it->second;
nghttp2->responses().erase(it); nghttp2->responses().erase(it);
return 0; return 0;
} }
@ -836,7 +512,6 @@ TC_Http2Client::TC_Http2Client()
TC_Http2Client::~TC_Http2Client() TC_Http2Client::~TC_Http2Client()
{ {
// nghttp2_session_del(_session);
} }
// void TC_Http2Client::onNegotiateDone(bool succ) // void TC_Http2Client::onNegotiateDone(bool succ)