Merge pull request #250 from westonli/master

add common protocol call
This commit is contained in:
ruanshudong 2022-08-08 09:18:27 +08:00 committed by GitHub
commit 75c00b1f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 230 additions and 2 deletions

View File

@ -216,7 +216,7 @@ void AdapterProxy::onCompletePackage(TC_Transceiver* trans)
shared_ptr<TC_NetWorkBuffer::Buffer> AdapterProxy::onSendAuthCallback(TC_Transceiver* trans)
{
// LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
// LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
// 走框架的AK/SK认证
BasicAuthInfo info;
@ -229,7 +229,7 @@ shared_ptr<TC_NetWorkBuffer::Buffer> AdapterProxy::onSendAuthCallback(TC_Transce
const int kAuthType = 0x40;
RequestPacket request;
request.sFuncName = "InnerAuthServer";
request.sServantName = "authServant";
request.sServantName = _objectProxy->name();
request.iVersion = TARSVERSION;
request.iRequestId = 1;
request.cPacketType = TARSNORMAL;
@ -242,6 +242,8 @@ shared_ptr<TC_NetWorkBuffer::Buffer> AdapterProxy::onSendAuthCallback(TC_Transce
TC_NetWorkBuffer::PACKET_TYPE AdapterProxy::onVerifyAuthCallback(TC_NetWorkBuffer &buff, TC_Transceiver*trans)
{
shared_ptr<ResponsePacket> rsp = std::make_shared<ResponsePacket>();
const int kAuthType = 0x40;
rsp->iMessageType = kAuthType;
TC_NetWorkBuffer::PACKET_TYPE ret = _objectProxy->getRootServantProxy()->tars_get_protocol().responseFunc(buff, *rsp.get());

View File

@ -23,6 +23,7 @@
#include "util/tc_autoptr.h"
#include "util/tc_proxy_info.h"
#include "util/tc_singleton.h"
#include "util/tc_custom_protocol.h"
#include "servant/Message.h"
#include "servant/AppProtocol.h"
#include "servant/Current.h"
@ -1122,6 +1123,85 @@ public:
*/
void http_call_async(const string &funcName, shared_ptr<TC_HttpRequest> &request, const HttpCallbackPtr &cb, bool bCoro = false);
void common_protocol_call(const string &funcName, shared_ptr<TC_CustomProtoReq> &request, shared_ptr<TC_CustomProtoRsp> &response)
{
if (_connectionSerial <= 0)
{
_connectionSerial = DEFAULT_CONNECTION_SERIAL;
}
ReqMessage *msg = new ReqMessage();
msg->init(ReqMessage::SYNC_CALL, this);
msg->bFromRpc = true;
msg->request.sFuncName = funcName;
msg->request.sBuffer.resize(sizeof(shared_ptr<TC_CustomProtoReq>));
msg->deconstructor = [msg] {
shared_ptr<TC_CustomProtoReq> &data = *(shared_ptr<TC_CustomProtoReq> *)(msg->request.sBuffer.data());
data.reset();
if (!msg->response->sBuffer.empty())
{
shared_ptr<TC_CustomProtoRsp> &rsp = *(shared_ptr<TC_CustomProtoRsp> *)(msg->response->sBuffer.data());
//主动reset一次
rsp.reset();
msg->response->sBuffer.clear();
}
};
shared_ptr<TC_CustomProtoReq> &data = *(shared_ptr<TC_CustomProtoReq> *)(msg->request.sBuffer.data());
data = request;
servant_invoke(msg, false);
response = *(shared_ptr<TC_CustomProtoRsp> *)(msg->response->sBuffer.data());
delete msg;
msg = NULL;
}
void common_protocol_call_async(const string &funcName, shared_ptr<TC_CustomProtoReq> &request, const ServantProxyCallbackPtr &cb, bool bCoro = false)
{
if (_connectionSerial <= 0)
{
_connectionSerial = DEFAULT_CONNECTION_SERIAL;
}
ReqMessage *msg = new ReqMessage();
msg->init(ReqMessage::ASYNC_CALL, this);
msg->bFromRpc = true;
msg->request.sFuncName = funcName;
msg->request.sServantName = tars_name();
msg->request.sBuffer.resize(sizeof(shared_ptr<TC_CustomProtoReq>));
msg->deconstructor = [msg] {
shared_ptr<TC_CustomProtoReq> &data = *(shared_ptr<TC_CustomProtoReq> *)(msg->request.sBuffer.data());
data.reset();
if (!msg->response->sBuffer.empty())
{
shared_ptr<TC_CustomProtoRsp> &rsp = *(shared_ptr<TC_CustomProtoRsp> *)(msg->response->sBuffer.data());
//主动reset一次
rsp.reset();
msg->response->sBuffer.clear();
}
};
*(shared_ptr<TC_CustomProtoReq> *)(msg->request.sBuffer.data()) = request;
msg->callback = cb;
servant_invoke(msg, bCoro);
}
/**
* TARS协议同步方法调用
*/
@ -1263,6 +1343,7 @@ private:
friend class AdapterProxy;
friend class CommunicatorEpoll;
private:
/**
*
*/

View File

@ -0,0 +1,145 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef __TC_CUSTOM_PROTOCOL_H_
#define __TC_CUSTOM_PROTOCOL_H_
#include "util/tc_ex.h"
#include "util/tc_port.h"
#include "util/tc_common.h"
#include "util/tc_autoptr.h"
#include "util/tc_thread.h"
#include "util/tc_socket.h"
#include "util/tc_epoller.h"
#include "util/tc_timeout_queue.h"
#include "util/tc_network_buffer.h"
#include <map>
#include <sstream>
#include <cassert>
#include <vector>
namespace tars
{
/////////////////////////////////////////////////
/**
* @file tc_custom_protocol.h
* @brief custom protocol类.
* @brief custom protocol class
*
* TC_CustomProtoReqTC_CustomProtoRsp两个类
* Including TC_CustomProtoReqTC_CustomProtoRsp Two classes of custom protocol;
*
*/
/////////////////////////////////////////////////
class TC_NetWorkBuffer;
/**
* @brief custom protocol
* @brief custom protocol protocol resolution exception class
*/
struct TC_CustomProto_Exception : public TC_Exception
{
TC_CustomProto_Exception(const string &sBuffer) : TC_Exception(sBuffer){};
~TC_CustomProto_Exception() {};
};
/**
* @brief custom
* @brief custom response Protocol Resolution Exception Class
*/
struct TC_CustomProtoReq_Exception : public TC_CustomProto_Exception
{
TC_CustomProtoReq_Exception(const string &sBuffer) : TC_CustomProto_Exception(sBuffer){};
~TC_CustomProtoReq_Exception() {};
};
/**
* @brief custom
* @brief custom request protocol resolution exception class
*/
struct TC_CustomProtoRsp_Exception : public TC_CustomProto_Exception
{
TC_CustomProtoRsp_Exception(const string &sBuffer) : TC_CustomProto_Exception(sBuffer){};
~TC_CustomProtoRsp_Exception() {};
};
class TC_CustomProtoReq
{
public:
void encode(shared_ptr<TC_NetWorkBuffer::Buffer>& buff)
{
buff->addBuffer(std::move(_buffer));
}
void sendBuffer(const string& buffer)
{
_buffer = buffer;
// LOG_CONSOLE_DEBUG << "_buffer:" << _buffer << endl;
}
private:
string _buffer;
};
class TC_CustomProtoRsp
{
public:
typedef std::function<bool(TC_NetWorkBuffer::Buffer&)> IncrementDecodeFunc;
virtual bool decode(TC_NetWorkBuffer::Buffer &data)
{
return true;
}
bool incrementDecode(TC_NetWorkBuffer &buff)
{
if(buff.empty())
return false;
buff.mergeBuffers();
size_t length = buff.getBufferLength();
auto sBuf = buff.getBuffer();
bool flag = decode(*sBuf.get());
buff.subLength(length - sBuf->length());
return flag;
}
string &getBuffer()
{
return _buffer;
}
void set_protocol(const IncrementDecodeFunc& incrementDecodeFunc)
{
_incrementDecodeFunc = incrementDecodeFunc;
}
protected:
string _buffer;
IncrementDecodeFunc _incrementDecodeFunc;
};
}
#endif