update tc_http2, remove tc_http2clientmgr tc_http2session tc_nghttp2

This commit is contained in:
jarodruan 2020-02-10 12:08:02 +08:00
parent 642c9228d7
commit 86a350f80d
13 changed files with 54 additions and 1342 deletions

View File

@ -15,14 +15,13 @@
*/
#include "Http2Imp.h"
#include "util/tc_http2session.h"
#include "servant/Application.h"
using namespace std;
TC_SpinLock Http2Imp::_mutex;
unordered_map<int32_t, TC_Http2Session*> Http2Imp::_http2Sessions;
unordered_map<int32_t, TC_Http2Server*> Http2Imp::_http2;
//////////////////////////////////////////////////////
void Http2Imp::initialize()
@ -51,13 +50,13 @@ int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
{
vector<int32_t> vtReqid;
TC_Http2Session::doRequest(current->getRequestBuffer(), vtReqid);
TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid);
cout << "doRequest:" << TC_Common::tostr(vtReqid.begin(), vtReqid.end(), ", ") << endl;
TC_Http2Session* session = getHttp2Session(current->getUId());
TC_Http2Server* session = getHttp2(current->getUId());
TC_Http2Session::Http2Response rsp;
TC_Http2Server::Http2Response rsp;
rsp.status = 200;
rsp.about = "OK";
rsp.body = "response helloworld";
@ -73,7 +72,7 @@ int Http2Imp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
int Http2Imp::doClose(TarsCurrentPtr current)
{
delHttp2Session(current->getUId());
delHttp2(current->getUId());
return 0;
}

View File

@ -19,7 +19,7 @@
#include "servant/Application.h"
#include "util/tc_spin_lock.h"
#include "util/tc_http2session.h"
#include "util/tc_http2.h"
/**
*
@ -53,37 +53,35 @@ public:
*/
int doClose(TarsCurrentPtr current);
static TC_Http2Session *getHttp2Session(uint32_t uid)
static TC_Http2Server *getHttp2(uint32_t uid)
{
TC_LockT<TC_SpinLock> lock(_mutex);
auto it = _http2Sessions.find(uid);
auto it = _http2.find(uid);
if(it != _http2Sessions.end())
if(it != _http2.end())
{
return it->second;
}
return NULL;
}
static void addHttp2Session(uint32_t uid, TC_Http2Session* ptr)
static void addHttp2(uint32_t uid, TC_Http2Server* ptr)
{
TC_LockT<TC_SpinLock> lock(_mutex);
_http2Sessions[uid] = ptr;
_http2[uid] = ptr;
}
static void delHttp2Session(uint32_t uid)
static void delHttp2(uint32_t uid)
{
TC_LockT<TC_SpinLock> lock(_mutex);
auto it = _http2Sessions.find(uid);
auto it = _http2.find(uid);
if(it != _http2Sessions.end())
if(it != _http2.end())
{
delete it->second;
_http2Sessions.erase(it);
_http2.erase(it);
}
}
@ -91,7 +89,7 @@ protected:
static TC_SpinLock _mutex;
static unordered_map<int32_t, TC_Http2Session*> _http2Sessions;
static unordered_map<int32_t, TC_Http2Server*> _http2;
};
/////////////////////////////////////////////////////
#endif

View File

@ -17,7 +17,7 @@
#include "HttpServer.h"
#include "HttpImp.h"
#include "Http2Imp.h"
#include "util/tc_http2session.h"
#include "util/tc_http2.h"
using namespace std;
@ -25,19 +25,17 @@ HttpServer g_app;
TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector<char> &out)
{
TC_Http2Session *session = (TC_Http2Session*)(in.getContextData());
TC_Http2Server*session = (TC_Http2Server*)(in.getContextData());
if(session == NULL)
{
session = new TC_Http2Session();
in.setContextData(session);
session = new TC_Http2Server();
in.setContextData(session, [=]{delete session;});
TC_EpollServer::Connection *connection = (TC_EpollServer::Connection *)in.getConnection();
Http2Imp::addHttp2Session(connection->getId(), session);
Http2Imp::addHttp2(connection->getId(), session);
}
cout << "parseHttp2:" << in.getBufferLength() << ", " << session << endl;
return session->parse(in, out);
}

View File

@ -24,7 +24,7 @@
#include "tup/tup.h"
#include "servant/StatF.h"
#include "servant/StatReport.h"
#include "util/tc_nghttp2.h"
#include "util/tc_http2.h"
// #include "util/tc_http2clientmgr.h"
#ifdef _USE_OPENTRACKING
#include "servant/text_map_carrier.h"

View File

@ -24,7 +24,7 @@
#include <iostream>
#if TARS_HTTP2
#include "util/tc_nghttp2.h"
#include "util/tc_http2.h"
// #include "util/tc_http2clientmgr.h"
#endif
@ -163,80 +163,21 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in,
return TC_NetWorkBuffer::PACKET_LESS;
}
// vector<char> encodeHttp2(RequestPacket& request, TC_NgHttp2* session)
// {
// std::vector<nghttp2_nv> nva;
// const std::string method(":method");
// nghttp2_nv nv1 = MAKE_STRING_NV(method, request.sFuncName);
// if (!request.sFuncName.empty())
// nva.push_back(nv1);
// const std::string path(":path");
// nghttp2_nv nv2 = MAKE_STRING_NV(path, request.sServantName);
// if (!request.sServantName.empty())
// nva.push_back(nv2);
// for (std::map<std::string, std::string>::const_iterator
// it(request.context.begin());
// it != request.context.end();
// ++ it)
// {
// nghttp2_nv nv = MAKE_STRING_NV(it->first, it->second);
// nva.push_back(nv);
// }
// nghttp2_data_provider* pData = NULL;
// nghttp2_data_provider data;
// if (!request.sBuffer.empty())
// {
// pData = &data;
// data.source.ptr = (void*)&request.sBuffer;
// data.read_callback = reqbody_read_callback;
// }
// int32_t sid = nghttp2_submit_request(session->session(),
// NULL,
// &nva[0],
// nva.size(),
// pData,
// NULL);
// if (sid < 0)
// {
// TLOGERROR("encodeHttp2::Fatal error: nghttp2_submit_request return: " << sid << endl);
// return vector<char>();
// }
// request.iRequestId = sid;
// nghttp2_session_send(session->session());
// // 交给tars发送
// // std::string out;
// // out.swap(session->sendBuffer());
// // return out;
// vector<char> out;
// out.assign(session->sendBuffer().begin(), session->sendBuffer().end());
// return out;
// }
// ENCODE function, called by network thread
vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans)
{
// TC_NgHttp2* session = Http2ClientSessionManager::getInstance()->getSession(request.iRequestId);
TC_NgHttp2* session = trans->getHttp2Session();
TC_Http2Client* session = trans->getHttp2Client();
assert(session != NULL);
if (session->getState() == TC_NgHttp2::None)
{
session->Init();
session->settings();
}
// if (session->getState() == TC_NgHttp2::None)
// {
// session->Init();
// session->settings();
// }
assert (session->getState() == TC_NgHttp2::Http2);
// assert (session->getState() == TC_NgHttp2::Http2);
// return encodeHttp2(request, session);
std::vector<nghttp2_nv> nva;
@ -282,25 +223,25 @@ vector<char> ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr
nghttp2_session_send(session->session());
// 交给tars发送
// std::string out;
// out.swap(session->sendBuffer());
// return out;
vector<char> out;
out.assign(session->sendBuffer().begin(), session->sendBuffer().end());
session->sendBuffer().clear();
cout << "iRequestId:" << request.iRequestId << ", size:" << out.size() << endl;
out.swap(session->sendBuffer());
return out;
// vector<char> out;
// out.assign(session->sendBuffer().begin(), session->sendBuffer().end());
// session->sendBuffer().clear();
// cout << "iRequestId:" << request.iRequestId << ", size:" << out.size() << endl;
// return out;
}
// TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, list<ResponsePacket>& done, void* userptr)
TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
cout << "http2Response1:" << in.getBufferLength() << endl;
TC_NgHttp2* session = ((Transceiver*)(in.getConnection()))->getHttp2Session();
TC_Http2Client* session = ((Transceiver*)(in.getConnection()))->getHttp2Client();
// assert (session->getState() == TC_NgHttp2::Http2);
@ -339,19 +280,6 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in,
cout << "http2Response2 size:" << session->doneResponses().size() << ", iRequestId:" << rsp.iRequestId << endl;
// std::map<int, Http2Response>::const_iterator it(session->_doneResponses.begin());
// for (; it != session->_doneResponses.end(); ++ it)
// {
// ResponsePacket rsp;
// rsp.iRequestId = it->second.streamId;
// rsp.status = it->second.headers;
// rsp.sBuffer.assign(it->second.body.begin(), it->second.body.end());
// done.push_back(rsp);
// }
// session->_doneResponses.clear();
return TC_NetWorkBuffer::PACKET_FULL;
}

View File

@ -27,8 +27,7 @@
#endif
#if TARS_HTTP2
#include "util/tc_nghttp2.h"
// #include "util/tc_http2clientmgr.h"
#include "util/tc_http2.h"
#endif
namespace tars
{
@ -277,11 +276,10 @@ void Transceiver::close()
#endif
#if TARS_HTTP2
// Http2ClientSessionManager::getInstance()->delSession(_adapterProxy->getId());
if(_http2Session)
if(_http2Client)
{
nghttp2_session_del(_http2Session->session());
_http2Session = NULL;
delete _http2Client;
_http2Client = NULL;
}
#endif
@ -307,20 +305,16 @@ void Transceiver::close()
}
#if TARS_HTTP2
TC_NgHttp2* Transceiver::getHttp2Session()
TC_Http2Client* Transceiver::getHttp2Client()
{
if(_http2Session == NULL)
if(_http2Client == NULL)
{
_http2Session = new TC_NgHttp2(false);
// if (_http2Session->getState() == TC_NgHttp2::None)
// {
// _http2Session->Init();
// _http2Session->settings();
// }
_http2Client = new TC_Http2Client();
// _http2Client->Init();
_http2Client->settings();
}
return _http2Session;
return _http2Client;
}
#endif

View File

@ -34,7 +34,7 @@ namespace tars
#endif
#if TARS_HTTP2
class TC_NgHttp2;
class TC_Http2Client;
#endif
class AdapterProxy;
@ -227,7 +227,7 @@ public:
bool sendAuthData(const BasicAuthInfo& );
#if TARS_HTTP2
TC_NgHttp2* getHttp2Session();
TC_Http2Client* getHttp2Client();
#endif
protected:
/**
@ -281,7 +281,7 @@ protected:
#endif
#if TARS_HTTP2
TC_NgHttp2* _http2Session = NULL;
TC_Http2Client* _http2Client = NULL;
#endif
/*
* buffer

View File

@ -1,58 +0,0 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef __TC_HTTP2CLIENTMANAGER_H
#define __TC_HTTP2CLIENTMANAGER_H
#if TARS_HTTP2
#include <map>
#include "util/tc_singleton.h"
namespace tars
{
/////////////////////////////////////////////////
/**
*@file tc_http2clientmgr.h
*@brief http2客户端session集合
*/
/////////////////////////////////////////////////
class TC_NgHttp2;
class Http2ClientSessionManager : public TC_Singleton<Http2ClientSessionManager>
{
public:
Http2ClientSessionManager();
~Http2ClientSessionManager();
TC_NgHttp2* getSession(int id, bool isServer = false);
bool delSession(int id);
private:
typedef std::map<int, TC_NgHttp2*> SESSION_MAP;
SESSION_MAP _sessions;
};
} // end namespace tars
#endif // end #if TARS_SSL
#endif

View File

@ -1,143 +0,0 @@
#ifndef __TC_HTTP2_H__
#define __TC_HTTP2_H__
#if TARS_HTTP2
#include "util/tc_thread.h"
#include "util/tc_autoptr.h"
#include "util/tc_http.h"
#include "util/tc_network_buffer.h"
#include "util/tc_spin_lock.h"
#include "nghttp2/nghttp2.h"
namespace tars
{
typedef enum
{
REQUEST_GET,
REQUEST_POST,
REQUEST_OPTIONS,
REQUEST_HEAD,
REQUEST_PUT,
REQUEST_DELETE
}Req_Type;
typedef int (*ResponseFunc)(const Req_Type reqtype,
const string &requri,
const TC_Http::http_header_type &reqHeader,
const string &reqBody,
int &resopnseStatus,
string &resopnseAbout,
TC_Http::http_header_type &responseHeader,
string &responseBody);
class TC_Http2Session: public TC_HandleBase
{
public:
TC_Http2Session();
~TC_Http2Session();
struct Http2Response
{
int status;
string about;
TC_Http::http_header_type header;
string body;
};
/**
* get all http2 request id
* @param in
* @param out
* @return
*/
static int doRequest(const vector<char> &request, vector<int32_t>& vtReqid);
/**
* http2
* @param in
* @param out
* @return
*/
TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer&in, vector<char> &out);
int doResponse(int32_t reqid, const Http2Response &response, vector<char>& out);
int doRequest(const vector<char> &request, vector<char>& response);
void setResponseFunc(ResponseFunc func) { responseFunc_ = func; }
int getMethod(int32_t reqid, Req_Type &method);
int getUri(int32_t reqid, string &uri);
int getHeader(int32_t reqid, TC_Http::http_header_type &header);
int getBody(int32_t reqid, string &body);
struct RequestPack
{
RequestPack():streamId(0), bFinish(false){}
Req_Type method;
string uri;
TC_Http::http_header_type header;
string body;
int32_t streamId;
bool bFinish;
};
struct DataPack
{
DataPack(){}
DataPack(string &data, int pos):dataBuf(data), readPos(pos){}
string dataBuf;
unsigned int readPos;
};
TC_SpinLock responseBufLock_;
string responseBuf_;
TC_SpinLock reqLock_;
map<int32_t, RequestPack> mReq_;
vector<char> reqout_;
protected:
private:
int (*responseFunc_)(const Req_Type reqtype,
const string &requri,
const TC_Http::http_header_type &reqHeader,
const string &reqBody,
int &resopnseStatus,
string &resopnseAbout,
TC_Http::http_header_type &responseHeader,
string &responseBody);
nghttp2_session *session_;
bool bNewCon_;
TC_SpinLock nghttpLock;
// bool bOldVersion_;
// bool bUpgrade_;
};
typedef TC_AutoPtr<TC_Http2Session> TC_Http2SessionPtr;
}
#endif
#endif

View File

@ -1,201 +0,0 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef __TC_NGHTTP2_H
#define __TC_NGHTTP2_H
#if TARS_HTTP2
#include <string>
#include <list>
#include <map>
#include "nghttp2/nghttp2.h"
namespace tars
{
// struct RequestPacket;
// struct ResponsePacket;
enum ResponseState
{
ResponseNone,
ResponseHeadersDone,
ResponseBodyDone,
};
struct Http2Response
{
int streamId;
std::map<std::string, std::string> headers;
std::string body;
ResponseState state;
void swap(Http2Response& other);
};
/////////////////////////////////////////////////
/**
*@file tc_nghttp2.h
*@brief NGHTTP2封装
*
*/
/////////////////////////////////////////////////
/**
*@brief taf client网络线程使用的打包函数
*/
// void http2Request(const RequestPacket& , std::string& );
/**
*@brief NGHTTP2封装
*/
class TC_NgHttp2
{
// friend const vector<char> & http2Request(const RequestPacket&);
// friend size_t http2Response(const char* , size_t , std::list<tars::ResponsePacket>& , void*);
public:
/**
*@brief ng session的send回调
*/
static ssize_t onSend(nghttp2_session *session, const unsigned char* data, size_t length, int flags, void *user_data);
/**
*@brief ng session的收到新流回调
*/
static int onBeginHeaders(nghttp2_session *session, const nghttp2_frame *frame, void *user_data);
/**
*@brief ng session的收到header kv
*/
static int onHeader(nghttp2_session *session, const nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen,
uint8_t flags, void *user_data);
/**
*@brief ng session的收到完整frame
*/
static int onFrameRecv(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) ;
/**
*@brief ng session的收到data frame
*/
static int onDataChunkRecv(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t len, void *user_data);
/**
*@brief ng session的收到完整stream
*/
static int onStreamClose(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data);
public:
enum State
{
None,
Negotiating,
Http2, // use HTTP-2
Http1, // please use HTTP-1
};
/**
* @brief .
*/
explicit
TC_NgHttp2(bool isServer = false);
/**
* @brief .
*/
~TC_NgHttp2();
/**
* @brief
*/
int getState() const;
private:
/**
* @brief
*/
TC_NgHttp2(const TC_NgHttp2& );
void operator=(const TC_NgHttp2& );
public:
/**
* @brief
*/
void Init();
/**
* @brief
*/
void onNegotiateDone(bool succ);
/**
* @brief HTTP2握手+setting
*/
int settings(unsigned int maxCurrentStreams = 1);
/**
* @brief
*/
std::string& sendBuffer();
/**
* @brief session
*/
nghttp2_session* session() const;
/**
* @brief
*/
std::map<int, Http2Response> &doneResponses() { return _doneResponses; }
private:
/**
* session
*/
nghttp2_session* _session;
/**
*
*/
State _state;
/**
*
*/
bool _isServer;
/**
* send callback填充
*/
std::string _sendBuf;
/**
*
*/
std::map<int, Http2Response> _responses;
/**
*
*/
std::map<int, Http2Response> _doneResponses;
/**
*使
*/
std::string _settings;
};
} // end namespace tars
#endif // end #if TARS_HTTP2
#endif

View File

@ -1,62 +0,0 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#if TARS_HTTP2
#include <map>
#include <string>
#include "util/tc_nghttp2.h"
#include "util/tc_http2clientmgr.h"
namespace tars
{
Http2ClientSessionManager::Http2ClientSessionManager()
{
}
Http2ClientSessionManager::~Http2ClientSessionManager()
{
}
TC_NgHttp2* Http2ClientSessionManager::getSession(int id, bool isServer)
{
SESSION_MAP::iterator it(_sessions.find(id));
if (it == _sessions.end())
{
TC_NgHttp2* http2 = new TC_NgHttp2(isServer);
it = _sessions.insert(std::make_pair(id, http2)).first;
}
return it->second;
}
bool Http2ClientSessionManager::delSession(int id)
{
SESSION_MAP::iterator it(_sessions.find(id));
if (it == _sessions.end())
return false;
nghttp2_session_del(it->second->session());
_sessions.erase(it);
return true;
}
} // end namespace tars
#endif // end #if TARS_SSL

View File

@ -1,496 +0,0 @@
#if TARS_HTTP2
#include "util/tc_http.h"
#include "util/tc_http2session.h"
#include "util/tc_epoll_server.h"
namespace tars
{
#define MAKE_NV(NAME, VALUE) \
{ \
(uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \
NGHTTP2_NV_FLAG_NONE \
}
#define ARRLEN(x) (sizeof(x) / sizeof(x[0]))
#define MIN(x, y) ((x < y)?x:y)
static ssize_t str_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length,
uint32_t *data_flags,
nghttp2_data_source *source,
void *user_data)
{
TC_Http2Session::DataPack *dataPack = (TC_Http2Session::DataPack*)(source->ptr);
size_t size = MIN(dataPack->dataBuf.size() - dataPack->readPos, length);
memcpy(buf, dataPack->dataBuf.c_str() + dataPack->readPos, size);
dataPack->readPos += size;
if(dataPack->readPos == dataPack->dataBuf.size())
{
//TLOGDEBUG("[str_read_callback] finish:" << size << endl);
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
//TLOGDEBUG("[str_read_callback] size:" << size << " length:" << length<< endl);
return size;
}
static ssize_t send_callback(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
TC_Http2Session *ptr = (TC_Http2Session*)user_data;
{
TC_LockT<TC_SpinLock> lock(ptr->responseBufLock_);
ptr->responseBuf_.append((char*)data, length);
}
//TLOGDEBUG("[send_callback] length:" << length << endl);
return (ssize_t)length;
}
static int on_header_callback(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
cout << "[on_header_callback] streamid:" << frame->hd.stream_id << " name:" << name << " value:" << value << " flags:" << flags << endl;
TC_Http2Session *ptr = (TC_Http2Session*)user_data;
{
TC_LockT<TC_SpinLock>lock(ptr->reqLock_);
map<int32_t, TC_Http2Session::RequestPack>::iterator it = ptr->mReq_.find(frame->hd.stream_id);
if (it != ptr->mReq_.end())
{
string skey((char*)name, namelen);
string svalue((char*)value, valuelen);
it->second.header.insert(std::pair<string, string>(skey, svalue));
skey = TC_Common::lower(TC_Common::trim(skey));
if (skey == ":method")
{
string sMethod = TC_Common::upper(TC_Common::trim(svalue));
if (sMethod == "GET")
it->second.method = REQUEST_GET;
else if (sMethod == "POST")
it->second.method = REQUEST_POST;
else if (sMethod == "OPTIONS")
it->second.method = REQUEST_OPTIONS;
else if (sMethod == "HEAD")
it->second.method = REQUEST_HEAD;
else if (sMethod == "PUT")
it->second.method = REQUEST_PUT;
else if (sMethod == "DELETE")
it->second.method = REQUEST_DELETE;
}
else if (skey == ":path")
{
it->second.uri = svalue;
}
}
}
return 0;
}
static int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
cout << "[on_begin_headers_callback] streamid:" << frame->hd.stream_id << endl;
TC_Http2Session *ptr = (TC_Http2Session*)user_data;
if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
return 0;
}
{
TC_LockT<TC_SpinLock> lock(ptr->reqLock_);
ptr->mReq_[frame->hd.stream_id].streamId = frame->hd.stream_id;
}
return 0;
}
static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
{
//TLOGDEBUG("[on_frame_recv_callback] id:" << frame->hd.stream_id << " type:" << int(frame->hd.type) << endl);
cout << "[on_frame_recv_callback] id:" << frame->hd.stream_id << " type:" << int(frame->hd.type) << endl;
TC_Http2Session *ptr = (TC_Http2Session*)user_data;
switch (frame->hd.type)
{
case NGHTTP2_DATA:
case NGHTTP2_HEADERS:
/* Check that the client request has finished */
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
{
cout << "[on_frame_recv_callback] NGHTTP2_FLAG_END_STREAM" << endl;
{
TC_LockT<TC_SpinLock> lock(ptr->reqLock_);
map<int32_t, TC_Http2Session::RequestPack>::iterator it = ptr->mReq_.find(frame->hd.stream_id);
if (it != ptr->mReq_.end())
{
it->second.bFinish = true;
if(it->second.header.find(":method") != it->second.header.end() ||
it->second.header.find(":path") != it->second.header.end() ||
it->second.header.find(":scheme") != it->second.header.end())
{
cout << "insert reqout_" << endl;
char *tmpptr = (char*)&(it->second);
ptr->reqout_.insert(ptr->reqout_.end(), (char*)&tmpptr, (char*)&tmpptr + sizeof(TC_Http2Session::RequestPack *));
}
}
}
return 0;
}
break;
default:
break;
}
return 0;
}
static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t len, void *user_data)
{
cout << "[on_data_chunk_recv_callback] stream_id:" << stream_id << endl;
TC_Http2Session *ptr = (TC_Http2Session*)user_data;
{
TC_LockT<TC_SpinLock> lock(ptr->reqLock_);
map<int32_t, TC_Http2Session::RequestPack>::iterator it = ptr->mReq_.find(stream_id);
if (it != ptr->mReq_.end())
{
it->second.body.append((char*)data, len);
}
}
return 0;
}
static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data)
{
cout << "[on_stream_close_callback] streamid:" << stream_id << endl;
TC_Http2Session *ptr = (TC_Http2Session*)user_data;
{
TC_LockT<TC_SpinLock> lock(ptr->reqLock_);
map<int32_t, TC_Http2Session::RequestPack>::iterator it = ptr->mReq_.find(stream_id);
if (it != ptr->mReq_.end())
{
if (it->second.bFinish != true)
{
ptr->mReq_.erase(stream_id);
}
}
}
return 0;
}
TC_Http2Session::TC_Http2Session():session_(NULL), bNewCon_(true)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback);
nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, on_begin_headers_callback);
nghttp2_session_server_new(&session_, callbacks, ((void*)this));
*(int32_t*)((char*)session_ + 2380) = 100000000;
//TLOGDEBUG("window size:" << nghttp2_session_get_remote_window_size(session_) << endl);
nghttp2_session_callbacks_del(callbacks);
}
TC_Http2Session::~TC_Http2Session()
{
nghttp2_session_del(session_);
}
TC_NetWorkBuffer::PACKET_TYPE TC_Http2Session::parse(TC_NetWorkBuffer&in, vector<char> &out)
{
if(bNewCon_)
{
bNewCon_ = false;
nghttp2_settings_entry iv[2] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100},
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 100*1024*1024}};
nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv));
}
vector<char> buff = in.getBuffers();
int readlen = nghttp2_session_mem_recv(session_, (uint8_t *)buff.data(), buff.size());
cout << "parse:" << readlen << ", reqout_ size: " << reqout_.size() << endl;
if(readlen < 0)
{
return TC_NetWorkBuffer::PACKET_ERR;
}
else
{
in.moveHeader(readlen);
if (reqout_.empty())
{
return TC_NetWorkBuffer::PACKET_LESS;
}
out.insert(out.end(), reqout_.begin(), reqout_.end());
reqout_.clear();
}
return TC_NetWorkBuffer::PACKET_FULL;
}
int TC_Http2Session::doRequest(const vector<char> &request, vector<int32_t>& vtReqid)
{
vtReqid.clear();
string responseAbout;
TC_Http::http_header_type responseHeader;
string sstatus;
for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Session::RequestPack *))
{
RequestPack *ptr;
memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Session::RequestPack *));
vtReqid.push_back(ptr->streamId);
}
return 0;
}
int TC_Http2Session::doResponse(int32_t reqid, const Http2Response &response, vector<char>& out)
{
{
TC_LockT<TC_SpinLock> lock(reqLock_);
map<int32_t, RequestPack>::iterator it = mReq_.find(reqid);
if (it == mReq_.end())
return -1;
}
string sstatus = TC_Common::tostr(response.status);
const char* strstatus = ":status";
nghttp2_nv *hdrs = new nghttp2_nv[response.header.size() + 1];
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[0].name = (uint8_t*)strstatus;
hdrs[0].namelen = 7;
hdrs[0].value = (uint8_t*)sstatus.c_str();
hdrs[0].valuelen = sstatus.size();
TC_Http::http_header_type::const_iterator it = response.header.begin();
for (int n = 1; it != response.header.end(); n++, it++)
{
hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[n].name = (uint8_t*)it->first.c_str();
hdrs[n].namelen = it->first.size();
hdrs[n].value = (uint8_t*)it->second.c_str();
hdrs[n].valuelen = it->second.size();
}
DataPack dataPack;
dataPack.readPos = 0;
dataPack.dataBuf = response.body;
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void*)&dataPack;
data_prd.read_callback = str_read_callback;
int ret ;
{
TC_LockT<TC_SpinLock> lock(nghttpLock);
ret = nghttp2_submit_response(session_, reqid, hdrs, response.header.size()+1, &data_prd);
if (ret != 0)
return -1;
while (nghttp2_session_want_write(session_)) {
ret = nghttp2_session_send(session_);
if (ret != 0)
return -1;
}
}
delete [] hdrs;
{
TC_LockT<TC_SpinLock> lock(responseBufLock_);
out.clear();
out.insert(out.end(), responseBuf_.begin(), responseBuf_.end());
responseBuf_.clear();
}
{
TC_LockT<TC_SpinLock> lock(reqLock_);
mReq_.erase(reqid);
}
return 0;
}
int TC_Http2Session::doRequest(const vector<char> &request, vector<char>& response)
{
int responseStatus = 0;
string responseAbout;
TC_Http::http_header_type responseHeader;
string sstatus;
for (unsigned int i = 0; i < request.size(); i += sizeof(TC_Http2Session::RequestPack *))
{
RequestPack *ptr;
memcpy(&ptr, (char*)&(request[i]), sizeof(TC_Http2Session::RequestPack *));
string sMethod = TC_Common::upper(TC_Common::trim(ptr->header.find(":method")->second));
if (sMethod == "GET")
responseStatus = REQUEST_GET;
else if (sMethod == "POST")
responseStatus = REQUEST_POST;
else if (sMethod == "OPTIONS")
responseStatus = REQUEST_OPTIONS;
else if (sMethod == "HEAD")
responseStatus = REQUEST_HEAD;
else if (sMethod == "PUT")
responseStatus = REQUEST_PUT;
else if (sMethod == "DELETE")
responseStatus = REQUEST_DELETE;
else
{
continue;
}
sstatus = ptr->header.find(":path")->second;
responseAbout.clear();
responseHeader.clear();
DataPack dataPack;
dataPack.readPos = 0;
responseFunc_((Req_Type)responseStatus,
sstatus,
ptr->header,
ptr->body,
responseStatus,
responseAbout,
responseHeader,
dataPack.dataBuf);
sstatus = TC_Common::tostr(responseStatus);
const char* strstatus = ":status";
nghttp2_nv *hdrs = new nghttp2_nv[responseHeader.size() + 1];
hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[0].name = (uint8_t*)strstatus;
hdrs[0].namelen = 7;
hdrs[0].value = (uint8_t*)sstatus.c_str();
hdrs[0].valuelen = sstatus.size();
TC_Http::http_header_type::iterator it = responseHeader.begin();
for (int n = 1; it != responseHeader.end(); n++, it++)
{
hdrs[n].flags = NGHTTP2_NV_FLAG_NONE;
hdrs[n].name = (uint8_t*)it->first.c_str();
hdrs[n].namelen = it->first.size();
hdrs[n].value = (uint8_t*)it->second.c_str();
hdrs[n].valuelen = it->second.size();
}
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void*)&dataPack;
data_prd.read_callback = str_read_callback;
int ret ;
{
TC_LockT<TC_SpinLock> lock(nghttpLock);
ret = nghttp2_submit_response(session_, ptr->streamId, hdrs, responseHeader.size()+1, &data_prd);
if (ret != 0)
;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
while (nghttp2_session_want_write(session_)) {
ret = nghttp2_session_send(session_);
if (ret != 0)
;//TLOGERROR("Fatal error: %s", nghttp2_strerror(ret));
}
response.clear();
response.insert(response.end(), responseBuf_.begin(), responseBuf_.end());
responseBuf_.clear();
}
delete [] hdrs;
{
TC_LockT<TC_SpinLock> lock(reqLock_);
mReq_.erase(ptr->streamId);
}
}
return 0;
}
int TC_Http2Session::getMethod(int32_t reqid, Req_Type &method)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
map<int32_t, RequestPack>::iterator it = mReq_.find(reqid);
if (it != mReq_.end())
method = it->second.method;
else
return -1;
return 0;
}
int TC_Http2Session::getUri(int32_t reqid, string &uri)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
map<int32_t, RequestPack>::iterator it = mReq_.find(reqid);
if (it != mReq_.end())
uri = it->second.uri;
else
return -1;
return 0;
}
int TC_Http2Session::getHeader(int32_t reqid, TC_Http::http_header_type &header)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
map<int32_t, RequestPack>::iterator it = mReq_.find(reqid);
if (it != mReq_.end())
header = it->second.header;
else
return -1;
return 0;
}
int TC_Http2Session::getBody(int32_t reqid, string &body)
{
TC_LockT<TC_SpinLock> lock(reqLock_);
map<int32_t, RequestPack>::iterator it = mReq_.find(reqid);
if (it != mReq_.end())
body = it->second.body;
else
return -1;
return 0;
}
}
#endif

View File

@ -1,245 +0,0 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#if TARS_HTTP2
#include <string>
#include <algorithm>
#include <iostream>
#include "nghttp2/nghttp2.h"
#include "util/tc_nghttp2.h"
// #include "util/tc_http2clientmgr.h"
#include "util/tc_base64.h"
namespace tars
{
void Http2Response::swap(Http2Response& other)
{
if (this == &other)
return;
std::swap(streamId, other.streamId);
headers.swap(other.headers);
body.swap(other.body);
std::swap(state, other.state);
}
ssize_t TC_NgHttp2::onSend(nghttp2_session* session,
const uint8_t* data,
size_t length,
int flags,
void* user_data)
{
TC_NgHttp2* nghttp2 = (TC_NgHttp2* )user_data;
nghttp2->_sendBuf.append((const char*)data, length);
return length;
}
int TC_NgHttp2::onBeginHeaders(nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data)
{
TC_NgHttp2* nghttp2 = (TC_NgHttp2* )user_data;
if (frame->hd.type == NGHTTP2_HEADERS)
{
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
Http2Response rsp;
rsp.streamId = frame->hd.stream_id;
rsp.state = ResponseNone;
nghttp2->_responses[rsp.streamId] = rsp;
}
}
return 0;
}
int TC_NgHttp2::onHeader(nghttp2_session* session, const nghttp2_frame* frame,
const uint8_t* name, size_t namelen,
const uint8_t* value, size_t valuelen,
uint8_t flags, void* user_data)
{
TC_NgHttp2* nghttp2 = (TC_NgHttp2* )user_data;
int streamId = frame->hd.stream_id;
std::map<int, Http2Response>::iterator it = nghttp2->_responses.find(streamId);
if (it == nghttp2->_responses.end())
{
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
std::string n((const char*)name, namelen);
std::string v((const char*)value, valuelen);
it->second.headers.insert(std::make_pair(n, v));
return 0;
}
int TC_NgHttp2::onFrameRecv(nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data)
{
TC_NgHttp2* nghttp2 = (TC_NgHttp2* )user_data;
int streamId = frame->hd.stream_id;
if (streamId == 0)
return 0;
std::map<int, Http2Response>::iterator it = nghttp2->_responses.find(streamId);
if (it == nghttp2->_responses.end())
{
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS)
{
it->second.state = ResponseHeadersDone;
}
return 0;
default:
break;
}
return 0;
}
int TC_NgHttp2::onDataChunkRecv(nghttp2_session* session, uint8_t flags,
int32_t stream_id, const uint8_t* data,
size_t len, void* user_data)
{
TC_NgHttp2* nghttp2 = (TC_NgHttp2* )user_data;
std::map<int, Http2Response>::iterator it = nghttp2->_responses.find(stream_id);
if (it == nghttp2->_responses.end())
{
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
it->second.body.append((const char* )data, len);
return 0;
}
int TC_NgHttp2::onStreamClose(nghttp2_session* session, int32_t stream_id,
uint32_t error_code, void* user_data)
{
TC_NgHttp2* nghttp2 = (TC_NgHttp2* )user_data;
std::map<int, Http2Response>::iterator it = nghttp2->_responses.find(stream_id);
if (it == nghttp2->_responses.end())
{
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
it->second.state = ResponseBodyDone;
nghttp2->_doneResponses[stream_id].swap(it->second);
nghttp2->_responses.erase(it);
return 0;
}
TC_NgHttp2::TC_NgHttp2(bool isServer) :
_session(NULL),
_state(None),
_isServer(isServer)
{
nghttp2_session_callbacks* callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, &TC_NgHttp2::onSend);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, &TC_NgHttp2::onBeginHeaders);
nghttp2_session_callbacks_set_on_header_callback(callbacks, &TC_NgHttp2::onHeader);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, &TC_NgHttp2::onFrameRecv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, &TC_NgHttp2::onDataChunkRecv);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, &TC_NgHttp2::onStreamClose);
if (isServer)
nghttp2_session_server_new(&_session, callbacks, this);
else
nghttp2_session_client_new(&_session, callbacks, this);
nghttp2_session_callbacks_del(callbacks);
}
TC_NgHttp2::~TC_NgHttp2()
{
}
int TC_NgHttp2::getState() const
{
return (int)_state;
}
void TC_NgHttp2::Init()
{
if (_state != None)
return;
_state = Http2;
}
int TC_NgHttp2::settings(unsigned int maxCurrentStreams)
{
nghttp2_settings_entry iv[2] = {
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, maxCurrentStreams},
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 1024 * 1024 * 1024},
};
/* 24 bytes magic string also will be sent*/
int rv = nghttp2_submit_settings(_session,
NGHTTP2_FLAG_NONE,
iv,
sizeof(iv)/sizeof(iv[0]));
rv = nghttp2_session_send(_session);
return rv;
}
void TC_NgHttp2::onNegotiateDone(bool succ)
{
// assert (_state == Negotiating);
_state = succ ? Http2: Http1;
if (succ)
{
int rv = nghttp2_session_upgrade(_session,
(const uint8_t*)_settings.data(),
_settings.size(),
NULL);
if (rv)
cerr << "nghttp2_session_upgrade error: " << nghttp2_strerror(rv) << endl;
}
}
string& TC_NgHttp2::sendBuffer()
{
return _sendBuf;
}
nghttp2_session* TC_NgHttp2::session() const
{
return _session;
}
} // end namespace tars
#endif // end #if TARS_HTTP2