From fe25b3aed408cbb7a35812e9ae140b7025a81d23 Mon Sep 17 00:00:00 2001 From: ruanshudong Date: Tue, 22 Mar 2022 16:01:51 +0800 Subject: [PATCH] Fix: there is no business interface callback if there is no available node during asynchronous call Optimize: In the service model, after receiving data, it is inserted into the queue in batch Add: tc_json adds the function of merge Optimize: the time occupation when parsing the protocol at the network layer, and release the network thread every 1ms to reduce the delay --- servant/libservant/AdapterProxy.cpp | 2 +- servant/libservant/Application.cpp | 5 +- servant/libservant/AsyncProcThread.cpp | 5 - servant/libservant/Communicator.cpp | 23 +- servant/libservant/CommunicatorEpoll.cpp | 9 +- servant/libservant/Current.cpp | 2 +- servant/servant/ServantProxy.h | 6 +- tools/tars2cpp/tars2cpp.cpp | 12 +- util/include/util/tc_common.h | 36 +- util/include/util/tc_epoll_server.h | 11 +- util/include/util/tc_epoller.h | 4 +- util/include/util/tc_ex.h | 2 +- util/include/util/tc_json.h | 7 + util/include/util/tc_port.h | 4 +- util/include/util/tc_timer.h | 14 +- util/include/util/tc_transceiver.h | 27 +- util/src/tc_common.cpp | 1 - util/src/tc_epoll_server.cpp | 70 +- util/src/tc_json.cpp | 50 + util/src/tc_port.cpp | 7 + util/src/tc_timer.cpp | 14 + util/src/tc_transceiver.cpp | 1807 +++++++++++----------- 22 files changed, 1170 insertions(+), 948 deletions(-) diff --git a/servant/libservant/AdapterProxy.cpp b/servant/libservant/AdapterProxy.cpp index 8783a0a..4003bf5 100755 --- a/servant/libservant/AdapterProxy.cpp +++ b/servant/libservant/AdapterProxy.cpp @@ -186,7 +186,7 @@ TC_NetWorkBuffer::PACKET_TYPE AdapterProxy::onParserCallback(TC_NetWorkBuffer& b } catch(exception &ex) { - TLOGERROR("[AdapterProxy::onParserCallback parser error:" << ex.what() << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl); + TLOG_ERROR(ex.what() << ", obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString()<< endl); } return TC_NetWorkBuffer::PACKET_ERR; diff --git a/servant/libservant/Application.cpp b/servant/libservant/Application.cpp index 670e071..089e9e6 100644 --- a/servant/libservant/Application.cpp +++ b/servant/libservant/Application.cpp @@ -698,8 +698,9 @@ void Application::main(int argc, char *argv[]) void Application::main(const TC_Option &option) { - //直接输出编译的TARS版本 - if(option.hasParam("version")) + __out__.modFlag(0xfffff, false); + //直接输出编译的TAF版本 + if (option.hasParam("version")) { __out__.debug() << "TARS:" << TARS_VERSION << endl; exit(0); diff --git a/servant/libservant/AsyncProcThread.cpp b/servant/libservant/AsyncProcThread.cpp index a453b37..0cf9279 100644 --- a/servant/libservant/AsyncProcThread.cpp +++ b/servant/libservant/AsyncProcThread.cpp @@ -119,15 +119,10 @@ void AsyncProcThread::callback(ReqMessage * msg) pServantProxyThreadData->_data._dyeing = msg->data._dyeing; pServantProxyThreadData->_data._dyeingKey = msg->data._dyeingKey; pServantProxyThreadData->_data._cookie = msg->data._cookie; -//======= -// pServantProxyThreadData->_dyeing = msg->bDyeing; -// pServantProxyThreadData->_dyeingKey = msg->sDyeingKey; pServantProxyThreadData->_traceCall = msg->bTraceCall; pServantProxyThreadData->initTrace(msg->sTraceKey); -// pServantProxyThreadData->_cookie = msg->cookie; -//>>>>>>> origin/delay if(msg->adapter) { diff --git a/servant/libservant/Communicator.cpp b/servant/libservant/Communicator.cpp index b5faab1..c34df8b 100644 --- a/servant/libservant/Communicator.cpp +++ b/servant/libservant/Communicator.cpp @@ -709,20 +709,21 @@ void Communicator::terminate() void Communicator::pushAsyncThreadQueue(ReqMessage * msg) { - if(msg->pObjectProxy->getRootServantProxy()->_callback) { - ReqMessagePtr msgPtr = msg; - - msg->pObjectProxy->getRootServantProxy()->_callback(msgPtr); - } - else if (msg->pObjectProxy->getRootServantProxy()->_callbackHash) + if (msg->pObjectProxy->getRootServantProxy()->_callback) + { + ReqMessagePtr msgPtr = msg; + msg->pObjectProxy->getRootServantProxy()->_callback(msgPtr); + } + else if (msg->pObjectProxy->getRootServantProxy()->_callbackHash && msg->adapter ) { //先不考虑每个线程队列数目不一致的情况 - _asyncThread[((uint32_t)msg->adapter->trans()->fd()) % _asyncThreadNum]->push_back(msg); + _asyncThread[((uint32_t) msg->adapter->trans()->fd()) % _asyncThreadNum]->push_back(msg); + } + else + { + //先不考虑每个线程队列数目不一致的情况 + _asyncThread[(_asyncSeq++) % _asyncThreadNum]->push_back(msg); } - else { - //先不考虑每个线程队列数目不一致的情况 - _asyncThread[(_asyncSeq++) % _asyncThreadNum]->push_back(msg); - } } void Communicator::doStat() diff --git a/servant/libservant/CommunicatorEpoll.cpp b/servant/libservant/CommunicatorEpoll.cpp index 640e068..74eab9c 100755 --- a/servant/libservant/CommunicatorEpoll.cpp +++ b/servant/libservant/CommunicatorEpoll.cpp @@ -538,28 +538,25 @@ bool CommunicatorEpoll::handleNotify(const shared_ptr &da ReqMessage * msg = NULL; - size_t maxProcessCount = 0; - try { + int64_t now = TNOWMS; + while (pFDInfo->msgQueue->pop_front(msg)) { msg->pObjectProxy->invoke(msg); - if(++maxProcessCount > 1000) + if(TNOWMS - now >= 1) { //避免包太多的时候, 循环占用网路线程, 导致连接都建立不上, 一个包都无法发送出去 data->mod(EPOLLOUT); - TLOGTARS("[CommunicatorEpoll::handle max process count: " << maxProcessCount << ", fd:" << data->fd() << "]" << endl); break; } } if (pFDInfo->msgQueue->empty() && pFDInfo->autoDestroy) { -// LOG_CONSOLE_DEBUG << "iSeq:" << pFDInfo->iSeq << ", fd:" << pFDInfo->notify.notifyFd() << endl; - delete pFDInfo; return false; } diff --git a/servant/libservant/Current.cpp b/servant/libservant/Current.cpp index df0cccf..5e60a2a 100644 --- a/servant/libservant/Current.cpp +++ b/servant/libservant/Current.cpp @@ -58,7 +58,7 @@ Current::~Current() const string &Current::getHostName() const { auto it = _request.context.find("node_name"); - if(it != _request.context.end()) + if(it != _request.context.end() && !(it->second.empty()) ) { return it->second; } diff --git a/servant/servant/ServantProxy.h b/servant/servant/ServantProxy.h index 03ab2c9..2d5f102 100644 --- a/servant/servant/ServantProxy.h +++ b/servant/servant/ServantProxy.h @@ -337,7 +337,11 @@ public: } if (flags.size() >= 2) { - maxLen = max(maxLen, TC_Common::strto(flags[1])); + // TODO(greatsong): std::max Win32下编译有问题, 添加NOMAXMIN宏不起作用 + //maxLen = std::max(maxLen, TC_Common::strto(flags[1])); + auto f = TC_Common::strto(flags[1]); + if (maxLen < f) + maxLen = f; } // type = strtol(tid.substr(0, pos).c_str(), NULL, 16); diff --git a/tools/tars2cpp/tars2cpp.cpp b/tools/tars2cpp/tars2cpp.cpp index 97297e3..cb741a5 100755 --- a/tools/tars2cpp/tars2cpp.cpp +++ b/tools/tars2cpp/tars2cpp.cpp @@ -1636,17 +1636,17 @@ string Tars2Cpp::generateServantDispatch(const OperationPtr& pPtr, const string& vector& vParamDecl = pPtr->getAllParamDeclPtr(); - string routekey; +// string routekey; for(size_t i = 0; i < vParamDecl.size(); i++) { s << TAB << tostr(vParamDecl[i]->getTypeIdPtr()->getTypePtr()) << " "<< vParamDecl[i]->getTypeIdPtr()->getId() << generateInitValue(vParamDecl[i]->getTypeIdPtr()) << ";" << endl; - - if (routekey.empty() && vParamDecl[i]->isRouteKey()) - { - routekey = vParamDecl[i]->getTypeIdPtr()->getId(); - } +// +// if (routekey.empty() && vParamDecl[i]->isRouteKey()) +// { +// routekey = vParamDecl[i]->getTypeIdPtr()->getId(); +// } } diff --git a/util/include/util/tc_common.h b/util/include/util/tc_common.h index 69858ff..3f0015e 100644 --- a/util/include/util/tc_common.h +++ b/util/include/util/tc_common.h @@ -1054,20 +1054,11 @@ namespace p } }; + //for enum template struct strto2 { - D operator()(const string &sStr, typename std::enable_if::value, void ***>::type dummy = 0) - { - istringstream sBuffer(sStr); - D t; - - sBuffer >> t; - - return t; - } - - D operator()(const string &sStr, typename std::enable_if::value, void ***>::type dummy = 0) + D operator()(const string &sStr) { istringstream sBuffer(sStr); int i; @@ -1075,10 +1066,27 @@ namespace p return (D)i; } + }; + //for class + template + struct strto3 + { + D operator()(const string &sStr) + { + istringstream sBuffer(sStr); + D t; + + sBuffer >> t; + + return t; + } + }; + + //for string template<> - struct strto2 + struct strto3 { const string &operator()(const string &sStr) { @@ -1091,7 +1099,9 @@ namespace p template T TC_Common::strto(const string &sStr) { - using strto_type = typename std::conditional::value, p::strto1, p::strto2>::type; + using strto_enum_type = typename std::conditional::value, p::strto2, p::strto3>::type; + + using strto_type = typename std::conditional::value, p::strto1, strto_enum_type>::type; return strto_type()(sStr); } diff --git a/util/include/util/tc_epoll_server.h b/util/include/util/tc_epoll_server.h index 07c1b1f..c425768 100755 --- a/util/include/util/tc_epoll_server.h +++ b/util/include/util/tc_epoll_server.h @@ -296,6 +296,7 @@ public: * @param recv */ inline void push_back(const shared_ptr &recv ) { _rbuffer.push_back(recv); } + inline void push_back(const deque> &recv ) { _rbuffer.push_back(recv); } /** * 在队列上等待 @@ -335,6 +336,7 @@ public: * @param recv */ void insertRecvQueue(const shared_ptr &recv); + void insertRecvQueue(const deque> &recv); /** * 等待在队列上 @@ -427,7 +429,7 @@ public: /** * wait time for queue */ - int64_t _iWaitTime = 10000; + int64_t _iWaitTime = 3000; }; //////////////////////////////////////////////////////////////////////////// @@ -663,6 +665,7 @@ public: TC_NetWorkBuffer::PACKET_TYPE onParserCallback(TC_NetWorkBuffer& buff, TC_Transceiver *trans); std::shared_ptr onOpensslCallback(TC_Transceiver* trans); + void onCompleteNetworkCallback(TC_Transceiver* trans); bool handleOutputImp(const shared_ptr &data); bool handleInputImp(const shared_ptr &data); @@ -728,6 +731,11 @@ public: */ list> _messages; + /** + * 接收到数据 + */ + deque> _recv; + /** * message队列中消息内存大小 */ @@ -1223,6 +1231,7 @@ public: * @param force 强制必须插入(无论是否过载, 比如close事件) */ void insertRecvQueue(const shared_ptr & recv, bool force = false); + void insertRecvQueue(const deque> & recv); /** * 接收队列的大小 diff --git a/util/include/util/tc_epoller.h b/util/include/util/tc_epoller.h index 715498e..34813ce 100755 --- a/util/include/util/tc_epoller.h +++ b/util/include/util/tc_epoller.h @@ -94,13 +94,13 @@ public: * 句柄 * @return */ - inline int fd() { return _fd; } + inline int fd() const { return _fd; } /** * 是否有效 * @return */ - inline bool valid() { return _fd != INVALID_SOCKET; } + inline bool valid() const { return _fd != INVALID_SOCKET; } /** * 设置cookie和析构器, 可以在EpollInfo析构时调用 diff --git a/util/include/util/tc_ex.h b/util/include/util/tc_ex.h index adae534..f2801d1 100644 --- a/util/include/util/tc_ex.h +++ b/util/include/util/tc_ex.h @@ -22,8 +22,8 @@ namespace tars { -using namespace std; +using namespace std; ///////////////////////////////////////////////// /** * @file tc_ex.h diff --git a/util/include/util/tc_json.h b/util/include/util/tc_json.h index 65eee4d..c9576b3 100755 --- a/util/include/util/tc_json.h +++ b/util/include/util/tc_json.h @@ -320,7 +320,14 @@ public: //Conversion of JSON string to JSON structure static JsonValuePtr getValue(const string & str); static JsonValuePtr getValue(const vector& buf); + + // 两个json串合并 + static string mergeJson(const string& json1, const string& json2); + static void mergeJson(const string& json1, const string& json2, string& jsonRet); private: + + static void mergeObj(JsonValuePtr from, JsonValuePtr to, vector& path); + //string 类型到json字符串 //string type to json string static void writeString(const JsonValueStringPtr & p, string& ostr); diff --git a/util/include/util/tc_port.h b/util/include/util/tc_port.h index 646e04e..b93fcee 100755 --- a/util/include/util/tc_port.h +++ b/util/include/util/tc_port.h @@ -109,7 +109,7 @@ public: * 运行一个脚本 * @param cmd * @param err - * @return 程序的标准输出 + * @return 程序的标准输出(最大2k的输出长度) */ static std::string exec(const char* cmd); @@ -117,7 +117,7 @@ public: * 运行一个脚本(程序+命令行) * @param cmd * @param err - * @return: 程序的标准输出 + * @return: 程序的标准输出(最大2k的输出长度) */ static std::string exec(const char* cmd, std::string &err); diff --git a/util/include/util/tc_timer.h b/util/include/util/tc_timer.h index 26fd90d..b32a9af 100644 --- a/util/include/util/tc_timer.h +++ b/util/include/util/tc_timer.h @@ -71,6 +71,18 @@ public: */ virtual ~TC_TimerBase(); + /** + * 事件总个数 + * @return + */ + size_t count(); + + /** + * 重复事件个数 + * @return + */ + size_t repeatCount(); + /** * @brief 指定fireMillseconds时间执行 * @param fireMillseconds, 触发时间(毫秒) @@ -145,7 +157,7 @@ public: /** * 下一次定时器的时间 - */ + */ int64_t nextTimer() const { return _nextTimer; } /** diff --git a/util/include/util/tc_transceiver.h b/util/include/util/tc_transceiver.h index 418d83a..51fdc8f 100644 --- a/util/include/util/tc_transceiver.h +++ b/util/include/util/tc_transceiver.h @@ -153,6 +153,8 @@ public: using onparser_callback = std::function ; //完整解析完一个包之后的回调 using oncompletepackage_callback = std::function ; + //完整一次网络解析之后回调(一般有多次解析onparser_callback 以及 oncompletepackage_callback 之后回调, 通常在业务层可以在这个函数中可以把解析的数据一次性写入到队列中) + using oncompletenetwork_callback = std::function ; //cient侧: 发送鉴权包的回调, 业务层在回调里面组织鉴权包 using onclientsendauth_callback = std::function(TC_Transceiver*)> ; //client侧: 收到鉴权包的的回调, 业务层解包(注意返回PACKET_FULL, 表示鉴权成功) @@ -160,6 +162,9 @@ public: //server侧: 验证鉴权包并返回验证包的回调 using onserververifyauth_callback = std::function>(TC_NetWorkBuffer &, TC_Transceiver*)> ; + //网络层单次时间收发包最长时间(毫秒), 为了避免网络层一直在收包, 没机会发送包, 默认就1ms, 加速范文 + static uint64_t LONG_NETWORK_TRANS_TIME; + /** * 构造函数 * @param epoller @@ -178,13 +183,14 @@ public: * 初始化客户端的连接 * 句柄是connect时创建的 */ - void initializeClient(const oncreate_callback &oncreate, - const onclose_callback &onclose, - const onconnect_callback &onconnect, - const onrequest_callback &onrequest, - const onparser_callback &onparser, - const onopenssl_callback &onopenssl, - const oncompletepackage_callback &onfinish = oncompletepackage_callback()); + void initializeClient(const oncreate_callback &oncreate, + const onclose_callback &onclose, + const onconnect_callback &onconnect, + const onrequest_callback &onrequest, + const onparser_callback &onparser, + const onopenssl_callback &onopenssl, + const oncompletepackage_callback &onfinish = oncompletepackage_callback(), + const oncompletenetwork_callback &onfinishAll = oncompletenetwork_callback()); /** * @@ -195,7 +201,8 @@ public: const onrequest_callback &onrequest, const onparser_callback &onparser, const onopenssl_callback &onopenssl, - const oncompletepackage_callback &onfinish = oncompletepackage_callback()); + const oncompletepackage_callback &onfinish = oncompletepackage_callback(), + const oncompletenetwork_callback &onfinishAll = oncompletenetwork_callback()); /** * 设置绑定地址(对客户端有效, 服务端本身就是绑定的) @@ -629,8 +636,10 @@ protected: oncompletepackage_callback _onCompletePackageCallback; + oncompletenetwork_callback _onCompleteNetworkCallback; + onclientsendauth_callback _onClientSendAuthCallback; - + onclientverifyauth_callback _onClientVerifyAuthCallback; onserververifyauth_callback _onServerVerifyAuthCallback; diff --git a/util/src/tc_common.cpp b/util/src/tc_common.cpp index bb4fffd..946c0c0 100644 --- a/util/src/tc_common.cpp +++ b/util/src/tc_common.cpp @@ -1018,7 +1018,6 @@ void TC_Common::getRandomHexChars(char *p, unsigned int len) } - #endif string TC_Common::nextDate(const string &sDate) diff --git a/util/src/tc_epoll_server.cpp b/util/src/tc_epoll_server.cpp index e0b0d12..516e05e 100644 --- a/util/src/tc_epoll_server.cpp +++ b/util/src/tc_epoll_server.cpp @@ -91,6 +91,31 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const shared_ptr & } } +void TC_EpollServer::DataBuffer::insertRecvQueue(const deque> &recv) +{ + if (recv.empty()) + { + return; + } + + _iRecvBufferSize += recv.size(); + + getDataQueue(recv.back()->fd())->push_back(recv); + + if (_schedulers[0] != NULL) + { + //存在调度器, 处于协程中 + if (isQueueMode()) + { + _schedulers[index(recv.back()->fd())]->notify(); + } + else + { + _schedulers[index(rand())]->notify(); + } + } +} + bool TC_EpollServer::DataBuffer::wait(uint32_t handleIndex) { return getDataQueue(handleIndex)->wait(_iWaitTime); @@ -527,9 +552,11 @@ void TC_EpollServer::Connection::initialize(TC_Epoller *epoller, unsigned int ui #endif _trans->initializeServer(std::bind(&Connection::onCloseCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), - std::bind(&Connection::onRequestCallback, this, std::placeholders::_1), - std::bind(&Connection::onParserCallback, this, std::placeholders::_1, std::placeholders::_2), - std::bind(&Connection::onOpensslCallback, this, std::placeholders::_1)); + std::bind(&Connection::onRequestCallback, this, std::placeholders::_1), + std::bind(&Connection::onParserCallback, this, std::placeholders::_1, std::placeholders::_2), + std::bind(&Connection::onOpensslCallback, this, std::placeholders::_1), + TC_Transceiver::oncompletepackage_callback(), + std::bind(&Connection::onCompleteNetworkCallback, this, std::placeholders::_1)); _trans->setServerAuthCallback(_pBindAdapter->_onVerifyCallback); @@ -717,6 +744,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::Connection::onParserCallback(TC_Ne } } + rbuf.setConnection(this); vector ro; @@ -731,13 +759,24 @@ TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::Connection::onParserCallback(TC_Ne //收到完整的包才算 this->_bEmptyConn = false; + _recv.push_back(recv); //收到完整包 - insertRecvQueue(recv); + // insertRecvQueue(recv); } return ret; } +void TC_EpollServer::Connection::onCompleteNetworkCallback(TC_Transceiver *trans) +{ + _pBindAdapter->insertRecvQueue(_recv); + + //收到完整包 + // insertRecvQueue(_recv); + + _recv.clear(); +} + int TC_EpollServer::Connection::sendBufferDirect(const char* buff, size_t length) { _pBindAdapter->increaseSendBufferSize(); @@ -1466,6 +1505,29 @@ void TC_EpollServer::BindAdapter::insertRecvQueue(const shared_ptr } } +void TC_EpollServer::BindAdapter::insertRecvQueue(const deque> &recv) +{ + int iRet = isOverloadorDiscard(); + + if (iRet == 0) //未过载 + { + _dataBuffer->insertRecvQueue(recv); + } + else if (iRet == -1) //超过队列长度4/5,需要进行overload处理 + { + for(auto r : recv) + { + r->setOverload(); + } + + _dataBuffer->insertRecvQueue(recv); + } + else //接受队列满,需要丢弃 + { + _epollServer->error("[BindAdapter::insertRecvQueue] overload discard package"); + } +} + TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_protocol(TC_NetWorkBuffer &r, vector &o) { o = r.getBuffers(); diff --git a/util/src/tc_json.cpp b/util/src/tc_json.cpp index 41bb2ea..e33c8ee 100755 --- a/util/src/tc_json.cpp +++ b/util/src/tc_json.cpp @@ -641,6 +641,56 @@ bool TC_Json::isspace(char c) return false; } +// 两个json串合并 +string TC_Json::mergeJson(const string& json1, const string& json2) +{ + string ret; + mergeJson(json1, json2, ret); + return ret; +} + +void TC_Json::mergeJson(const string& json1, const string& json2, string& jsonRet) +{ + JsonValuePtr p1 =TC_Json::getValue(json1); + JsonValuePtr p2 =TC_Json::getValue(json2); + if (p1->getType() != eJsonTypeObj || p2->getType() != eJsonTypeObj) + { + throw TC_Json_Exception("Error: mergeing json string must be two json object string"); + } + vector path; + mergeObj(p2, p1, path); + jsonRet = TC_Json::writeValue(p1); +} + +void TC_Json::mergeObj(JsonValuePtr from, JsonValuePtr to, vector& path) +{ + if (from->getType() != eJsonTypeObj) + { + JsonValuePtr tmp = to; + for (size_t i = 0; i < path.size() - 1; i++) + { + JsonValueObjPtr obj = JsonValueObjPtr::dynamicCast(tmp); + if (obj->value.find(path[i]) == obj->value.end()) + { + JsonValuePtr p = new JsonValueObj(); + obj->value[path[i]] = p; + } + tmp = obj->value[path[i]]; + } + JsonValueObjPtr::dynamicCast(tmp)->value[path[path.size()-1]] = from; + path.pop_back(); + } + else + { + JsonValueObjPtr fromObj = JsonValueObjPtr::dynamicCast(from); + for (auto it = fromObj->value.begin(); it != fromObj->value.end(); it++) + { + path.push_back(it->first); + mergeObj(it->second, to, path); + } + } +} + ////////////////////////////////////////////////////// void TC_JsonWriteOstream::writeValue(const JsonValuePtr & p, ostream& ostr, bool withSpace) { diff --git a/util/src/tc_port.cpp b/util/src/tc_port.cpp index ff5f73e..3dec26c 100755 --- a/util/src/tc_port.cpp +++ b/util/src/tc_port.cpp @@ -17,9 +17,16 @@ #include "util/tc_port.h" #include "util/tc_common.h" #include "util/tc_logger.h" +#include "util/tc_file.h" +#include "util/tc_platform.h" #include #include +#if TARGET_PLATFORM_LINUX +#include +#include +#endif + #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS #include #include diff --git a/util/src/tc_timer.cpp b/util/src/tc_timer.cpp index 3803bd0..0ee9c5b 100644 --- a/util/src/tc_timer.cpp +++ b/util/src/tc_timer.cpp @@ -34,6 +34,20 @@ void TC_TimerBase::clear() _nextTimer = -1; } +size_t TC_TimerBase::count() +{ + std::lock_guard lock(_mutex); + + return _mapEvent.size(); +} + +size_t TC_TimerBase::repeatCount() +{ + std::lock_guard lock(_mutex); + + return _repeatIds.size(); +} + uint32_t TC_TimerBase::genUniqueId() { uint32_t i = ++_increaseId; diff --git a/util/src/tc_transceiver.cpp b/util/src/tc_transceiver.cpp index a45f2aa..f00aa8b 100644 --- a/util/src/tc_transceiver.cpp +++ b/util/src/tc_transceiver.cpp @@ -8,431 +8,442 @@ namespace tars { - class CloseClourse +class CloseClourse +{ +public: + CloseClourse(TC_Transceiver* trans, TC_Transceiver::CloseReason reason, const string& err) : _trans(trans), + _reason(reason), + _err(err) { - public: - CloseClourse(TC_Transceiver *trans, TC_Transceiver::CloseReason reason, const string &err) : _trans(trans), _reason(reason), _err(err) - {} + } - ~CloseClourse() { - _trans->tcpClose(false, _reason, _err); - } - protected: - TC_Transceiver *_trans; - TC_Transceiver::CloseReason _reason; - string _err; - }; + ~CloseClourse() + { + _trans->tcpClose(false, _reason, _err); + } + +protected: + TC_Transceiver* _trans; + TC_Transceiver::CloseReason _reason; + string _err; +}; #define THROW_ERROR(x, r, y) { CloseClourse c(this, r, y); THROW_EXCEPTION_SYSCODE(x, y); } - static const int BUFFER_SIZE = 16 * 1024; - +static const int BUFFER_SIZE = 16 * 1024; +uint64_t TC_Transceiver::LONG_NETWORK_TRANS_TIME = 1; /////////////////////////////////////////////////////////////////////// - int TC_Transceiver::createSocket(bool udp, bool isLocal, bool isIpv6) - { +int TC_Transceiver::createSocket(bool udp, bool isLocal, bool isIpv6) +{ #if TARGET_PLATFORM_WINDOWS - int domain = (isIpv6 ? PF_INET6 : PF_INET); + int domain = (isIpv6 ? PF_INET6 : PF_INET); #else - int domain = isLocal ? PF_LOCAL : (isIpv6 ? PF_INET6 : PF_INET); + int domain = isLocal ? PF_LOCAL : (isIpv6 ? PF_INET6 : PF_INET); #endif - int type = udp ? SOCK_DGRAM : SOCK_STREAM; + int type = udp ? SOCK_DGRAM : SOCK_STREAM; - TC_Socket s; - s.createSocket(type, domain); + TC_Socket s; + s.createSocket(type, domain); - if(!udp) + if (!udp) + { + if (!isLocal) { - if(!isLocal) - { - s.setTcpNoDelay(); - s.setKeepAlive(); - s.setNoCloseWait(); - } + s.setTcpNoDelay(); + s.setKeepAlive(); + s.setNoCloseWait(); } - else - { - s.setRecvBufferSize(512*1024); - s.setSendBufferSize(512*1024); - } - - s.setOwner(false); - s.setblock(false); - return s.getfd(); + } + else + { + s.setRecvBufferSize(512 * 1024); + s.setSendBufferSize(512 * 1024); } - bool TC_Transceiver::doConnect(int fd, const struct sockaddr *addr, socklen_t len) + s.setOwner(false); + s.setblock(false); + return s.getfd(); +} + +bool TC_Transceiver::doConnect(int fd, const struct sockaddr* addr, socklen_t len) +{ + bool bConnected = false; + + int iRet = ::connect(fd, addr, len); + + if (iRet == 0) { - bool bConnected = false; - - int iRet = ::connect(fd, addr, len); - - if (iRet == 0) - { - bConnected = true; - } - else if (!TC_Socket::isInProgress()) - { - THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect error, " + _desc);//, TC_Exception::getSystemCode()); - } + bConnected = true; + } + else if (!TC_Socket::isInProgress()) + { + THROW_ERROR(TC_Transceiver_Exception, CR_Connect, + "connect error, " + _desc);//, TC_Exception::getSystemCode()); + } // LOG_CONSOLE_DEBUG << bConnected << endl; return bConnected; } - TC_Transceiver::TC_Transceiver(TC_Epoller* epoller, const TC_Endpoint &ep) - : _epoller(epoller) - , _ep(ep) - , _desc(ep.toString()) - , _fd(-1) - , _connStatus(eUnconnected) - , _sendBuffer(this) - , _recvBuffer(this) - , _authState(eAuthInit) +TC_Transceiver::TC_Transceiver(TC_Epoller* epoller, const TC_Endpoint& ep) + : _epoller(epoller), _ep(ep), _desc(ep.toString()), _fd(-1), _connStatus(eUnconnected), _sendBuffer(this), + _recvBuffer(this), _authState(eAuthInit) +{ + // LOG_CONSOLE_DEBUG << endl; + if (ep.isUdp()) { - // LOG_CONSOLE_DEBUG << endl; - if (ep.isUdp()) - { - _pRecvBuffer = std::make_shared(); - _nRecvBufferSize = DEFAULT_RECV_BUFFERSIZE; - _pRecvBuffer->alloc(_nRecvBufferSize); - } + _pRecvBuffer = std::make_shared(); + _nRecvBufferSize = DEFAULT_RECV_BUFFERSIZE; + _pRecvBuffer->alloc(_nRecvBufferSize); + } // _serverAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); - _serverAddr = TC_Socket::createSockAddr(_ep); - } + _serverAddr = TC_Socket::createSockAddr(_ep); +} - TC_Transceiver::~TC_Transceiver() +TC_Transceiver::~TC_Transceiver() +{ + if (!isValid()) return; + + if (_ep.isTcp()) { - if(!isValid()) return; - - if(_ep.isTcp()) - { - tcpClose(true, CR_DECONSTRUCTOR, ""); - } - else - { - udpClose(); - } + tcpClose(true, CR_DECONSTRUCTOR, ""); } - - void TC_Transceiver::initializeClient(const oncreate_callback &oncreate, - const onclose_callback &onclose, - const onconnect_callback &onconnect, - const onrequest_callback &onrequest, - const onparser_callback &onparser, - const onopenssl_callback &onopenssl, - const oncompletepackage_callback &onfinish) + else { - _isServer = false; - - _createSocketCallback = oncreate; - - _onConnectCallback = onconnect; - - _onRequestCallback = onrequest; - - _onCloseCallback = onclose; - - _onParserCallback = onparser; - - _onCompletePackageCallback = onfinish; - - _onOpensslCallback = onopenssl; - + udpClose(); } +} - void TC_Transceiver::initializeServer(const onclose_callback &onclose, - const onrequest_callback &onrequest, - const onparser_callback &onparser, - const onopenssl_callback &onopenssl, - const oncompletepackage_callback &onfinish) - { - _isServer = true; +void TC_Transceiver::initializeClient(const oncreate_callback& oncreate, + const onclose_callback& onclose, + const onconnect_callback& onconnect, + const onrequest_callback& onrequest, + const onparser_callback& onparser, + const onopenssl_callback& onopenssl, + const oncompletepackage_callback& onfinish, + const oncompletenetwork_callback& onfinishAll) +{ + _isServer = false; - _connStatus = eConnected; + _createSocketCallback = oncreate; - _onRequestCallback = onrequest; + _onConnectCallback = onconnect; - _onCloseCallback = onclose; + _onRequestCallback = onrequest; - _onParserCallback = onparser; + _onCloseCallback = onclose; - _onCompletePackageCallback = onfinish; + _onParserCallback = onparser; - _onOpensslCallback = onopenssl; + _onCompletePackageCallback = onfinish; + + _onOpensslCallback = onopenssl; + + _onCompleteNetworkCallback = onfinishAll; + +} + +void TC_Transceiver::initializeServer(const onclose_callback& onclose, + const onrequest_callback& onrequest, + const onparser_callback& onparser, + const onopenssl_callback& onopenssl, + const oncompletepackage_callback& onfinish, + const oncompletenetwork_callback& onfinishAll) +{ + _isServer = true; + + _connStatus = eConnected; + + _onRequestCallback = onrequest; + + _onCloseCallback = onclose; + + _onParserCallback = onparser; + + _onCompletePackageCallback = onfinish; + + _onOpensslCallback = onopenssl; + + _onCompleteNetworkCallback = onfinishAll; #if TARS_SSL - if (isSSL()) + if (isSSL()) + { + _openssl = _onOpensslCallback(this); + if (!_openssl) + { + THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error]"); + } + + _openssl->init(true); + + _openssl->recvBuffer()->setConnection(this); + + int ret = _openssl->doHandshake(_sendBuffer); + if (ret != 0) { - _openssl = _onOpensslCallback(this); - if (!_openssl) - { - THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error]"); - } - - _openssl->init(true); - - _openssl->recvBuffer()->setConnection(this); - - int ret = _openssl->doHandshake(_sendBuffer); - if (ret != 0) - { - THROW_ERROR(TC_Transceiver_Exception, CR_SSL_HANDSHAKE, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error: " + _openssl->getErrMsg() + "]"); - } - - // send the encrypt data from write buffer - if (!_sendBuffer.empty()) - { - doRequest(); - } + THROW_ERROR(TC_Transceiver_Exception, CR_SSL_HANDSHAKE, "[TC_Transceiver::initializeServer create '" + _desc + "' ssl client error: " + _openssl->getErrMsg() + "]"); } + + // send the encrypt data from write buffer + if (!_sendBuffer.empty()) + { + doRequest(); + } + } #endif +} + +void TC_Transceiver::setClientAuthCallback(const onclientsendauth_callback& onsendauth, + const onclientverifyauth_callback& onverifyauth) +{ + _onClientSendAuthCallback = onsendauth; + + _onClientVerifyAuthCallback = onverifyauth; +} + +void TC_Transceiver::setServerAuthCallback(const onserververifyauth_callback& onverifyauth) +{ + _onServerVerifyAuthCallback = onverifyauth; +} + +void TC_Transceiver::setBindAddr(const char* host) +{ + if (_isServer) + { + THROW_ERROR(TC_Transceiver_Exception, CR_Type, + "setBindAddr(" + string(host) + ") only use in client, " + _desc); + } + _bindAddr = TC_Socket::createSockAddr(host); +} + +void TC_Transceiver::setBindAddr(const TC_Socket::addr_type& bindAddr) +{ + if (_isServer) + { + THROW_ERROR(TC_Transceiver_Exception, CR_Type, "setBindAddr only use in client, " + _desc); + } + _bindAddr = bindAddr; +} + +shared_ptr TC_Transceiver::bindFd(int fd) +{ + if (!_isServer) + { + THROW_ERROR(TC_Transceiver_Exception, CR_Type, "client should not call bindFd, " + _desc); + } + _connStatus = eConnected; + + _fd = fd; + + //设置套接口选项 + for (size_t i = 0; i < _socketOpts.size(); ++i) + { + setsockopt(_fd, _socketOpts[i].level, _socketOpts[i].optname, (const char*)_socketOpts[i].optval, + _socketOpts[i].optlen); } - void TC_Transceiver::setClientAuthCallback(const onclientsendauth_callback &onsendauth, const onclientverifyauth_callback &onverifyauth) - { - _onClientSendAuthCallback = onsendauth; + _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); - _onClientVerifyAuthCallback = onverifyauth; - } + getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second); - void TC_Transceiver::setServerAuthCallback(const onserververifyauth_callback &onverifyauth) - { - _onServerVerifyAuthCallback = onverifyauth; - } + _epollInfo = _epoller->createEpollInfo(_fd); - void TC_Transceiver::setBindAddr(const char *host) + return _epollInfo; +} + +void TC_Transceiver::setUdpRecvBuffer(size_t nSize) +{ + _nRecvBufferSize = nSize; + _pRecvBuffer->alloc(_nRecvBufferSize); +} + +void TC_Transceiver::checkConnect() +{ + //检查连接是否有错误 + if (isConnecting()) { - if(_isServer) + int iVal = 0; + SOCKET_LEN_TYPE iLen = static_cast(sizeof(int)); + int ret = ::getsockopt(_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast(&iVal), &iLen); + + if (ret < 0 || iVal) { - THROW_ERROR(TC_Transceiver_Exception, CR_Type, "setBindAddr(" + string(host) + ") only use in client, " + _desc); - } - _bindAddr = TC_Socket::createSockAddr(host); - } - - void TC_Transceiver::setBindAddr(const TC_Socket::addr_type &bindAddr) - { - if(_isServer) - { - THROW_ERROR(TC_Transceiver_Exception, CR_Type,"setBindAddr only use in client, " + _desc); - } - _bindAddr = bindAddr; - } - - shared_ptr TC_Transceiver::bindFd(int fd) - { - if(!_isServer) - { - THROW_ERROR(TC_Transceiver_Exception, CR_Type, "client should not call bindFd, " + _desc); - } - _connStatus = eConnected; - - _fd = fd; - - //设置套接口选项 - for(size_t i=0; i< _socketOpts.size(); ++i) - { - setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen); + string err = TC_Exception::parseError(iVal); + THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect " + _desc + " error:" + err); } _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second); - _epollInfo = _epoller->createEpollInfo(_fd); - - return _epollInfo; - } - - void TC_Transceiver::setUdpRecvBuffer(size_t nSize) - { - _nRecvBufferSize = nSize; - _pRecvBuffer->alloc(_nRecvBufferSize); - } - - void TC_Transceiver::checkConnect() - { - //检查连接是否有错误 - if(isConnecting()) + if (_bindAddr.first) { - int iVal = 0; - SOCKET_LEN_TYPE iLen = static_cast(sizeof(int)); - int ret = ::getsockopt(_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast(&iVal), &iLen); + //如果服务器终止后,服务器可以第二次快速启动而不用等待一段时间 + int iReuseAddr = 1; - if (ret < 0 || iVal) - { - string err = TC_Exception::parseError(iVal); - THROW_ERROR(TC_Transceiver_Exception, CR_Connect, "connect " + _desc + " error:" + err); - } + setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&iReuseAddr, sizeof(int)); - _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); - - getpeername(_fd, _clientAddr.first.get(), &_clientAddr.second); - - if(_bindAddr.first) - { - //如果服务器终止后,服务器可以第二次快速启动而不用等待一段时间 - int iReuseAddr = 1; - - setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&iReuseAddr, sizeof(int)); - - ::bind(_fd, _bindAddr.first.get(), _bindAddr.second); - } - setConnected(); + ::bind(_fd, _bindAddr.first.get(), _bindAddr.second); } + setConnected(); } +} - void TC_Transceiver::parseConnectAddress() - { +void TC_Transceiver::parseConnectAddress() +{ #if !TARGET_PLATFORM_WINDOWS - if(isUnixLocal()) - { - TC_Socket::parseUnixLocalAddr(getConnectEndpoint().getHost().c_str(), *(sockaddr_un*)_serverAddr.first.get()); - } - else + if (isUnixLocal()) + { + TC_Socket::parseUnixLocalAddr(getConnectEndpoint().getHost().c_str(), + *(sockaddr_un*)_serverAddr.first.get()); + } + else #endif - { - if (isConnectIPv6()) - { - TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), - *(sockaddr_in6*)_serverAddr.first.get()); - } - else - { - TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), - *(sockaddr_in*)_serverAddr.first.get()); - } - } - } - - bool TC_Transceiver::isSSL() const { - return _ep.isSSL(); - } - - void TC_Transceiver::connect() - { - if(_isServer) + if (isConnectIPv6()) { - THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call connect, " + _desc); - } - - if(isValid()) - { - return; - } - - if(_connStatus == eConnecting || _connStatus == eConnected) - { - return; - } - - if (_ep.isUdp()) - { - _fd = createSocket(true, isUnixLocal(), isConnectIPv6()); - - _connStatus = eConnected; - - _epollInfo = _epoller->createEpollInfo(_fd); - - _proxyInfo = _createSocketCallback(this); - if(_proxyInfo) - { - _desc = _proxyInfo->getEndpoint().toString(); - } - - //每次连接前都重新解析一下地址, 避免dns变了! - parseConnectAddress(); + TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), + *(sockaddr_in6*)_serverAddr.first.get()); } else { - _fd = createSocket(false, isUnixLocal(), isConnectIPv6()); - - _isConnTimeout = false; - - _epollInfo = _epoller->createEpollInfo(_fd); - - _connTimerId = _epoller->postDelayed(_connTimeout, std::bind(&TC_Transceiver::checkConnectTimeout, this)); - - _proxyInfo = _createSocketCallback(this); - if(_proxyInfo) - { - _desc = _proxyInfo->getEndpoint().toString(); - } - - //每次连接前都重新解析一下地址, 避免dns变了! - parseConnectAddress(); - - bool bConnected = doConnect(_fd, _serverAddr.first.get(), _serverAddr.second); - if(bConnected) - { - setConnected(); - } - else - { - _connStatus = TC_Transceiver::eConnecting; - } - } - - //设置套接口选项 - for(size_t i=0; i< _socketOpts.size(); ++i) - { - setsockopt(_fd,_socketOpts[i].level,_socketOpts[i].optname, (const char*)_socketOpts[i].optval,_socketOpts[i].optlen); + TC_Socket::parseAddrWithPort(getConnectEndpoint().getHost(), getConnectEndpoint().getPort(), + *(sockaddr_in*)_serverAddr.first.get()); } } +} - void TC_Transceiver::checkConnectTimeout() +bool TC_Transceiver::isSSL() const +{ + return _ep.isSSL(); +} + +void TC_Transceiver::connect() +{ + if (_isServer) { - if(_connStatus != eConnected) - { - _isConnTimeout = true; - THROW_ERROR(TC_Transceiver_Exception, CR_ConnectTimeout, "connect timeout, " + _desc); - } + THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call connect, " + _desc); } - void TC_Transceiver::setConnected() + if (isValid()) { - if(_isServer) - { - THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call setConnected, " + _desc); - } + return; + } + + if (_connStatus == eConnecting || _connStatus == eConnected) + { + return; + } + + if (_ep.isUdp()) + { + _fd = createSocket(true, isUnixLocal(), isConnectIPv6()); + _connStatus = eConnected; - if(_proxyInfo) + _epollInfo = _epoller->createEpollInfo(_fd); + + _proxyInfo = _createSocketCallback(this); + if (_proxyInfo) { - connectProxy(); + _desc = _proxyInfo->getEndpoint().toString(); + } + + //每次连接前都重新解析一下地址, 避免dns变了! + parseConnectAddress(); + } + else + { + _fd = createSocket(false, isUnixLocal(), isConnectIPv6()); + + _isConnTimeout = false; + + _epollInfo = _epoller->createEpollInfo(_fd); + + _connTimerId = _epoller->postDelayed(_connTimeout, std::bind(&TC_Transceiver::checkConnectTimeout, this)); + + _proxyInfo = _createSocketCallback(this); + if (_proxyInfo) + { + _desc = _proxyInfo->getEndpoint().toString(); + } + + //每次连接前都重新解析一下地址, 避免dns变了! + parseConnectAddress(); + + bool bConnected = doConnect(_fd, _serverAddr.first.get(), _serverAddr.second); + if (bConnected) + { + setConnected(); } else { - onSetConnected(); + _connStatus = TC_Transceiver::eConnecting; } } - void TC_Transceiver::onSetConnected() + //设置套接口选项 + for (size_t i = 0; i < _socketOpts.size(); ++i) { - if(_isServer) - { - THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onSetConnected, " + _desc); - } - onConnect(); - - _onConnectCallback(this); - - if (!isSSL()) - { - doAuthReq(); - } + setsockopt(_fd, _socketOpts[i].level, _socketOpts[i].optname, (const char*)_socketOpts[i].optval, + _socketOpts[i].optlen); } +} - void TC_Transceiver::onConnect() +void TC_Transceiver::checkConnectTimeout() +{ + if (_connStatus != eConnected) { - if(_isServer) - { - THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onConnect, " + _desc); - } + _isConnTimeout = true; + THROW_ERROR(TC_Transceiver_Exception, CR_ConnectTimeout, "connect timeout, " + _desc); + } +} + +void TC_Transceiver::setConnected() +{ + if (_isServer) + { + THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call setConnected, " + _desc); + } + _connStatus = eConnected; + + if (_proxyInfo) + { + connectProxy(); + } + else + { + onSetConnected(); + } +} + +void TC_Transceiver::onSetConnected() +{ + if (_isServer) + { + THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onSetConnected, " + _desc); + } + onConnect(); + + _onConnectCallback(this); + + if (!isSSL()) + { + doAuthReq(); + } +} + +void TC_Transceiver::onConnect() +{ + if (_isServer) + { + THROW_ERROR(TC_Transceiver_Exception, CR_Type, "server should not call onConnect, " + _desc); + } _epoller->erase(_connTimerId); _connTimerId = 0; @@ -446,42 +457,42 @@ namespace tars return; } - _openssl->init(false); + _openssl->init(false); - _openssl->setReadBufferSize(1024 * 8); - _openssl->setWriteBufferSize(1024 * 8); + _openssl->setReadBufferSize(1024 * 8); + _openssl->setWriteBufferSize(1024 * 8); - _openssl->recvBuffer()->setConnection(this); + _openssl->recvBuffer()->setConnection(this); - int ret = _openssl->doHandshake(_sendBuffer); - if (ret != 0) - { - THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl hande shake failed, " + _desc + ", error:" + _openssl->getErrMsg()); - } - - // send the encrypt data from write buffer - if (!_sendBuffer.empty()) - { - doRequest(); - } - - return; + int ret = _openssl->doHandshake(_sendBuffer); + if (ret != 0) + { + THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "ssl hande shake failed, " + _desc + ", error:" + _openssl->getErrMsg()); } + + // send the encrypt data from write buffer + if (!_sendBuffer.empty()) + { + doRequest(); + } + + return; + } #endif - } +} - void TC_Transceiver::doAuthReq() +void TC_Transceiver::doAuthReq() +{ + if (_ep.getAuthType() == TC_Endpoint::AUTH_TYPENONE) { - if (_ep.getAuthType() == TC_Endpoint::AUTH_TYPENONE) - { - _authState = eAuthSucc; - _onRequestCallback(this); - } - else - { - //如果是客户端, 则主动发起鉴权请求 - shared_ptr buff = _onClientSendAuthCallback(this); + _authState = eAuthSucc; + _onRequestCallback(this); + } + else + { + //如果是客户端, 则主动发起鉴权请求 + shared_ptr buff = _onClientSendAuthCallback(this); #if TARS_SSL if(this->isSSL()) @@ -499,86 +510,88 @@ namespace tars } #else - _sendBuffer.addBuffer(buff); -#endif - - doRequest(); - } - } - - void TC_Transceiver::connectProxy() - { - assert(_proxyInfo); - - vector buff; - - bool succ = _proxyInfo->sendProxyPacket(buff, _ep); - if(!succ) - { - THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_SEND, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg()); - } _sendBuffer.addBuffer(buff); +#endif doRequest(); } +} - int TC_Transceiver::doCheckProxy(const char *buff, size_t length) +void TC_Transceiver::connectProxy() +{ + assert(_proxyInfo); + + vector buff; + + bool succ = _proxyInfo->sendProxyPacket(buff, _ep); + if (!succ) { - if(!_proxyInfo || _proxyInfo->isSuccess()) - return 0; + THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_SEND, + "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg()); + } + _sendBuffer.addBuffer(buff); - bool succ = _proxyInfo->recvProxyPacket(buff, length); - if(!succ) - { - THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_RECV, "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg()); - } + doRequest(); +} - if(!_proxyInfo->isSuccess()) - { - connectProxy(); - } - else - { - onSetConnected(); - } +int TC_Transceiver::doCheckProxy(const char* buff, size_t length) +{ + if (!_proxyInfo || _proxyInfo->isSuccess()) + return 0; - return 1; + bool succ = _proxyInfo->recvProxyPacket(buff, length); + if (!succ) + { + THROW_ERROR(TC_Transceiver_Exception, CR_PROXY_RECV, + "connect to proxy, " + _desc + ", error:" + _proxyInfo->getErrMsg()); } - void TC_Transceiver::udpClose() + if (!_proxyInfo->isSuccess()) { - if (_ep.isUdp()) - { - _epoller->releaseEpollInfo(_epollInfo); - - _epollInfo.reset(); - - TC_Port::closeSocket(_fd); - - _fd = -1; - - _connStatus = eUnconnected; - - _sendBuffer.clearBuffers(); - - _recvBuffer.clearBuffers(); - } + connectProxy(); + } + else + { + onSetConnected(); } - void TC_Transceiver::close() + return 1; +} + +void TC_Transceiver::udpClose() +{ + if (_ep.isUdp()) { + _epoller->releaseEpollInfo(_epollInfo); + + _epollInfo.reset(); + + TC_Port::closeSocket(_fd); + + _fd = -1; + + _connStatus = eUnconnected; + + _sendBuffer.clearBuffers(); + + _recvBuffer.clearBuffers(); + } +} + +void TC_Transceiver::close() +{ // LOG_CONSOLE_DEBUG << this << endl; - if(!isValid()) return; + if (!isValid()) return; - if(_ep.isTcp()) - { - tcpClose(false, CR_ACTIVE, "active call"); - } - else - { - udpClose(); - } + if (_ep.isTcp()) + { + tcpClose(false, CR_ACTIVE, "active call"); } + else + { + udpClose(); + } +} void TC_Transceiver::tcpClose(bool deconstructor, CloseReason reason, const string &err) { @@ -594,88 +607,93 @@ namespace tars //LOG_CONSOLE_DEBUG << this << ", " << _fd << ", " << reason << ", " << err << ", " << deconstructor << endl; - if(_connTimerId != 0) { - _epoller->erase(_connTimerId); - _connTimerId = 0; - } + if (_connTimerId != 0) + { + _epoller->erase(_connTimerId); + _connTimerId = 0; + } - _epoller->releaseEpollInfo(_epollInfo); + _epoller->releaseEpollInfo(_epollInfo); - _epollInfo.reset(); + _epollInfo.reset(); - TC_Port::closeSocket(_fd); + TC_Port::closeSocket(_fd); - _fd = -1; + _fd = -1; - _connStatus = eUnconnected; + _connStatus = eUnconnected; - _sendBuffer.clearBuffers(); + _sendBuffer.clearBuffers(); - _recvBuffer.clearBuffers(); + _recvBuffer.clearBuffers(); - _authState = eAuthInit; + _authState = eAuthInit; - if(!deconstructor) - { - //注意必须放在最后, 主要避免_onCloseCallback里面析构了链接, 从而导致又进入tcpClose - //放在最后就不会有问题了, 因为不会再进入这个函数 - _onCloseCallback(this, reason, err); - } + if (!deconstructor) + { + //注意必须放在最后, 主要避免_onCloseCallback里面析构了链接, 从而导致又进入tcpClose + //放在最后就不会有问题了, 因为不会再进入这个函数 + _onCloseCallback(this, reason, err); } } +} - void TC_Transceiver::doRequest() +void TC_Transceiver::doRequest() +{ + if (!isValid()) return; + + checkConnect(); + + //buf不为空,先发送buffer的内容 + while (!_sendBuffer.empty()) { - if(!isValid()) return ; + auto data = _sendBuffer.getBufferPointer(); + assert(data.first != NULL && data.second != 0); - checkConnect(); + int iRet = this->send(data.first, (uint32_t)data.second, 0); - //buf不为空,先发送buffer的内容 - while(!_sendBuffer.empty()) + if (iRet <= 0) { - auto data = _sendBuffer.getBufferPointer(); - assert(data.first != NULL && data.second != 0); - - int iRet = this->send(data.first, (uint32_t) data.second, 0); - - if (iRet <= 0) - { - return; - } - - _sendBuffer.moveHeader(iRet); + return; } - if(_sendBuffer.empty()) - { - _onRequestCallback(this); - } + _sendBuffer.moveHeader(iRet); } - TC_Transceiver::ReturnStatus TC_Transceiver::sendRequest(const shared_ptr &buff, const TC_Socket::addr_type& addr) + if (_sendBuffer.empty()) { + _onRequestCallback(this); + } +} + +TC_Transceiver::ReturnStatus +TC_Transceiver::sendRequest(const shared_ptr& buff, const TC_Socket::addr_type& addr) +{ // LOG_CONSOLE_DEBUG << buff->length() << endl; - //空数据 直接返回成功 - if(buff->empty()) { - return eRetOk; - } + //空数据 直接返回成功 + if (buff->empty()) + { + return eRetOk; + } - // assert(_sendBuffer.empty()); - //buf不为空, 表示之前的数据还没发送完, 直接返回失败, 等buffer可写了,epoll会通知写事件 - if(!_sendBuffer.empty()) { - //不应该运行到这里 - return eRetNotSend; - } + // assert(_sendBuffer.empty()); + //buf不为空, 表示之前的数据还没发送完, 直接返回失败, 等buffer可写了,epoll会通知写事件 + if (!_sendBuffer.empty()) + { + //不应该运行到这里 + return eRetNotSend; + } - if(eConnected != _connStatus) - { - return eRetNotSend; - } + if (eConnected != _connStatus) + { + return eRetNotSend; + } - if(_proxyInfo && !_proxyInfo->isSuccess()) { - return eRetNotSend; - } + if (_proxyInfo && !_proxyInfo->isSuccess()) + { + return eRetNotSend; + } if (_ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc) { @@ -685,8 +703,8 @@ namespace tars return eRetNotSend; } #endif - return eRetNotSend; // 需要鉴权但还没通过,不能发送非认证消息 - } + return eRetNotSend; // 需要鉴权但还没通过,不能发送非认证消息 + } #if TARS_SSL // 握手数据已加密,直接发送,会话数据需加密 @@ -696,349 +714,379 @@ namespace tars return eRetNotSend; } - int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer); - if(ret != 0) - { - close(); - return eRetError; - } - - buff->clear(); - } - else + int ret = _openssl->write(buff->buffer(), (uint32_t) buff->length(), _sendBuffer); + if(ret != 0) { - _sendBuffer.addBuffer(buff); + close(); + return eRetError; } -#else + + buff->clear(); + } + else + { _sendBuffer.addBuffer(buff); + } +#else + _sendBuffer.addBuffer(buff); #endif // LOG_CONSOLE_DEBUG << _sendBuffer.getBufferLength() << endl; - _lastAddr = addr; - do - { - auto data = _sendBuffer.getBufferPointer(); - - int iRet = this->send(data.first, (uint32_t) data.second, 0); - if(iRet < 0) - { - if(!isValid()) - { - _sendBuffer.clearBuffers(); - return eRetError; - } - else - { - return eRetFull; - } - } - - _sendBuffer.moveHeader(iRet); -// assert(iRet != 0); - } - while(!_sendBuffer.empty()); - - return eRetOk; - } - - - void TC_Transceiver::doAuthCheck(TC_NetWorkBuffer *buff) + _lastAddr = addr; + do { - if (!buff->empty() && _ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && _authState != eAuthSucc) + auto data = _sendBuffer.getBufferPointer(); + + int iRet = this->send(data.first, (uint32_t)data.second, 0); + if (iRet < 0) { - TC_NetWorkBuffer::PACKET_TYPE type; - - if(_isServer) + if (!isValid()) { - //验证鉴权 - auto ret = _onServerVerifyAuthCallback(*buff, this); - - type = ret.first; - - if(type == TC_NetWorkBuffer::PACKET_FULL) - { - _authState = eAuthSucc; - //服务器端, 鉴权通过, 可以响应包 - sendRequest(ret.second, _serverAddr); - } + _sendBuffer.clearBuffers(); + return eRetError; } else { - type = _onClientVerifyAuthCallback(*buff, this); - - if(type == TC_NetWorkBuffer::PACKET_FULL) - { - _authState = eAuthSucc; - //客户端, 鉴权通过可以发送业务包了 - _onRequestCallback(this); - } - } - - if(type == TC_NetWorkBuffer::PACKET_ERR) - { - THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "[TC_Transceiver::doProtocolAnalysis, auth error]"); + return eRetFull; } } - } - int TC_Transceiver::doProtocolAnalysis(TC_NetWorkBuffer *buff) + _sendBuffer.moveHeader(iRet); +// assert(iRet != 0); + } while (!_sendBuffer.empty()); + + return eRetOk; +} + + +void TC_Transceiver::doAuthCheck(TC_NetWorkBuffer* buff) +{ + if (!buff->empty() && _ep.isTcp() && _ep.getAuthType() == TC_Endpoint::AUTH_TYPELOCAL && + _authState != eAuthSucc) { - doAuthCheck(buff); + TC_NetWorkBuffer::PACKET_TYPE type; - TC_NetWorkBuffer::PACKET_TYPE ret; - - int packetCount = 0; - - int ioriginal = 0; - int isurplus = 0; - try + if (_isServer) { - do + //验证鉴权 + auto ret = _onServerVerifyAuthCallback(*buff, this); + + type = ret.first; + + if (type == TC_NetWorkBuffer::PACKET_FULL) { - ioriginal = buff->getBuffers().size(); - ret = _onParserCallback(*buff, this); - isurplus = buff->getBuffers().size(); - - if(ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) - { - ++packetCount; - } - - if(ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) { - //full close模式下, 需要关闭连接 - tcpClose(false, CR_PROTOCOL, "protocol full close"); - } - - if(_onCompletePackageCallback) { - //收到一个完整的包 - _onCompletePackageCallback(this); - } - - // 当收到完整包时,解析完包后,buffer没movehead,则报错 - if (ret == TC_NetWorkBuffer::PACKET_FULL && ioriginal == isurplus) - { - ret = TC_NetWorkBuffer::PACKET_FULL_CLOSE; - string err = "parser buffer movehead error, " + _desc; - tcpClose(false, CR_PROTOCOL, err); // 这个地方会将连接关闭,为了方便后期问题定位 - throw TC_Transceiver_Exception(err); - } + _authState = eAuthSucc; + //服务器端, 鉴权通过, 可以响应包 + sendRequest(ret.second, _serverAddr); } - while (ret == TC_NetWorkBuffer::PACKET_FULL); } - catch (exception & ex) { - THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error:" + string(ex.what()) + "]"); - } - catch (...) { - THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error"); - } - - if (ret == TC_NetWorkBuffer::PACKET_ERR) + else { - string err = "parser decode error, " + _desc; - tcpClose(false, CR_PROTOCOL, err); - throw TC_Transceiver_Exception(err); + type = _onClientVerifyAuthCallback(*buff, this); + + if (type == TC_NetWorkBuffer::PACKET_FULL) + { + _authState = eAuthSucc; + //客户端, 鉴权通过可以发送业务包了 + _onRequestCallback(this); + } } - return packetCount; + if (type == TC_NetWorkBuffer::PACKET_ERR) + { + THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "[TC_Transceiver::doProtocolAnalysis, auth error]"); + } } +} + +int TC_Transceiver::doProtocolAnalysis(TC_NetWorkBuffer* buff) +{ + doAuthCheck(buff); + + TC_NetWorkBuffer::PACKET_TYPE ret; + + int packetCount = 0; + + int ioriginal = 0; + int isurplus = 0; + try + { + do + { + ioriginal = buff->getBuffers().size(); + ret = _onParserCallback(*buff, this); + isurplus = buff->getBuffers().size(); + + if (ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) + { + ++packetCount; + } + if (_onCompletePackageCallback) + { + //收到一个完整的包 + _onCompletePackageCallback(this); + } + + if (ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE) + { + //full close模式下, 需要关闭连接 + tcpClose(false, CR_PROTOCOL, "protocol full close"); + } + + // 当收到完整包时,解析完包后,buffer没movehead,则报错 + if (ret == TC_NetWorkBuffer::PACKET_FULL && ioriginal == isurplus) + { + ret = TC_NetWorkBuffer::PACKET_FULL_CLOSE; + string err = "parser buffer movehead error, " + _desc; + tcpClose(false, CR_PROTOCOL, err); // 这个地方会将连接关闭,为了方便后期问题定位 + throw TC_Transceiver_Exception(err); + } + + } while (ret == TC_NetWorkBuffer::PACKET_FULL); + + if (_onCompleteNetworkCallback) + { + try + { _onCompleteNetworkCallback(this); } + catch (...) + {} + } + } + catch (exception& ex) + { + if (_onCompleteNetworkCallback) + { + try + { + _onCompleteNetworkCallback(this); + } + catch (...) + { + } + } + THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error:" + string(ex.what()) + "]"); + } + catch (...) + { + if (_onCompleteNetworkCallback) + { + try + { + _onCompleteNetworkCallback(this); + } + catch (...) + { + } + } + THROW_ERROR(TC_Transceiver_Exception, CR_PROTOCOL, "parser decode error"); + } + + if (ret == TC_NetWorkBuffer::PACKET_ERR) + { + string err = "parser decode error, " + _desc; + tcpClose(false, CR_PROTOCOL, err); + throw TC_Transceiver_Exception(err); + } + + return packetCount; +} ////////////////////////////////////////////////////////// - TC_TCPTransceiver::TC_TCPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep) - : TC_Transceiver(epoller, ep) - { - assert(epoller); - } +TC_TCPTransceiver::TC_TCPTransceiver(TC_Epoller* epoller, const TC_Endpoint& ep) + : TC_Transceiver(epoller, ep) +{ + assert(epoller); +} + + //不同的内存分配机制 #if 0 - bool TC_TCPTransceiver::doResponse() +bool TC_TCPTransceiver::doResponse() { - checkConnect(); +checkConnect(); + +int iRet = 0; +int64_t now = TNOWMS; + +// int packetCount = 0; +do +{ + char buff[BUFFER_SIZE]; + + if ((iRet = this->recv((void*)buff, BUFFER_SIZE, 0)) > 0) + { + int check = doCheckProxy(buff, iRet); + if(check != 0) + { + _recvBuffer.clearBuffers(); + return true; + } + + _recvBuffer.addBuffer(buff, iRet); + + //解析协议 + doProtocolAnalysis(&_recvBuffer); + //收包太多了, 中断一下, 释放线程给send等 + if (TNOWMS - now >= LONG_NETWORK_TRANS_TIME && isValid()) + { + _epollInfo->mod(EPOLLIN | EPOLLOUT); + break; + } + + //接收的数据小于buffer大小, 内核会再次通知你 + if(iRet < BUFFER_SIZE) + { + break; + } + } +} +while (iRet>0); + +if(iRet == 0) +{ + tcpClose(false, CR_PEER_CLOSE, "peer close connection"); +} + +return iRet != 0; +} + +#else + +bool TC_TCPTransceiver::doResponse() +{ + checkConnect(); int iRet = 0; + int64_t now = TNOWMS; - int packetCount = 0; +// int packetCount = 0; do - { - char buff[BUFFER_SIZE]; + { + auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE / 8, BUFFER_SIZE); - if ((iRet = this->recv((void*)buff, BUFFER_SIZE, 0)) > 0) - { - int check = doCheckProxy(buff, iRet); - if(check != 0) - { - _recvBuffer.clearBuffers(); - return true; - } + uint32_t left = (uint32_t)data->left(); - _recvBuffer.addBuffer(buff, iRet); + if ((iRet = this->recv((void*)data->free(), left, 0)) > 0) + { + int check = doCheckProxy(data->free(), iRet); + if (check != 0) + { + _recvBuffer.clearBuffers(); + return true; + } - //解析协议 - packetCount += doProtocolAnalysis(&_recvBuffer); + data->addWriteIdx(iRet); - //收包太多了, 中断一下, 释放线程给send等 - if (packetCount >= 2000 && isValid()) - { - _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT); - break; - } + _recvBuffer.addLength(iRet); - //接收的数据小于buffer大小, 内核会再次通知你 - if(iRet < BUFFER_SIZE) - { - break; - } - } - } - while (iRet>0); + //解析协议 + doProtocolAnalysis(&_recvBuffer); - if(iRet == 0) + //收包太多了, 中断一下, 释放线程给send等 + if (TNOWMS - now >= LONG_NETWORK_TRANS_TIME && isValid()) + { + _epollInfo->mod(EPOLLIN | EPOLLOUT); + break; + } + + //接收的数据小于buffer大小, 内核会再次通知你 + if (iRet < (int)left) + { + break; + } + } + } while (iRet > 0); + + if (iRet == 0) { tcpClose(false, CR_PEER_CLOSE, "peer close connection"); } return iRet != 0; + } -#else - bool TC_TCPTransceiver::doResponse() - { - checkConnect(); - - int iRet = 0; - - int packetCount = 0; - do - { - auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE); - - uint32_t left = (uint32_t)data->left(); - - if ((iRet = this->recv((void*)data->free(), left, 0)) > 0) - { - int check = doCheckProxy(data->free(), iRet); - if(check != 0) - { - _recvBuffer.clearBuffers(); - return true; - } - - data->addWriteIdx(iRet); - - _recvBuffer.addLength(iRet); - - //解析协议 - packetCount += doProtocolAnalysis(&_recvBuffer); - - //收包太多了, 中断一下, 释放线程给send等 - if (packetCount >= 2000 && isValid()) - { - _epollInfo->mod(EPOLLIN | EPOLLOUT); - break; - } - - //接收的数据小于buffer大小, 内核会再次通知你 - if(iRet < (int)left) - { - break; - } - } - } - while (iRet>0); - - if(iRet == 0) - { - tcpClose(false, CR_PEER_CLOSE, "peer close connection"); - } - - return iRet != 0; - - } - #endif - int TC_TCPTransceiver::send(const void* buf, uint32_t len, uint32_t flag) +int TC_TCPTransceiver::send(const void* buf, uint32_t len, uint32_t flag) +{ + //只有是连接状态才能收发数据 + if (eConnected != _connStatus) { - //只有是连接状态才能收发数据 - if(eConnected != _connStatus) - { - return -1; - } + return -1; + } - int iRet = ::send(_fd, (const char*)buf, len, flag); + int iRet = ::send(_fd, (const char*)buf, len, flag); // LOG_CONSOLE_DEBUG << this << ", send, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl; - if (iRet < 0 && !TC_Socket::isPending()) - { - THROW_ERROR(TC_Transceiver_Exception, CR_SEND, "TC_TCPTransceiver::send, " + _desc + ", fd:" + TC_Common::tostr(_fd)); - } - -#if TARGET_PLATFORM_WINDOWS - if(iRet < 0 && TC_Socket::isPending()) - { - _epollInfo->mod(EPOLLIN | EPOLLOUT); - } -#endif - - return iRet; - } - - int TC_TCPTransceiver::recv(void* buf, uint32_t len, uint32_t flag) + if (iRet < 0 && !TC_Socket::isPending()) { - //只有是连接状态才能收发数据 - if(eConnected != _connStatus) - return -1; - - int iRet = ::recv(_fd, (char*)buf, len, flag); - -// LOG_CONSOLE_DEBUG << this << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << endl; -// LOG_CONSOLE_DEBUG << (const char*)buf << endl; - -// if (iRet == 0 || (iRet < 0 && !TC_Socket::isPending())) - if ((iRet < 0 && !TC_Socket::isPending())) - { - int nerr = TC_Exception::getSystemCode(); - string err = "recv error, errno:" + TC_Common::tostr(nerr) + "," + TC_Exception::parseError(nerr); - THROW_ERROR(TC_Transceiver_Exception, CR_RECV, err + ", " + _desc + ", fd:" + TC_Common::tostr(_fd)); - } -// else if(iRet == 0) -// { -// THROW_ERROR(TC_Transceiver_Exception, CR_PEER_CLOSE, "peer close connection, " + _desc + ", fd:" + TC_Common::tostr(_fd)); -// } + THROW_ERROR(TC_Transceiver_Exception, CR_SEND, + "TC_TCPTransceiver::send, " + _desc + ", fd:" + TC_Common::tostr(_fd)); + } #if TARGET_PLATFORM_WINDOWS - if(iRet < 0 && TC_Socket::isPending()) - { - _epollInfo->mod(EPOLLIN | EPOLLOUT); - } + if(iRet < 0 && TC_Socket::isPending()) + { + _epollInfo->mod(EPOLLIN | EPOLLOUT); + } #endif - return iRet; + return iRet; +} + +int TC_TCPTransceiver::recv(void* buf, uint32_t len, uint32_t flag) +{ + //只有是连接状态才能收发数据 + if (eConnected != _connStatus) + return -1; + + int iRet = ::recv(_fd, (char*)buf, len, flag); + +// LOG_CONSOLE_DEBUG << this << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << endl; + + if ((iRet < 0 && !TC_Socket::isPending())) + { + int nerr = TC_Exception::getSystemCode(); + string err = "recv error, errno:" + TC_Common::tostr(nerr) + "," + TC_Exception::parseError(nerr); + THROW_ERROR(TC_Transceiver_Exception, CR_RECV, err + ", " + _desc + ", fd:" + TC_Common::tostr(_fd)); } + +#if TARGET_PLATFORM_WINDOWS + if(iRet < 0 && TC_Socket::isPending()) + { + _epollInfo->mod(EPOLLIN | EPOLLOUT); + } +#endif + + return iRet; +} ///////////////////////////////////////////////////////////////// #if TARS_SSL - TC_SSLTransceiver::TC_SSLTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep) - : TC_TCPTransceiver(epoller, ep) - { - } +TC_SSLTransceiver::TC_SSLTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep) +: TC_TCPTransceiver(epoller, ep) +{ +} #if 0 - bool TC_SSLTransceiver::doResponse() +bool TC_SSLTransceiver::doResponse() { checkConnect(); int iRet = 0; + int64_t now = TNOWMS; - int packetCount = 0; +// int packetCount = 0; do { - char buff[BUFFER_SIZE] = {0x00}; - if ((iRet = this->recv(buff, BUFFER_SIZE, 0)) > 0) + char buff[BUFFER_SIZE] = {0x00}; + if ((iRet = this->recv(buff, BUFFER_SIZE, 0)) > 0) { - int check = doCheckProxy(buff, iRet); + int check = doCheckProxy(buff, iRet); if(check != 0) { return true; @@ -1046,7 +1094,7 @@ namespace tars const bool preHandshake = _openssl->isHandshaked(); - int ret = _openssl->read(buff, iRet, _sendBuffer); + int ret = _openssl->read(buff, iRet, _sendBuffer); if (ret != 0) { // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl; @@ -1106,17 +1154,17 @@ namespace tars TC_NetWorkBuffer *rbuf = _openssl->recvBuffer(); //解析协议 - packetCount += doProtocolAnalysis(rbuf); + doProtocolAnalysis(rbuf); //收包太多了, 中断一下, 释放线程给send等 - if (packetCount >= 1000 && isValid()) + if (TNOWMS - now >= LONG_NETWORK_TRANS_TIME && isValid()) { - _epoller->mod(_epollInfo, EPOLLIN | EPOLLOUT); + _epollInfo->mod(EPOLLIN | EPOLLOUT); break; } //接收的数据小于buffer大小, 内核会再次通知你 - if(iRet < BUFFER_SIZE) + if(iRet < BUFFER_SIZE) { break; } @@ -1134,41 +1182,42 @@ namespace tars #else - bool TC_SSLTransceiver::doResponse() +bool TC_SSLTransceiver::doResponse() +{ + checkConnect(); + + int iRet = 0; + + int64_t now = TNOWMS; + + do { - checkConnect(); + auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE); - int iRet = 0; + uint32_t left = (uint32_t)data->left(); - int packetCount = 0; - do + if ((iRet = this->recv((void*)data->free(), left, 0)) > 0) { - auto data = _recvBuffer.getOrCreateBuffer(BUFFER_SIZE/8, BUFFER_SIZE); - - uint32_t left = (uint32_t)data->left(); - - if ((iRet = this->recv((void*)data->free(), left, 0)) > 0) - { - int check = doCheckProxy(data->free(), iRet); + int check = doCheckProxy(data->free(), iRet); if(check != 0) { return true; } - const bool preHandshake = _openssl->isHandshaked(); + const bool preHandshake = _openssl->isHandshaked(); - int ret = _openssl->read(data->free(), iRet, _sendBuffer); + int ret = _openssl->read(data->free(), iRet, _sendBuffer); - if (ret != 0) - { + if (ret != 0) + { // LOG_CONSOLE_DEBUG << "ret:" << ret << ", " << _openssl->getErrMsg() << endl; - THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]"); - } - else if(!_sendBuffer.empty()) - { - doRequest(); - } + THROW_ERROR(TC_Transceiver_Exception, CR_SSL, "[TC_SSLTransceiver::doResponse, SSL_read handshake failed: " + _desc + ", info: " + _openssl->getErrMsg() + "]"); + } + else if(!_sendBuffer.empty()) + { + doRequest(); + } if (!_openssl->isHandshaked()) { @@ -1176,41 +1225,41 @@ namespace tars return true; } - if (!preHandshake) + if (!preHandshake) + { + if(_isServer) { - if(_isServer) + _onRequestCallback(this); + } + else + { + //握手完毕, 客户端直接发送鉴权请求 + doAuthReq(); + // doAuthReq失败,会close fd, 这里判断下是否还有效 + if (!isValid()) { - _onRequestCallback(this); + THROW_ERROR(TC_Transceiver_Exception, CR_SSL, + "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " + + _openssl->getErrMsg() + "]"); } else { - //握手完毕, 客户端直接发送鉴权请求 - doAuthReq(); - // doAuthReq失败,会close fd, 这里判断下是否还有效 - if (!isValid()) - { - THROW_ERROR(TC_Transceiver_Exception, CR_SSL, - "[TC_SSLTransceiver::doResponse, doAuthReq failed: " + _desc + ", info: " + - _openssl->getErrMsg() + "]"); - } - else - { // LOG_CONSOLE_DEBUG << "[Transceiver::doResponse prehandshake:" << preHandshake << ", handshake:" << _openssl->isHandshaked() << endl; - } } } + } - TC_NetWorkBuffer *rbuf = _openssl->recvBuffer(); + TC_NetWorkBuffer *rbuf = _openssl->recvBuffer(); - //解析协议 - packetCount += doProtocolAnalysis(rbuf); + //解析协议 + doProtocolAnalysis(rbuf); - //收包太多了, 中断一下, 释放线程给send等 - if (packetCount >= 1000 && isValid()) - { - _epollInfo->mod(EPOLLIN | EPOLLOUT); - break; - } + //收包太多了, 中断一下, 释放线程给send等 + if (TNOWMS - now >= LONG_NETWORK_TRANS_TIME && isValid()) + { + _epollInfo->mod(EPOLLIN | EPOLLOUT); + break; + } //接收的数据小于buffer大小, 内核会再次通知你 if(iRet < left) @@ -1233,104 +1282,100 @@ namespace tars #endif ///////////////////////////////////////////////////////////////// - TC_UDPTransceiver::TC_UDPTransceiver(TC_Epoller* epoller, const TC_Endpoint &ep) - : TC_Transceiver(epoller, ep) - { - } +TC_UDPTransceiver::TC_UDPTransceiver(TC_Epoller* epoller, const TC_Endpoint& ep) + : TC_Transceiver(epoller, ep) +{ +} - TC_UDPTransceiver::~TC_UDPTransceiver() - { - } +TC_UDPTransceiver::~TC_UDPTransceiver() +{ +} - bool TC_UDPTransceiver::doResponse() - { - checkConnect(); +bool TC_UDPTransceiver::doResponse() +{ + checkConnect(); - int iRet = 0; - int packetCount = 0; - do + int iRet = 0; + int64_t now = TNOWMS; + do + { + _recvBuffer.clearBuffers(); + + auto data = _recvBuffer.getOrCreateBuffer(_nRecvBufferSize, _nRecvBufferSize); + + uint32_t left = (uint32_t)data->left(); + + if ((iRet = this->recv((void*)data->free(), left, 0)) > 0) { - _recvBuffer.clearBuffers(); + data->addWriteIdx(iRet); + _recvBuffer.addLength(iRet); - auto data = _recvBuffer.getOrCreateBuffer(_nRecvBufferSize, _nRecvBufferSize); - - uint32_t left = (uint32_t)data->left(); - - if ((iRet = this->recv((void *)data->free(), left, 0)) > 0) + //解析协议 + doProtocolAnalysis(&_recvBuffer); + //收包太多了, 中断一下, 释放线程给send等 + if (TNOWMS - now >= LONG_NETWORK_TRANS_TIME && isValid()) { - data->addWriteIdx(iRet); - _recvBuffer.addLength(iRet); - - //解析协议 - packetCount += doProtocolAnalysis(&_recvBuffer); - - //收包太多了, 中断一下, 释放线程给send等 - if (packetCount >= 1000 && isValid()) - { - _epollInfo->mod(EPOLLIN | EPOLLOUT); - break; - } + _epollInfo->mod(EPOLLIN | EPOLLOUT); + break; } - } - while (iRet > 0); - return iRet != 0; + } + } while (iRet > 0); + + return iRet != 0; +} + +int TC_UDPTransceiver::send(const void* buf, uint32_t len, uint32_t flag) +{ + if (!isValid()) return -1; + + int iRet = 0; + if (_isServer) + { + iRet = ::sendto(_fd, (const char*)buf, len, flag, _lastAddr.first.get(), _lastAddr.second); + } + else + { + iRet = ::sendto(_fd, (const char*)buf, len, flag, _serverAddr.first.get(), _serverAddr.second); } - int TC_UDPTransceiver::send(const void* buf, uint32_t len, uint32_t flag) + if (iRet > 0) { - if(!isValid()) return -1; + //udp只发一次 发送一半也算全部发送成功 + return len; + } - int iRet = 0; - if(_isServer) - { - iRet=::sendto(_fd, (const char*)buf, len, flag, _lastAddr.first.get(), _lastAddr.second); - } - else - { - iRet=::sendto(_fd, (const char*)buf, len, flag, _serverAddr.first.get(), _serverAddr.second); - } + if (iRet < 0 && TC_Socket::isPending()) + { + //EAGAIN, 认为没有发送 + return 0; + } - if(iRet > 0) - { - //udp只发一次 发送一半也算全部发送成功 - return len; - } + return iRet; +} - if (iRet < 0 && TC_Socket::isPending()) +int TC_UDPTransceiver::recv(void* buf, uint32_t len, uint32_t flag) +{ + if (!isValid()) return -1; + + _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); + + int iRet = ::recvfrom(_fd, (char*)buf, len, flag, _clientAddr.first.get(), + &_clientAddr.second); //need check from_ip & port + + if (!_isServer) + { + //客户端才会关闭连接, 会重建socket, 服务端不会 + if (iRet < 0 && !TC_Socket::isPending()) { - //EAGAIN, 认为没有发送 + THROW_ERROR(TC_Transceiver_Exception, CR_RECV, + "TC_UDPTransceiver::udp recv, " + _desc + ", fd:" + TC_Common::tostr(_fd)); return 0; } - - return iRet; } - int TC_UDPTransceiver::recv(void* buf, uint32_t len, uint32_t flag) - { - if(!isValid()) return -1; - - _clientAddr = TC_Socket::createSockAddr(_ep.getHost().c_str()); - - int iRet = ::recvfrom(_fd, (char*)buf, len, flag, _clientAddr.first.get(), &_clientAddr.second); //need check from_ip & port - -// if(iRet < 0) -// { -// LOG_CONSOLE_DEBUG << this << ", " << TC_Socket::isPending() << ", " << _isServer << ", recv, fd:" << _fd << ", " << _desc << ", iRet:" << iRet << ", len:" << len << endl; -// } - - if(!_isServer) - { - //客户端才会关闭连接, 会重建socket, 服务端不会 - if (iRet < 0 && !TC_Socket::isPending()) - { - THROW_ERROR(TC_Transceiver_Exception, CR_RECV, "TC_UDPTransceiver::udp recv, " + _desc + ", fd:" + TC_Common::tostr(_fd)); - return 0; - } - } - - return iRet; - } + return iRet; +} ///////////////////////////////////////////////////////////////// }