support http2 succ

This commit is contained in:
jarodruan 2020-02-10 16:23:17 +08:00
parent 9a9fda7b6e
commit 9f5da0f7bf
13 changed files with 193 additions and 771 deletions

View File

@ -77,17 +77,20 @@ static vector<char> customRequest(RequestPacket& request, Transceiver *)
class CustomCallBack : public ServantProxyCallback
{
public:
virtual int onDispatchResponse(const RequestPacket &req, const ResponsePacket &rsp)
virtual int onDispatch(ReqMessagePtr msg)
{
cout << "async response:" << rsp.sBuffer.data() << endl;
return 0;
if(msg->response->iRet != tars::TARSSERVERSUCCESS)
{
cout << "ret error:" << msg->response->iRet << endl;
}
else
{
cout << "succ:" << msg->response->sBuffer.data() << endl;
}
return msg->response->iRet;
}
virtual int onDispatchException(const RequestPacket &req, const ResponsePacket &rsp)
{
cout << "async exception:" << rsp.iRet << endl;
return 0;
}
};
typedef tars::TC_AutoPtr<CustomCallBack> CustomCallBackPtr;

View File

@ -189,29 +189,33 @@ struct TupCallback : public ServantProxyCallback
}
virtual int onDispatchResponse(const RequestPacket &request, const ResponsePacket &response)
virtual int onDispatch(ReqMessagePtr msg)
{
callback_count++;
if(msg->response->iRet != tars::TARSSERVERSUCCESS)
{
cout << "ret error:" << msg->response->iRet << endl;
}
else
{
callback_count++;
TarsUniPacket<> rsp;
TarsUniPacket<> rsp;
rsp.decode(response.sBuffer.data(), response.sBuffer.size());
rsp.decode(msg->response->sBuffer.data(), msg->response->sBuffer.size());
int ret = rsp.get<int>("");
string sRsp = rsp.get<string>("sRsp");
if(cur == count-1)
{
int64_t cost = TC_Common::now2us() - start;
cout << "TupCallback count:" << count << ", " << cost << " us, avg:" << 1.*cost/count << "us" << endl;
}
int ret = rsp.get<int>("");
string sRsp = rsp.get<string>("sRsp");
return 0;
}
if(cur == count-1)
{
int64_t cost = TC_Common::now2us() - start;
cout << "TupCallback count:" << count << ", " << cost << " us, avg:" << 1.*cost/count << "us" << endl;
}
virtual int onDispatchException(const RequestPacket &req, const ResponsePacket &rsp)
{
cout << "onDispatchException" << endl;
return 0;
return 0;
}
return msg->response->iRet;
}
int64_t start;

View File

@ -158,13 +158,6 @@ int AdapterProxy::invoke(ReqMessage * msg)
msg->request.iRequestId = _timeoutQueue->generateId();
}
// #if TARS_HTTP2
// if (getObjProxy()->getProtoName() == HTTP2)
// {
// msg->request.iRequestId = getId(); // session Id
// }
// #endif
#ifdef _USE_OPENTRACKING
startTrack(msg);
#endif

View File

@ -31,11 +31,11 @@
namespace tars
{
//TAFServer的协议解析器
TC_NetWorkBuffer::PACKET_TYPE AppProtocol::parseAdmin(TC_NetWorkBuffer &in, shared_ptr<TC_NetWorkBuffer::SendBuffer> &out)
{
return parse(in, out->getBuffer());
}
// //TAFServer的协议解析器
// TC_NetWorkBuffer::PACKET_TYPE AppProtocol::parseAdmin(TC_NetWorkBuffer &in, shared_ptr<TC_NetWorkBuffer::SendBuffer> &out)
// {
// return parse(in, out->getBuffer());
// }
vector<char> ProxyProtocol::tarsRequest(RequestPacket& request, Transceiver *)
{

View File

@ -55,7 +55,6 @@ ObjectProxy::ObjectProxy(CommunicatorEpoll * pCommunicatorEpoll, const string &
_proxyProtocol.requestFunc = ProxyProtocol::tarsRequest;
_proxyProtocol.responseFunc = ProxyProtocol::tarsResponse;
_protoName = "tars";
_endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(), sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));
@ -93,7 +92,7 @@ int ObjectProxy::loadLocator()
return 0;
}
void ObjectProxy::setProxyProtocol(const ProxyProtocol& protocol, const std::string& name)
void ObjectProxy::setProxyProtocol(const ProxyProtocol& protocol)
{
if(_hasSetProtocol)
{
@ -102,7 +101,6 @@ void ObjectProxy::setProxyProtocol(const ProxyProtocol& protocol, const std::str
_hasSetProtocol = true;
_proxyProtocol = protocol;
_protoName = name;
}
ProxyProtocol& ObjectProxy::getProxyProtocol()
@ -110,11 +108,6 @@ ProxyProtocol& ObjectProxy::getProxyProtocol()
return _proxyProtocol;
}
const std::string& ObjectProxy::getProtoName() const
{
return _protoName;
}
void ObjectProxy::setSocketOpt(int level, int optname, const void *optval, socklen_t optlen)
{
SocketOpt socketOpt;

View File

@ -161,7 +161,7 @@ ServantProxyCallback::ServantProxyCallback()
{
}
int ServantProxyCallback::onDispatch(ReqMessagePtr msg)
int HttpServantProxyCallback::onDispatch(ReqMessagePtr msg)
{
if (msg->response->iRet != tars::TARSSERVERSUCCESS)
{
@ -171,18 +171,6 @@ int ServantProxyCallback::onDispatch(ReqMessagePtr msg)
return onDispatchResponse(msg->request, *msg->response);
}
int ServantProxyCallback::onDispatchResponse(const RequestPacket &req, const ResponsePacket &rsp)
{
TLOGERROR("[TARS][ServantProxyCallback::onDispatchResponse not be implemented]"<<endl);
return 0;
}
int ServantProxyCallback::onDispatchException(const RequestPacket &req, const ResponsePacket &rsp)
{
TLOGERROR("[TARS][ServantProxyCallback::onDispatchException not be implemented]"<<endl);
return 0;
}
HttpServantProxyCallback::HttpServantProxyCallback(const HttpCallbackPtr& cb) :
_httpCb(cb)
{
@ -370,13 +358,13 @@ int ServantProxy::tars_async_timeout() const
}
void ServantProxy::tars_set_protocol(const ProxyProtocol& protocol, const std::string& protoName)
void ServantProxy::tars_set_protocol(const ProxyProtocol& protocol)
{
TC_LockT<TC_ThreadMutex> lock(*this);
for(size_t i = 0;i < _objectProxyNum; ++i)
{
(*(_objectProxy + i))->setProxyProtocol(protocol, protoName);
(*(_objectProxy + i))->setProxyProtocol(protocol);
}
}

View File

@ -310,7 +310,6 @@ TC_Http2Client* Transceiver::getHttp2Client()
if(_http2Client == NULL)
{
_http2Client = new TC_Http2Client();
// _http2Client->Init();
_http2Client->settings();
}
@ -318,56 +317,12 @@ TC_Http2Client* Transceiver::getHttp2Client()
}
#endif
// int Transceiver::doRequest()
// {
// if(!isValid())
// {
// return -1;
// }
// int iRet = 0;
// //buf不为空,先发生buffer的内容
// if(!_sendBuffer.IsEmpty())
// {
// size_t length = 0;
// void* data = NULL;
// _sendBuffer.PeekData(data, length);
// iRet = this->send(data, length, 0);
// //失败,直接返回
// if(iRet < 0)
// {
// return iRet;
// }
// if(iRet > 0)
// {
// _sendBuffer.Consume(iRet);
// if (_sendBuffer.IsEmpty())
// _sendBuffer.Shrink();
// else
// return 0;
// }
// }
// //取adapter里面积攒的数据
// _adapterProxy->doInvoke();
// //object里面应该是空的
// assert(_adapterProxy->getObjProxy()->timeoutQSize() == 0);
// return 0;
// }
int Transceiver::doRequest()
{
if(!isValid()) return -1;
// int64_t s = TC_Common::now2us();
//buf不为空,先发送buffer的内容
if(_sendBuffer && !_sendBuffer->empty())
{
@ -386,15 +341,12 @@ int Transceiver::doRequest()
_adapterProxy->doInvoke();
}
// cout << "doRequest:" << std::this_thread::get_id() << ", us:" << TC_Common::now2us() - s << ", invoke:" << voke << endl;
//object里面应该是空的
// assert(_adapterProxy->getObjProxy()->timeQEmpty());
return 0;
}
// int Transceiver::sendRequest(const char * pData, size_t iSize, bool forceSend)
int Transceiver::sendRequest(const shared_ptr<TC_NetWorkBuffer::SendBuffer> &buff, bool forceSend)
{
//空数据 直接返回成功

View File

@ -69,16 +69,6 @@ public:
return TC_NetWorkBuffer::parseBinary4<TARS_NET_MIN_PACKAGE_SIZE, TARS_NET_MAX_PACKAGE_SIZE>(in, out);
}
/**
*
* @param in, buffer
* @param out,
*
* @return int, 0, 1
*/
static TC_NetWorkBuffer::PACKET_TYPE parseAdmin(TC_NetWorkBuffer &in, shared_ptr<TC_NetWorkBuffer::SendBuffer> &out);
/**
*
* @param T

View File

@ -91,19 +91,14 @@ public:
*
* @return UserProtocol&
*/
void setProxyProtocol(const ProxyProtocol& protocol, const std::string& name = "tars");
void setProxyProtocol(const ProxyProtocol& protocol);
/**
*
* @return ProxyProtocol&
*/
ProxyProtocol& getProxyProtocol();
/**
*
* @return ProxyProtocol&
*/
const std::string& getProtoName() const;
/**
*
*/
@ -178,21 +173,6 @@ public:
return _invokeSetId;
}
/**
* Id
*/
// inline uint32_t generateId()
// {
// _id++;
// if(_id == 0)
// {
// ++_id;
// }
// return _id;
// }
/**
*
* @return int
@ -280,11 +260,6 @@ private:
*/
bool _isInvokeBySet;
/*
* id
*/
// uint32_t _id;
/*
* tars_set_protocol设置过proxy的协议函数
*
@ -295,10 +270,6 @@ private:
*
*/
ProxyProtocol _proxyProtocol;
/*
*
*/
std::string _protoName;
/*
*

View File

@ -298,21 +298,7 @@ public:
* @param msg
* @return int
*/
virtual int onDispatch(ReqMessagePtr ptr);
/**
*
* @param msg
* @return int
*/
virtual int onDispatchResponse(const RequestPacket &req, const ResponsePacket &rsp);
/**
* ()
* @param msg
* @return int
*/
virtual int onDispatchException(const RequestPacket &req, const ResponsePacket &rsp);
virtual int onDispatch(ReqMessagePtr ptr) = 0;
protected:
@ -351,6 +337,13 @@ class HttpServantProxyCallback : virtual public ServantProxyCallback
public:
explicit HttpServantProxyCallback(const HttpCallbackPtr& cb);
/**
*
* @param msg
* @return int
*/
virtual int onDispatch(ReqMessagePtr ptr);
/**
*
* @param msg
@ -519,7 +512,7 @@ public:
*
* @param protocol
*/
void tars_set_protocol(const ProxyProtocol& protocol, const std::string& protoName = "tars");
void tars_set_protocol(const ProxyProtocol& protocol);
/**
*
@ -600,7 +593,6 @@ public:
tars::TarsOutputStream<tars::BufferWriterVector>& buf,
const map<string, string>& context,
const map<string, string>& status);
// ResponsePacket& rep);
/**
* TARS协议异步方法调用

View File

@ -119,7 +119,6 @@ public:
* fd缓冲区已满,
* ,
*/
// int sendRequest(const char * pData,size_t iSize, bool forceSend = false);
int sendRequest(const shared_ptr<TC_NetWorkBuffer::SendBuffer> &pData, bool forceSend = false);
/*
@ -292,17 +291,6 @@ protected:
* buffer
*/
TC_NetWorkBuffer _recvBuffer;
// /*
// * 发送缓存buff
// */
// TC_Buffer _sendBuffer;
// /*
// * 接收缓存buff
// */
// TC_Buffer _recvBuffer;
};
//////////////////////////////////////////////////////////

File diff suppressed because it is too large Load Diff

View File

@ -309,16 +309,7 @@ protected:
*
* @return string
*/
string generateDispatchResponseAsync(const OperationPtr &pPtr, const string &cn) const;
/**
* onDispatchException
* @param pPtr
* @param cn
*
* @return string
*/
string generateDispatchExceptionAsync(const OperationPtr& pPtr, const string& cn) const;
string generateDispatchAsync(const OperationPtr &pPtr, const string &cn) const;
/**
*
@ -327,17 +318,7 @@ protected:
*
* @return string
*/
string generateDispatchCoroResponseAsync(const OperationPtr &pPtr, const string &cn) const;
/**
*
* @param pPtr
* @param cn
*
* @return string
*/
string generateDispatchCoroExceptionAsync(const OperationPtr &pPtr, const string &cn) const;
string generateDispatchCoroAsync(const OperationPtr &pPtr, const string &cn) const;
/**
* servant操作函数调用的源码
@ -477,6 +458,4 @@ private:
bool _tarsMaster;
};
#endif
#endif