Fix: the business interface callback was not invoked when no node was available during an asynchronous call

Optimize: in the server model, received data is now inserted into the queue in batches
Add: tc_json adds a merge function
Optimize: limit the time spent parsing the protocol at the network layer by yielding the network thread every 1ms, reducing latency
This commit is contained in:
ruanshudong 2022-03-22 16:01:51 +08:00
parent 3cba519c4c
commit fe25b3aed4
22 changed files with 1170 additions and 948 deletions

View File

@ -186,7 +186,7 @@ TC_NetWorkBuffer::PACKET_TYPE AdapterProxy::onParserCallback(TC_NetWorkBuffer& b
}
catch(exception &ex)
{
TLOGERROR("[AdapterProxy::onParserCallback parser error:" << ex.what() << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
TLOG_ERROR(ex.what() << ", obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString()<< endl);
}
return TC_NetWorkBuffer::PACKET_ERR;

View File

@ -698,8 +698,9 @@ void Application::main(int argc, char *argv[])
void Application::main(const TC_Option &option)
{
//直接输出编译的TARS版本
if(option.hasParam("version"))
__out__.modFlag(0xfffff, false);
//直接输出编译的TAF版本
if (option.hasParam("version"))
{
__out__.debug() << "TARS:" << TARS_VERSION << endl;
exit(0);

View File

@ -119,15 +119,10 @@ void AsyncProcThread::callback(ReqMessage * msg)
pServantProxyThreadData->_data._dyeing = msg->data._dyeing;
pServantProxyThreadData->_data._dyeingKey = msg->data._dyeingKey;
pServantProxyThreadData->_data._cookie = msg->data._cookie;
//=======
// pServantProxyThreadData->_dyeing = msg->bDyeing;
// pServantProxyThreadData->_dyeingKey = msg->sDyeingKey;
pServantProxyThreadData->_traceCall = msg->bTraceCall;
pServantProxyThreadData->initTrace(msg->sTraceKey);
// pServantProxyThreadData->_cookie = msg->cookie;
//>>>>>>> origin/delay
if(msg->adapter)
{

View File

@ -709,20 +709,21 @@ void Communicator::terminate()
/**
 * Dispatch a completed async response to the right place.
 *
 * Priority:
 *  1. A proxy-level callback (_callback) handles the message directly.
 *  2. Hash mode (_callbackHash): pin the message to an async thread chosen by
 *     the connection fd, so responses of one connection stay on one thread.
 *     msg->adapter may be null when no node was available for the call, so it
 *     is checked here; in that case we fall through to round-robin dispatch.
 *  3. Otherwise round-robin across the async threads.
 *
 * NOTE(review): the rendered diff left both the pre-edit and post-edit variants
 * of this function body in place; this is the cleaned post-edit version.
 */
void Communicator::pushAsyncThreadQueue(ReqMessage * msg)
{
    if (msg->pObjectProxy->getRootServantProxy()->_callback)
    {
        ReqMessagePtr msgPtr = msg;
        msg->pObjectProxy->getRootServantProxy()->_callback(msgPtr);
    }
    else if (msg->pObjectProxy->getRootServantProxy()->_callbackHash && msg->adapter)
    {
        // For now, ignore the case where per-thread queue lengths differ.
        _asyncThread[((uint32_t) msg->adapter->trans()->fd()) % _asyncThreadNum]->push_back(msg);
    }
    else
    {
        // For now, ignore the case where per-thread queue lengths differ.
        _asyncThread[(_asyncSeq++) % _asyncThreadNum]->push_back(msg);
    }
}
void Communicator::doStat()

View File

@ -538,28 +538,25 @@ bool CommunicatorEpoll::handleNotify(const shared_ptr<TC_Epoller::EpollInfo> &da
ReqMessage * msg = NULL;
size_t maxProcessCount = 0;
try
{
int64_t now = TNOWMS;
while (pFDInfo->msgQueue->pop_front(msg))
{
msg->pObjectProxy->invoke(msg);
if(++maxProcessCount > 1000)
if(TNOWMS - now >= 1)
{
//避免包太多的时候, 循环占用网路线程, 导致连接都建立不上, 一个包都无法发送出去
data->mod(EPOLLOUT);
TLOGTARS("[CommunicatorEpoll::handle max process count: " << maxProcessCount << ", fd:" << data->fd() << "]" << endl);
break;
}
}
if (pFDInfo->msgQueue->empty() && pFDInfo->autoDestroy)
{
// LOG_CONSOLE_DEBUG << "iSeq:" << pFDInfo->iSeq << ", fd:" << pFDInfo->notify.notifyFd() << endl;
delete pFDInfo;
return false;
}

View File

@ -58,7 +58,7 @@ Current::~Current()
const string &Current::getHostName() const
{
auto it = _request.context.find("node_name");
if(it != _request.context.end())
if(it != _request.context.end() && !(it->second.empty()) )
{
return it->second;
}

View File

@ -337,7 +337,11 @@ public:
}
if (flags.size() >= 2)
{
maxLen = max(maxLen, TC_Common::strto<unsigned int>(flags[1]));
// TODO(greatsong): std::max Win32下编译有问题, 添加NOMAXMIN宏不起作用
//maxLen = std::max(maxLen, TC_Common::strto<unsigned int>(flags[1]));
auto f = TC_Common::strto<unsigned int>(flags[1]);
if (maxLen < f)
maxLen = f;
}
// type = strtol(tid.substr(0, pos).c_str(), NULL, 16);

View File

@ -1636,17 +1636,17 @@ string Tars2Cpp::generateServantDispatch(const OperationPtr& pPtr, const string&
vector<ParamDeclPtr>& vParamDecl = pPtr->getAllParamDeclPtr();
string routekey;
// string routekey;
for(size_t i = 0; i < vParamDecl.size(); i++)
{
s << TAB << tostr(vParamDecl[i]->getTypeIdPtr()->getTypePtr()) << " "<< vParamDecl[i]->getTypeIdPtr()->getId()
<< generateInitValue(vParamDecl[i]->getTypeIdPtr()) << ";" << endl;
if (routekey.empty() && vParamDecl[i]->isRouteKey())
{
routekey = vParamDecl[i]->getTypeIdPtr()->getId();
}
//
// if (routekey.empty() && vParamDecl[i]->isRouteKey())
// {
// routekey = vParamDecl[i]->getTypeIdPtr()->getId();
// }
}

View File

@ -1054,20 +1054,11 @@ namespace p
}
};
//for enum
template<typename D>
struct strto2
{
D operator()(const string &sStr, typename std::enable_if<!std::is_enum<D>::value, void ***>::type dummy = 0)
{
istringstream sBuffer(sStr);
D t;
sBuffer >> t;
return t;
}
D operator()(const string &sStr, typename std::enable_if<std::is_enum<D>::value, void ***>::type dummy = 0)
D operator()(const string &sStr)
{
istringstream sBuffer(sStr);
int i;
@ -1075,10 +1066,27 @@ namespace p
return (D)i;
}
};
// Fallback converter for class types: extract a D from the string via
// operator>> on an input string stream. (Arithmetic types and enums are
// handled by strto1/strto2; a std::string specialization exists separately.)
template<typename D>
struct strto3
{
    D operator()(const string &str)
    {
        istringstream in(str);
        D value;
        in >> value;
        return value;
    }
};
//for string
template<>
struct strto2<string>
struct strto3<string>
{
const string &operator()(const string &sStr)
{
@ -1091,7 +1099,9 @@ namespace p
/**
 * Convert a string to T.
 *  - arithmetic T  -> p::strto1<T>
 *  - enum T        -> p::strto2<T> (parses as int, casts to the enum)
 *  - other types   -> p::strto3<T> (stream extraction)
 *
 * Fix: the rendered diff left the stale pre-edit `using strto_type` line in
 * place next to the new two-step definition, which would be a redefinition;
 * only the post-edit pair of type aliases is kept.
 */
template<typename T>
T TC_Common::strto(const string &sStr)
{
    using strto_enum_type = typename std::conditional<std::is_enum<T>::value, p::strto2<T>, p::strto3<T>>::type;
    using strto_type = typename std::conditional<std::is_arithmetic<T>::value, p::strto1<T>, strto_enum_type>::type;
    return strto_type()(sStr);
}

View File

@ -296,6 +296,7 @@ public:
* @param recv
*/
inline void push_back(const shared_ptr<RecvContext> &recv ) { _rbuffer.push_back(recv); }
inline void push_back(const deque<shared_ptr<RecvContext>> &recv ) { _rbuffer.push_back(recv); }
/**
*
@ -335,6 +336,7 @@ public:
* @param recv
*/
void insertRecvQueue(const shared_ptr<RecvContext> &recv);
void insertRecvQueue(const deque<shared_ptr<RecvContext>> &recv);
/**
*
@ -427,7 +429,7 @@ public:
/**
* wait time for queue
*/
int64_t _iWaitTime = 10000;
int64_t _iWaitTime = 3000;
};
////////////////////////////////////////////////////////////////////////////
@ -663,6 +665,7 @@ public:
TC_NetWorkBuffer::PACKET_TYPE onParserCallback(TC_NetWorkBuffer& buff, TC_Transceiver *trans);
std::shared_ptr<TC_OpenSSL> onOpensslCallback(TC_Transceiver* trans);
void onCompleteNetworkCallback(TC_Transceiver* trans);
bool handleOutputImp(const shared_ptr<TC_Epoller::EpollInfo> &data);
bool handleInputImp(const shared_ptr<TC_Epoller::EpollInfo> &data);
@ -728,6 +731,11 @@ public:
*/
list<shared_ptr<SendContext>> _messages;
/**
*
*/
deque<shared_ptr<RecvContext>> _recv;
/**
* message队列中消息内存大小
*/
@ -1223,6 +1231,7 @@ public:
* @param force (, close事件)
*/
void insertRecvQueue(const shared_ptr<RecvContext> & recv, bool force = false);
void insertRecvQueue(const deque<shared_ptr<RecvContext>> & recv);
/**
*

View File

@ -94,13 +94,13 @@ public:
*
* @return
*/
inline int fd() { return _fd; }
inline int fd() const { return _fd; }
/**
*
* @return
*/
inline bool valid() { return _fd != INVALID_SOCKET; }
inline bool valid() const { return _fd != INVALID_SOCKET; }
/**
* cookie和析构器, EpollInfo析构时调用

View File

@ -22,8 +22,8 @@
namespace tars
{
using namespace std;
using namespace std;
/////////////////////////////////////////////////
/**
* @file tc_ex.h

View File

@ -320,7 +320,14 @@ public:
//Conversion of JSON string to JSON structure
static JsonValuePtr getValue(const string & str);
static JsonValuePtr getValue(const vector<char>& buf);
// 两个json串合并
static string mergeJson(const string& json1, const string& json2);
static void mergeJson(const string& json1, const string& json2, string& jsonRet);
private:
static void mergeObj(JsonValuePtr from, JsonValuePtr to, vector<string>& path);
//string 类型到json字符串
//string type to json string
static void writeString(const JsonValueStringPtr & p, string& ostr);

View File

@ -109,7 +109,7 @@ public:
*
* @param cmd
* @param err
* @return
* @return (2k的输出长度)
*/
static std::string exec(const char* cmd);
@ -117,7 +117,7 @@ public:
* (+)
* @param cmd
* @param err
* @return:
* @return: (2k的输出长度)
*/
static std::string exec(const char* cmd, std::string &err);

View File

@ -71,6 +71,18 @@ public:
*/
virtual ~TC_TimerBase();
/**
*
* @return
*/
size_t count();
/**
*
* @return
*/
size_t repeatCount();
/**
* @brief fireMillseconds时间执行
* @param fireMillseconds, ()
@ -145,7 +157,7 @@ public:
/**
*
*/
*/
int64_t nextTimer() const { return _nextTimer; }
/**

View File

@ -153,6 +153,8 @@ public:
using onparser_callback = std::function<TC_NetWorkBuffer::PACKET_TYPE(TC_NetWorkBuffer&, TC_Transceiver*)> ;
//完整解析完一个包之后的回调
using oncompletepackage_callback = std::function<void(TC_Transceiver*)> ;
//完整一次网络解析之后回调(一般有多次解析onparser_callback 以及 oncompletepackage_callback 之后回调, 通常在业务层可以在这个函数中可以把解析的数据一次性写入到队列中)
using oncompletenetwork_callback = std::function<void(TC_Transceiver*)> ;
//cient侧: 发送鉴权包的回调, 业务层在回调里面组织鉴权包
using onclientsendauth_callback = std::function<shared_ptr<TC_NetWorkBuffer::Buffer>(TC_Transceiver*)> ;
//client侧: 收到鉴权包的的回调, 业务层解包(注意返回PACKET_FULL, 表示鉴权成功)
@ -160,6 +162,9 @@ public:
//server侧: 验证鉴权包并返回验证包的回调
using onserververifyauth_callback = std::function<pair<TC_NetWorkBuffer::PACKET_TYPE, shared_ptr<TC_NetWorkBuffer::Buffer>>(TC_NetWorkBuffer &, TC_Transceiver*)> ;
//网络层单次时间收发包最长时间(毫秒), 为了避免网络层一直在收包, 没机会发送包, 默认就1ms, 加速范文
static uint64_t LONG_NETWORK_TRANS_TIME;
/**
*
* @param epoller
@ -178,13 +183,14 @@ public:
*
* connect时创建的
*/
void initializeClient(const oncreate_callback &oncreate,
const onclose_callback &onclose,
const onconnect_callback &onconnect,
const onrequest_callback &onrequest,
const onparser_callback &onparser,
const onopenssl_callback &onopenssl,
const oncompletepackage_callback &onfinish = oncompletepackage_callback());
void initializeClient(const oncreate_callback &oncreate,
const onclose_callback &onclose,
const onconnect_callback &onconnect,
const onrequest_callback &onrequest,
const onparser_callback &onparser,
const onopenssl_callback &onopenssl,
const oncompletepackage_callback &onfinish = oncompletepackage_callback(),
const oncompletenetwork_callback &onfinishAll = oncompletenetwork_callback());
/**
*
@ -195,7 +201,8 @@ public:
const onrequest_callback &onrequest,
const onparser_callback &onparser,
const onopenssl_callback &onopenssl,
const oncompletepackage_callback &onfinish = oncompletepackage_callback());
const oncompletepackage_callback &onfinish = oncompletepackage_callback(),
const oncompletenetwork_callback &onfinishAll = oncompletenetwork_callback());
/**
* (, )
@ -629,8 +636,10 @@ protected:
oncompletepackage_callback _onCompletePackageCallback;
oncompletenetwork_callback _onCompleteNetworkCallback;
onclientsendauth_callback _onClientSendAuthCallback;
onclientverifyauth_callback _onClientVerifyAuthCallback;
onserververifyauth_callback _onServerVerifyAuthCallback;

View File

@ -1018,7 +1018,6 @@ void TC_Common::getRandomHexChars(char *p, unsigned int len)
}
#endif
string TC_Common::nextDate(const string &sDate)

View File

@ -91,6 +91,31 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const shared_ptr<RecvContext> &
}
}
/**
 * Batch-insert a deque of received contexts into a data queue and, when
 * running with coroutine schedulers, wake one scheduler to process them.
 *
 * @param recv batch of receive contexts; presumably all from the same
 *             connection, since routing uses only the last element's fd —
 *             TODO confirm callers uphold this.
 */
void TC_EpollServer::DataBuffer::insertRecvQueue(const deque<shared_ptr<RecvContext>> &recv)
{
    if (recv.empty())
    {
        return;
    }
    // NOTE(review): counts contexts, not bytes — assumes _iRecvBufferSize
    // tracks the number of queued packets; confirm against its other users.
    _iRecvBufferSize += recv.size();
    getDataQueue(recv.back()->fd())->push_back(recv);
    if (_schedulers[0] != NULL)
    {
        // A scheduler exists, so we are running in coroutine mode.
        if (isQueueMode())
        {
            // Queue mode: wake the scheduler bound to this connection's fd.
            _schedulers[index(recv.back()->fd())]->notify();
        }
        else
        {
            // Otherwise wake an arbitrary scheduler.
            _schedulers[index(rand())]->notify();
        }
    }
}
bool TC_EpollServer::DataBuffer::wait(uint32_t handleIndex)
{
return getDataQueue(handleIndex)->wait(_iWaitTime);
@ -527,9 +552,11 @@ void TC_EpollServer::Connection::initialize(TC_Epoller *epoller, unsigned int ui
#endif
_trans->initializeServer(std::bind(&Connection::onCloseCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
std::bind(&Connection::onRequestCallback, this, std::placeholders::_1),
std::bind(&Connection::onParserCallback, this, std::placeholders::_1, std::placeholders::_2),
std::bind(&Connection::onOpensslCallback, this, std::placeholders::_1));
std::bind(&Connection::onRequestCallback, this, std::placeholders::_1),
std::bind(&Connection::onParserCallback, this, std::placeholders::_1, std::placeholders::_2),
std::bind(&Connection::onOpensslCallback, this, std::placeholders::_1),
TC_Transceiver::oncompletepackage_callback(),
std::bind(&Connection::onCompleteNetworkCallback, this, std::placeholders::_1));
_trans->setServerAuthCallback(_pBindAdapter->_onVerifyCallback);
@ -717,6 +744,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::Connection::onParserCallback(TC_Ne
}
}
rbuf.setConnection(this);
vector<char> ro;
@ -731,13 +759,24 @@ TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::Connection::onParserCallback(TC_Ne
//收到完整的包才算
this->_bEmptyConn = false;
_recv.push_back(recv);
//收到完整包
insertRecvQueue(recv);
// insertRecvQueue(recv);
}
return ret;
}
/**
 * Called once after a network read cycle completes (all packets parsed):
 * flushes the per-connection batch of complete packets (_recv, filled by
 * onParserCallback) into the bind adapter's receive queue in one shot,
 * then resets the batch for the next cycle.
 */
void TC_EpollServer::Connection::onCompleteNetworkCallback(TC_Transceiver *trans)
{
    _pBindAdapter->insertRecvQueue(_recv);
    // Complete packets received; previously queued one-by-one here:
    // insertRecvQueue(_recv);
    _recv.clear();
}
int TC_EpollServer::Connection::sendBufferDirect(const char* buff, size_t length)
{
_pBindAdapter->increaseSendBufferSize();
@ -1466,6 +1505,29 @@ void TC_EpollServer::BindAdapter::insertRecvQueue(const shared_ptr<RecvContext>
}
}
/**
 * Batch-insert received contexts into the data buffer, applying overload
 * policy: deliver normally, deliver marked as overloaded, or discard.
 *
 * @param recv batch of receive contexts from one connection's read cycle
 */
void TC_EpollServer::BindAdapter::insertRecvQueue(const deque<shared_ptr<RecvContext>> &recv)
{
    // Nothing to queue: skip the overload check so an empty flush cannot
    // trigger a spurious discard log entry.
    if (recv.empty())
    {
        return;
    }
    int iRet = isOverloadorDiscard();
    if (iRet == 0) // not overloaded
    {
        _dataBuffer->insertRecvQueue(recv);
    }
    else if (iRet == -1) // queue over 4/5 full: mark each request overloaded but still deliver
    {
        // const& avoids an atomic shared_ptr refcount bump per element
        for (const auto &r : recv)
        {
            r->setOverload();
        }
        _dataBuffer->insertRecvQueue(recv);
    }
    else // receive queue full: drop the whole batch
    {
        _epollServer->error("[BindAdapter::insertRecvQueue] overload discard package");
    }
}
TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_protocol(TC_NetWorkBuffer &r, vector<char> &o)
{
o = r.getBuffers();

View File

@ -641,6 +641,56 @@ bool TC_Json::isspace(char c)
return false;
}
// 两个json串合并
/**
 * Merge two JSON object strings and return the merged JSON text.
 * Convenience wrapper over the out-parameter overload.
 */
string TC_Json::mergeJson(const string& json1, const string& json2)
{
    string merged;
    mergeJson(json1, json2, merged);
    return merged;
}
void TC_Json::mergeJson(const string& json1, const string& json2, string& jsonRet)
{
JsonValuePtr p1 =TC_Json::getValue(json1);
JsonValuePtr p2 =TC_Json::getValue(json2);
if (p1->getType() != eJsonTypeObj || p2->getType() != eJsonTypeObj)
{
throw TC_Json_Exception("Error: mergeing json string must be two json object string");
}
vector<string> path;
mergeObj(p2, p1, path);
jsonRet = TC_Json::writeValue(p1);
}
/**
 * Recursively merge `from` into `to`. `path` is the chain of keys from the
 * root of `to` down to the current position; leaf values from `from` are
 * written into `to` at that path, creating intermediate objects as needed.
 *
 * Fix: push/pop of path components is now symmetric in the iteration loop.
 * Previously only the leaf branch popped, so whenever a child value was
 * itself an object the recursion returned without popping the caller's
 * entry — leaking path components and corrupting the target paths of all
 * subsequent sibling keys.
 */
void TC_Json::mergeObj(JsonValuePtr from, JsonValuePtr to, vector<string>& path)
{
    if (from->getType() != eJsonTypeObj)
    {
        // Leaf value: walk (and create if absent) the object chain in `to`
        // along all but the last path component, then assign at the last one.
        JsonValuePtr tmp = to;
        // `i + 1 < path.size()` instead of `i < path.size() - 1` avoids
        // size_t underflow should path ever be empty here.
        for (size_t i = 0; i + 1 < path.size(); i++)
        {
            JsonValueObjPtr obj = JsonValueObjPtr::dynamicCast(tmp);
            if (obj->value.find(path[i]) == obj->value.end())
            {
                JsonValuePtr p = new JsonValueObj();
                obj->value[path[i]] = p;
            }
            tmp = obj->value[path[i]];
        }
        JsonValueObjPtr::dynamicCast(tmp)->value[path[path.size() - 1]] = from;
    }
    else
    {
        // Object: recurse into each member, extending the path for the call
        // and restoring it afterwards.
        JsonValueObjPtr fromObj = JsonValueObjPtr::dynamicCast(from);
        for (auto it = fromObj->value.begin(); it != fromObj->value.end(); it++)
        {
            path.push_back(it->first);
            mergeObj(it->second, to, path);
            path.pop_back();
        }
    }
}
//////////////////////////////////////////////////////
void TC_JsonWriteOstream::writeValue(const JsonValuePtr & p, ostream& ostr, bool withSpace)
{

View File

@ -17,9 +17,16 @@
#include "util/tc_port.h"
#include "util/tc_common.h"
#include "util/tc_logger.h"
#include "util/tc_file.h"
#include "util/tc_platform.h"
#include <thread>
#include <string.h>
#if TARGET_PLATFORM_LINUX
#include <sys/vfs.h>
#include <sys/sysinfo.h>
#endif
#if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
#include <signal.h>
#include <limits.h>

View File

@ -34,6 +34,20 @@ void TC_TimerBase::clear()
_nextTimer = -1;
}
/**
 * Number of pending timer events.
 * Thread-safe: takes the event-map mutex for the read.
 */
size_t TC_TimerBase::count()
{
    std::lock_guard<std::mutex> guard(_mutex);
    return _mapEvent.size();
}
/**
 * Number of registered repeating timers.
 * Thread-safe: takes the event-map mutex for the read.
 */
size_t TC_TimerBase::repeatCount()
{
    std::lock_guard<std::mutex> guard(_mutex);
    return _repeatIds.size();
}
uint32_t TC_TimerBase::genUniqueId()
{
uint32_t i = ++_increaseId;

File diff suppressed because it is too large Load Diff