From cfd4d576c7647a8f31efb9dcd0e31d3b2860faf1 Mon Sep 17 00:00:00 2001 From: ruanshudong Date: Sun, 2 Feb 2020 15:42:56 +0800 Subject: [PATCH] fix coro bug --- cmake/BuildTarsServer.cmake | 4 +- examples/CMakeLists.txt | 5 + examples/CoroutineDemo/AServer/AServant.h | 76 +- examples/CoroutineDemo/AServer/CMakeLists.txt | 2 +- examples/CoroutineDemo/AServer/makefile | 12 - examples/CoroutineDemo/BServer/AServant.h | 687 ------------------ examples/CoroutineDemo/BServer/AServant.tars | 14 - examples/CoroutineDemo/BServer/BServant.h | 76 +- .../CoroutineDemo/BServer/BServantImp.cpp | 2 +- examples/CoroutineDemo/BServer/CMakeLists.txt | 2 +- examples/CoroutineDemo/BServer/makefile | 12 - examples/CoroutineDemo/CMakeLists.txt | 4 + examples/CoroutineDemo/client/BServant.h | 687 ------------------ examples/CoroutineDemo/client/BServant.tars | 15 - examples/CoroutineDemo/client/CMakeLists.txt | 2 +- examples/CoroutineDemo/client/main.cpp | 54 +- examples/CoroutineDemo/client/makefile | 12 - examples/CoroutineDemo/testCoro/BServant.h | 687 ------------------ examples/CoroutineDemo/testCoro/BServant.tars | 15 - .../CoroutineDemo/testCoro/CMakeLists.txt | 2 +- examples/CoroutineDemo/testCoro/main.cpp | 20 +- examples/CoroutineDemo/testCoro/makefile | 12 - .../CoroutineDemo/testParallelCoro/BServant.h | 687 ------------------ .../testParallelCoro/BServant.tars | 15 - .../testParallelCoro/CMakeLists.txt | 2 +- .../CoroutineDemo/testParallelCoro/main.cpp | 151 +--- .../CoroutineDemo/testParallelCoro/makefile | 12 - examples/HttpDemo/HttpClient/CMakeLists.txt | 2 +- examples/HttpDemo/HttpServer/CMakeLists.txt | 2 +- examples/PushDemo/PushClient/CMakeLists.txt | 2 +- examples/PushDemo/PushClient/Makefile | 13 - examples/PushDemo/PushServer/CMakeLists.txt | 2 +- examples/PushDemo/PushServer/makefile | 17 - .../HelloServer/Client/CMakeLists.txt | 2 +- .../HelloServer/Client/main.cpp | 19 - .../HelloServer/Server/CMakeLists.txt | 2 +- .../QuickStartDemo/HelloServer/Server/Hello.h | 60 +- .../ProxyServer/Client/CMakeLists.txt | 2 +- .../ProxyServer/Client/main.cpp | 17 +- .../ProxyServer/Client/makefile | 17 - .../ProxyServer/Server/CMakeLists.txt | 2 +- .../QuickStartDemo/ProxyServer/Server/Proxy.h | 60 +- .../ProxyServer/Server/ProxyImp.cpp | 2 +- .../ProxyServer/Server/makefile | 17 - .../TarsStressClient/CMakeLists.txt | 2 +- .../TarsStressServer/CMakeLists.txt | 2 +- examples/StressDemo/TarsStressServer/Stress.h | 60 +- examples/scripts/run-http.sh | 2 + examples/scripts/run-quick-start.sh | 9 +- servant/libservant/Application.cpp | 6 - servant/libservant/AsyncProcThread.cpp | 17 +- servant/libservant/Communicator.cpp | 81 ++- servant/libservant/CommunicatorEpoll.cpp | 205 ++---- servant/libservant/CoroutineScheduler.cpp | 93 --- servant/servant/AsyncProcThread.h | 3 +- servant/servant/Communicator.h | 32 + servant/servant/CommunicatorEpoll.h | 83 +-- servant/servant/ServantProxy.h | 3 +- tools/tars2cpp/tars2cpp.cpp | 4 +- util/src/tc_epoll_server.cpp | 34 +- 60 files changed, 472 insertions(+), 3671 deletions(-) delete mode 100644 examples/CoroutineDemo/AServer/makefile delete mode 100644 examples/CoroutineDemo/BServer/AServant.h delete mode 100644 examples/CoroutineDemo/BServer/AServant.tars delete mode 100644 examples/CoroutineDemo/BServer/makefile delete mode 100644 examples/CoroutineDemo/client/BServant.h delete mode 100644 examples/CoroutineDemo/client/BServant.tars delete mode 100644 examples/CoroutineDemo/client/makefile delete mode 100644 examples/CoroutineDemo/testCoro/BServant.h delete mode 100644 examples/CoroutineDemo/testCoro/BServant.tars delete mode 100644 examples/CoroutineDemo/testCoro/makefile delete mode 100644 examples/CoroutineDemo/testParallelCoro/BServant.h delete mode 100644 examples/CoroutineDemo/testParallelCoro/BServant.tars delete mode 100644 examples/CoroutineDemo/testParallelCoro/makefile delete mode 100644 examples/PushDemo/PushClient/Makefile delete mode 100644 examples/PushDemo/PushServer/makefile delete mode 100644 examples/QuickStartDemo/ProxyServer/Client/makefile delete mode 100644 examples/QuickStartDemo/ProxyServer/Server/makefile diff --git a/cmake/BuildTarsServer.cmake b/cmake/BuildTarsServer.cmake index fdd858f..4486c2a 100644 --- a/cmake/BuildTarsServer.cmake +++ b/cmake/BuildTarsServer.cmake @@ -1,5 +1,5 @@ -macro(build_tars_server MODULE) +macro(build_tars_server MODULE DEPS) include_directories(./) @@ -50,7 +50,7 @@ macro(build_tars_server MODULE) endif (TARS_LIST) - add_dependencies(${MODULE} tarsservant tarsutil) + add_dependencies(${MODULE} ${DEPS} tarsservant tarsutil) target_link_libraries(${MODULE} tarsservant tarsutil) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 16aee67..40011fb 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -15,3 +15,8 @@ add_custom_target(run-http DEPENDS HttpServer HttpClient COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-http.sh COMMENT "call run http") + +add_custom_target(run-co + DEPENDS CoroutineDemoAServer CoroutineDemoBServer CoroutineDemoClient testCoro testParallelCoro + COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-co.sh + COMMENT "call run co") diff --git a/examples/CoroutineDemo/AServer/AServant.h b/examples/CoroutineDemo/AServer/AServant.h index 94bd982..f679553 100644 --- a/examples/CoroutineDemo/AServer/AServant.h +++ b/examples/CoroutineDemo/AServer/AServant.h @@ -193,44 +193,6 @@ namespace Test "testStr" }; - pair r = equal_range(__AServant_all, __AServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __AServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testInt_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testStr_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __AServant_all[]= - { - "test", - "testInt", - "testStr" - }; - pair r = equal_range(__AServant_all, __AServant_all+3, request.sFuncName); if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; switch(r.first - __AServant_all) @@ -336,6 +298,44 @@ namespace Test return tars::TARSSERVERNOFUNCERR; } + virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) + { + static ::std::string __AServant_all[]= + { + "test", + "testInt", + "testStr" + }; + + pair r = equal_range(__AServant_all, __AServant_all+3, request.sFuncName); + if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; + switch(r.first - __AServant_all) + { + case 0: + { + callback_test_exception(response.iRet); + + return response.iRet; + + } + case 1: + { + callback_testInt_exception(response.iRet); + + return response.iRet; + + } + case 2: + { + callback_testStr_exception(response.iRet); + + return response.iRet; + + } + } + return tars::TARSSERVERNOFUNCERR; + } + protected: map _mRspContext; }; diff --git a/examples/CoroutineDemo/AServer/CMakeLists.txt b/examples/CoroutineDemo/AServer/CMakeLists.txt index 7a3eece..69fbe70 100644 --- a/examples/CoroutineDemo/AServer/CMakeLists.txt +++ b/examples/CoroutineDemo/AServer/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("AServer") \ No newline at end of file +build_tars_server("CoroutineDemoAServer" "") \ No newline at end of file diff --git a/examples/CoroutineDemo/AServer/makefile b/examples/CoroutineDemo/AServer/makefile deleted file mode 100644 index 9922db3..0000000 --- a/examples/CoroutineDemo/AServer/makefile +++ /dev/null @@ -1,12 +0,0 @@ - -#----------------------------------------------------------------------- -APP := Test -TARGET := AServer -CONFIG := -STRIP_FLAG:= N - -INCLUDE += - -#----------------------------------------------------------------------- -include /usr/local/tars/cpp/makefile/makefile.tars -#----------------------------------------------------------------------- diff --git a/examples/CoroutineDemo/BServer/AServant.h b/examples/CoroutineDemo/BServer/AServant.h deleted file mode 100644 index 94bd982..0000000 --- a/examples/CoroutineDemo/BServer/AServant.h +++ /dev/null @@ -1,687 +0,0 @@ -// ********************************************************************** -// This file was generated by a TARS parser! -// TARS version 1.4.0. -// ********************************************************************** - -#ifndef __ASERVANT_H_ -#define __ASERVANT_H_ - -#include -#include -#include -#include "tup/Tars.h" -#include "tup/TarsJson.h" -using namespace std; -#include "servant/ServantProxy.h" -#include "servant/Servant.h" - - -namespace Test -{ - - /* callback of async proxy for client */ - class AServantPrxCallback: public tars::ServantProxyCallback - { - public: - virtual ~AServantPrxCallback(){} - virtual void callback_test(tars::Int32 ret) - { throw std::runtime_error("callback_test() override incorrect."); } - virtual void callback_test_exception(tars::Int32 ret) - { throw std::runtime_error("callback_test_exception() override incorrect."); } - - virtual void callback_testInt(tars::Int32 ret, tars::Int32 iOut) - { throw std::runtime_error("callback_testInt() override incorrect."); } - virtual void callback_testInt_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testInt_exception() override incorrect."); } - - virtual void callback_testStr(tars::Int32 ret, const std::string& sOut) - { throw std::runtime_error("callback_testStr() override incorrect."); } - virtual void callback_testStr_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testStr_exception() override incorrect."); } - - public: - virtual const map & getResponseContext() const - { - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - if(!pCbtd->getContextValid()) - { - throw TC_Exception("cann't get response context"); - } - return pCbtd->getResponseContext(); - } - - public: - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __AServant_all[]= - { - "test", - "testInt", - "testStr" - }; - pair r = equal_range(__AServant_all, __AServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __AServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testInt_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testStr_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __AServant_all[]= - { - "test", - "testInt", - "testStr" - }; - pair r = equal_range(__AServant_all, __AServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __AServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_test(_ret); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - tars::Int32 iOut; - _is.read(iOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testInt(_ret, iOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testStr(_ret, sOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - }; - typedef tars::TC_AutoPtr AServantPrxCallbackPtr; - - /* callback of coroutine async proxy for client */ - class AServantCoroPrxCallback: public AServantPrxCallback - { - public: - virtual ~AServantCoroPrxCallback(){} - public: - virtual const map & getResponseContext() const { return _mRspContext; } - - virtual void setResponseContext(const map &mContext) { _mRspContext = mContext; } - - public: - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __AServant_all[]= - { - "test", - "testInt", - "testStr" - }; - - pair r = equal_range(__AServant_all, __AServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __AServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testInt_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testStr_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __AServant_all[]= - { - "test", - "testInt", - "testStr" - }; - - pair r = equal_range(__AServant_all, __AServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __AServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - setResponseContext(response.context); - - callback_test(_ret); - - } - catch(std::exception &ex) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - tars::Int32 iOut; - _is.read(iOut, 2, true); - setResponseContext(response.context); - - callback_testInt(_ret, iOut); - - } - catch(std::exception &ex) - { - callback_testInt_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testInt_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - setResponseContext(response.context); - - callback_testStr(_ret, sOut); - - } - catch(std::exception &ex) - { - callback_testStr_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testStr_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - protected: - map _mRspContext; - }; - typedef tars::TC_AutoPtr AServantCoroPrxCallbackPtr; - - /* proxy for client */ - class AServantProxy : public tars::ServantProxy - { - public: - typedef map TARS_CONTEXT; - tars::Int32 test(const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"test", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - return _ret; - } - - void async_test(AServantPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback); - } - - void coro_test(AServantCoroPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback, true); - } - - tars::Int32 testInt(tars::Int32 iIn,tars::Int32 &iOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(iIn, 1); - _os.write(iOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testInt", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(iOut, 2, true); - return _ret; - } - - void async_testInt(AServantPrxCallbackPtr callback,tars::Int32 iIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(iIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testInt", _os, context, _mStatus, callback); - } - - void coro_testInt(AServantCoroPrxCallbackPtr callback,tars::Int32 iIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(iIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testInt", _os, context, _mStatus, callback, true); - } - - tars::Int32 testStr(const std::string & sIn,std::string &sOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - _os.write(sOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testStr", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(sOut, 2, true); - return _ret; - } - - void async_testStr(AServantPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testStr", _os, context, _mStatus, callback); - } - - void coro_testStr(AServantCoroPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testStr", _os, context, _mStatus, callback, true); - } - - AServantProxy* tars_hash(int64_t key) - { - return (AServantProxy*)ServantProxy::tars_hash(key); - } - - AServantProxy* tars_consistent_hash(int64_t key) - { - return (AServantProxy*)ServantProxy::tars_consistent_hash(key); - } - - AServantProxy* tars_set_timeout(int msecond) - { - return (AServantProxy*)ServantProxy::tars_set_timeout(msecond); - } - - static const char* tars_prxname() { return "AServantProxy"; } - }; - typedef tars::TC_AutoPtr AServantPrx; - - /* servant for server */ - class AServant : public tars::Servant - { - public: - virtual ~AServant(){} - virtual tars::Int32 test(tars::TarsCurrentPtr current) = 0; - static void async_response_test(tars::TarsCurrentPtr current, tars::Int32 _ret) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testInt(tars::Int32 iIn,tars::Int32 &iOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testInt(tars::TarsCurrentPtr current, tars::Int32 _ret, tars::Int32 iOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("iOut", iOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(iOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testStr(const std::string & sIn,std::string &sOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testStr(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(sOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - public: - int onDispatch(tars::TarsCurrentPtr _current, vector &_sResponseBuffer) - { - static ::std::string __Test__AServant_all[]= - { - "test", - "testInt", - "testStr" - }; - - pair r = equal_range(__Test__AServant_all, __Test__AServant_all+3, _current->getFuncName()); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __Test__AServant_all) - { - case 0: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - } - else - { - } - tars::Int32 _ret = test(_current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - tars::Int32 iIn; - tars::Int32 iOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("iIn", iIn); - tarsAttr.getByDefault("iOut", iOut, iOut); - } - else - { - _is.read(iIn, 1, true); - _is.read(iOut, 2, false); - } - tars::Int32 _ret = testInt(iIn,iOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("iOut", iOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(iOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - std::string sIn; - std::string sOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("sIn", sIn); - tarsAttr.getByDefault("sOut", sOut, sOut); - } - else - { - _is.read(sIn, 1, true); - _is.read(sOut, 2, false); - } - tars::Int32 _ret = testStr(sIn,sOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(sOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - }; - - -} - - - -#endif diff --git a/examples/CoroutineDemo/BServer/AServant.tars b/examples/CoroutineDemo/BServer/AServant.tars deleted file mode 100644 index 5487126..0000000 --- a/examples/CoroutineDemo/BServer/AServant.tars +++ /dev/null @@ -1,14 +0,0 @@ - -module Test -{ - -interface AServant -{ - int test(); - - int testInt(int iIn,out int iOut); - - int testStr(string sIn, out string sOut); -}; - -}; diff --git a/examples/CoroutineDemo/BServer/BServant.h b/examples/CoroutineDemo/BServer/BServant.h index 7947ea3..bf20992 100644 --- a/examples/CoroutineDemo/BServer/BServant.h +++ b/examples/CoroutineDemo/BServer/BServant.h @@ -193,44 +193,6 @@ namespace Test "testCoroSerial" }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testCoroParallel_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testCoroSerial_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; switch(r.first - __BServant_all) @@ -336,6 +298,44 @@ namespace Test return tars::TARSSERVERNOFUNCERR; } + virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) + { + static ::std::string __BServant_all[]= + { + "test", + "testCoroParallel", + "testCoroSerial" + }; + + pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); + if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; + switch(r.first - __BServant_all) + { + case 0: + { + callback_test_exception(response.iRet); + + return response.iRet; + + } + case 1: + { + callback_testCoroParallel_exception(response.iRet); + + return response.iRet; + + } + case 2: + { + callback_testCoroSerial_exception(response.iRet); + + return response.iRet; + + } + } + return tars::TARSSERVERNOFUNCERR; + } + protected: map _mRspContext; }; diff --git a/examples/CoroutineDemo/BServer/BServantImp.cpp b/examples/CoroutineDemo/BServer/BServantImp.cpp index e09bde3..df152ea 100644 --- a/examples/CoroutineDemo/BServer/BServantImp.cpp +++ b/examples/CoroutineDemo/BServer/BServantImp.cpp @@ -28,7 +28,7 @@ void BServantImp::initialize() { //initialize servant here: //... - _pPrx = Application::getCommunicator()->stringToProxy("Test.AServer.AServantObj"); + _pPrx = Application::getCommunicator()->stringToProxy("Test.AServer.AServantObj@tcp -h 127.0.0.1 -p 9000 -t 10000"); } ////////////////////////////////////////////////////// void BServantImp::destroy() diff --git a/examples/CoroutineDemo/BServer/CMakeLists.txt b/examples/CoroutineDemo/BServer/CMakeLists.txt index f9d2cdf..fa5cc1c 100644 --- a/examples/CoroutineDemo/BServer/CMakeLists.txt +++ b/examples/CoroutineDemo/BServer/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("BServer") \ No newline at end of file +build_tars_server("CoroutineDemoBServer" "AServer") \ No newline at end of file diff --git a/examples/CoroutineDemo/BServer/makefile b/examples/CoroutineDemo/BServer/makefile deleted file mode 100644 index 4c9555b..0000000 --- a/examples/CoroutineDemo/BServer/makefile +++ /dev/null @@ -1,12 +0,0 @@ - -#----------------------------------------------------------------------- -APP := Test -TARGET := BServer -CONFIG := -STRIP_FLAG:= N - -INCLUDE += - -#----------------------------------------------------------------------- -include /usr/local/tars/cpp/makefile/makefile.tars -#----------------------------------------------------------------------- diff --git a/examples/CoroutineDemo/CMakeLists.txt b/examples/CoroutineDemo/CMakeLists.txt index ca2ee7b..76e6727 100644 --- a/examples/CoroutineDemo/CMakeLists.txt +++ b/examples/CoroutineDemo/CMakeLists.txt @@ -1,6 +1,10 @@ +include_directories(AServer) add_subdirectory(AServer) + +include_directories(BServer) add_subdirectory(BServer) + add_subdirectory(client) add_subdirectory(testCoro) add_subdirectory(testParallelCoro) diff --git a/examples/CoroutineDemo/client/BServant.h b/examples/CoroutineDemo/client/BServant.h deleted file mode 100644 index 7947ea3..0000000 --- a/examples/CoroutineDemo/client/BServant.h +++ /dev/null @@ -1,687 +0,0 @@ -// ********************************************************************** -// This file was generated by a TARS parser! -// TARS version 1.4.0. -// ********************************************************************** - -#ifndef __BSERVANT_H_ -#define __BSERVANT_H_ - -#include -#include -#include -#include "tup/Tars.h" -#include "tup/TarsJson.h" -using namespace std; -#include "servant/ServantProxy.h" -#include "servant/Servant.h" - - -namespace Test -{ - - /* callback of async proxy for client */ - class BServantPrxCallback: public tars::ServantProxyCallback - { - public: - virtual ~BServantPrxCallback(){} - virtual void callback_test(tars::Int32 ret) - { throw std::runtime_error("callback_test() override incorrect."); } - virtual void callback_test_exception(tars::Int32 ret) - { throw std::runtime_error("callback_test_exception() override incorrect."); } - - virtual void callback_testCoroParallel(tars::Int32 ret, const std::string& sOut) - { throw std::runtime_error("callback_testCoroParallel() override incorrect."); } - virtual void callback_testCoroParallel_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testCoroParallel_exception() override incorrect."); } - - virtual void callback_testCoroSerial(tars::Int32 ret, const std::string& sOut) - { throw std::runtime_error("callback_testCoroSerial() override incorrect."); } - virtual void callback_testCoroSerial_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testCoroSerial_exception() override incorrect."); } - - public: - virtual const map & getResponseContext() const - { - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - if(!pCbtd->getContextValid()) - { - throw TC_Exception("cann't get response context"); - } - return pCbtd->getResponseContext(); - } - - public: - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testCoroParallel_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testCoroSerial_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_test(_ret); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testCoroParallel(_ret, sOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testCoroSerial(_ret, sOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - }; - typedef tars::TC_AutoPtr BServantPrxCallbackPtr; - - /* callback of coroutine async proxy for client */ - class BServantCoroPrxCallback: public BServantPrxCallback - { - public: - virtual ~BServantCoroPrxCallback(){} - public: - virtual const map & getResponseContext() const { return _mRspContext; } - - virtual void setResponseContext(const map &mContext) { _mRspContext = mContext; } - - public: - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testCoroParallel_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testCoroSerial_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - setResponseContext(response.context); - - callback_test(_ret); - - } - catch(std::exception &ex) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - setResponseContext(response.context); - - callback_testCoroParallel(_ret, sOut); - - } - catch(std::exception &ex) - { - callback_testCoroParallel_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testCoroParallel_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - setResponseContext(response.context); - - callback_testCoroSerial(_ret, sOut); - - } - catch(std::exception &ex) - { - callback_testCoroSerial_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testCoroSerial_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - protected: - map _mRspContext; - }; - typedef tars::TC_AutoPtr BServantCoroPrxCallbackPtr; - - /* proxy for client */ - class BServantProxy : public tars::ServantProxy - { - public: - typedef map TARS_CONTEXT; - tars::Int32 test(const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"test", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - return _ret; - } - - void async_test(BServantPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback); - } - - void coro_test(BServantCoroPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback, true); - } - - tars::Int32 testCoroParallel(const std::string & sIn,std::string &sOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - _os.write(sOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(sOut, 2, true); - return _ret; - } - - void async_testCoroParallel(BServantPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus, callback); - } - - void coro_testCoroParallel(BServantCoroPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus, callback, true); - } - - tars::Int32 testCoroSerial(const std::string & sIn,std::string &sOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - _os.write(sOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(sOut, 2, true); - return _ret; - } - - void async_testCoroSerial(BServantPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus, callback); - } - - void coro_testCoroSerial(BServantCoroPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus, callback, true); - } - - BServantProxy* tars_hash(int64_t key) - { - return (BServantProxy*)ServantProxy::tars_hash(key); - } - - BServantProxy* tars_consistent_hash(int64_t key) - { - return (BServantProxy*)ServantProxy::tars_consistent_hash(key); - } - - BServantProxy* tars_set_timeout(int msecond) - { - return (BServantProxy*)ServantProxy::tars_set_timeout(msecond); - } - - static const char* tars_prxname() { return "BServantProxy"; } - }; - typedef tars::TC_AutoPtr BServantPrx; - - /* servant for server */ - class BServant : public tars::Servant - { - public: - virtual ~BServant(){} - virtual tars::Int32 test(tars::TarsCurrentPtr current) = 0; - static void async_response_test(tars::TarsCurrentPtr current, tars::Int32 _ret) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testCoroParallel(const std::string & sIn,std::string &sOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testCoroParallel(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(sOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testCoroSerial(const std::string & sIn,std::string &sOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testCoroSerial(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(sOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - public: - int onDispatch(tars::TarsCurrentPtr _current, vector &_sResponseBuffer) - { - static ::std::string __Test__BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__Test__BServant_all, __Test__BServant_all+3, _current->getFuncName()); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __Test__BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - } - else - { - } - tars::Int32 _ret = test(_current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - std::string sIn; - std::string sOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("sIn", sIn); - tarsAttr.getByDefault("sOut", sOut, sOut); - } - else - { - _is.read(sIn, 1, true); - _is.read(sOut, 2, false); - } - tars::Int32 _ret = testCoroParallel(sIn,sOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(sOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - std::string sIn; - std::string sOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("sIn", sIn); - tarsAttr.getByDefault("sOut", sOut, sOut); - } - else - { - _is.read(sIn, 1, true); - _is.read(sOut, 2, false); - } - tars::Int32 _ret = testCoroSerial(sIn,sOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(sOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - }; - - -} - - - -#endif diff --git a/examples/CoroutineDemo/client/BServant.tars b/examples/CoroutineDemo/client/BServant.tars deleted file mode 100644 index 7c83e51..0000000 --- a/examples/CoroutineDemo/client/BServant.tars +++ /dev/null @@ -1,15 +0,0 @@ - -module Test -{ - -interface BServant -{ - int test(); - - int testCoroSerial(string sIn, out string sOut); - - int testCoroParallel(string sIn, out string sOut); - -}; - -}; diff --git a/examples/CoroutineDemo/client/CMakeLists.txt b/examples/CoroutineDemo/client/CMakeLists.txt index 113ea42..b944fd5 100644 --- a/examples/CoroutineDemo/client/CMakeLists.txt +++ b/examples/CoroutineDemo/client/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("client") \ No newline at end of file +build_tars_server("CoroutineDemoClient" "AServer BServer") \ No newline at end of file diff --git a/examples/CoroutineDemo/client/main.cpp b/examples/CoroutineDemo/client/main.cpp index f0cc6f4..6121230 100644 --- a/examples/CoroutineDemo/client/main.cpp +++ b/examples/CoroutineDemo/client/main.cpp @@ -26,7 +26,7 @@ using namespace tars; class Test1 { public: - Test1(const string &sStr); + Test1(); ~Test1(); @@ -37,11 +37,11 @@ private: BServantPrx _prx; }; -Test1::Test1(const string &sStr) +Test1::Test1() { - _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000"); - _comm.setProperty("stat", "tars.tarsstat.StatObj"); - _comm.stringToProxy(sStr, _prx); + // _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000"); + // _comm.setProperty("stat", "tars.tarsstat.StatObj"); + _prx = _comm.stringToProxy("Test.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100"); } Test1::~Test1() @@ -54,14 +54,11 @@ void Test1::queryResult(int iFlag, int iExecuteNum) string sIn(10,'a'); string sOut(""); - tars::Int32 count = 0; - unsigned long sum = 0; - - time_t _iTime=TC_TimeProvider::getInstance()->getNowMs(); + time_t t = TC_Common::now2us(); for(int i=0; itestCoroParallel(sIn, sOut); } - if(ret == 0) - { - ++sum; - ++count; - if(count == iExecuteNum) - { - cout << "pthread id: " << pthread_self() << " | " << TC_TimeProvider::getInstance()->getNowMs() - _iTime << endl; - _iTime=TC_TimeProvider::getInstance()->getNowMs(); - count = 0; - } - } + assert(sIn == sOut); + // cout << ret << ", " << sIn << ", " << sOut << endl; } catch(TC_Exception &e) { - cout << "pthread id: " << pthread_self() << "id: " << i << "exception: " << e.what() << endl; + cout << "pthread id: " << std::this_thread::get_id() << "id: " << i << "exception: " << e.what() << endl; } catch(...) { - cout << "pthread id: " << pthread_self() << "id: " << i << "unknown exception." << endl; + cout << "pthread id: " << std::this_thread::get_id() << "id: " << i << "unknown exception." << endl; } } - cout << "succ:" << sum << endl; - cout << "sOut:" << sOut << endl; + + int64_t cost = TC_Common::now2us() - t; + cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/iExecuteNum << "us" << endl; } int main(int argc,char ** argv) { - if(argc != 5) + if(argc != 4) { - cout << "usage: " << argv[0] << " sObj ThreadNum CallTimes CallMode" << endl; + cout << "usage: " << argv[0] << " ThreadNum CallTimes CallMode" << endl; return -1; } - string s = string(argv[1]); - - Test1 test1(s); + Test1 test1; try { - tars::Int32 threads = TC_Common::strto(string(argv[2])); + tars::Int32 threads = TC_Common::strto(string(argv[1])); TC_ThreadPool tp; tp.init(threads); tp.start(); - tars::Int32 times = TC_Common::strto(string(argv[3])); - tars::Int32 callMode = TC_Common::strto(string(argv[4])); + tars::Int32 times = TC_Common::strto(string(argv[2])); + tars::Int32 callMode = TC_Common::strto(string(argv[3])); for(int i = 0; i -#include -#include -#include "tup/Tars.h" -#include "tup/TarsJson.h" -using namespace std; -#include "servant/ServantProxy.h" -#include "servant/Servant.h" - - -namespace Test -{ - - /* callback of async proxy for client */ - class BServantPrxCallback: public tars::ServantProxyCallback - { - public: - virtual ~BServantPrxCallback(){} - virtual void callback_test(tars::Int32 ret) - { throw std::runtime_error("callback_test() override incorrect."); } - virtual void callback_test_exception(tars::Int32 ret) - { throw std::runtime_error("callback_test_exception() override incorrect."); } - - virtual void callback_testCoroParallel(tars::Int32 ret, const std::string& sOut) - { throw std::runtime_error("callback_testCoroParallel() override incorrect."); } - virtual void callback_testCoroParallel_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testCoroParallel_exception() override incorrect."); } - - virtual void callback_testCoroSerial(tars::Int32 ret, const std::string& sOut) - { throw std::runtime_error("callback_testCoroSerial() override incorrect."); } - virtual void callback_testCoroSerial_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testCoroSerial_exception() override incorrect."); } - - public: - virtual const map & getResponseContext() const - { - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - if(!pCbtd->getContextValid()) - { - throw TC_Exception("cann't get response context"); - } - return pCbtd->getResponseContext(); - } - - public: - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testCoroParallel_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testCoroSerial_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_test(_ret); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testCoroParallel(_ret, sOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testCoroSerial(_ret, sOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - }; - typedef tars::TC_AutoPtr BServantPrxCallbackPtr; - - /* callback of coroutine async proxy for client */ - class BServantCoroPrxCallback: public BServantPrxCallback - { - public: - virtual ~BServantCoroPrxCallback(){} - public: - virtual const map & getResponseContext() const { return _mRspContext; } - - virtual void setResponseContext(const map &mContext) { _mRspContext = mContext; } - - public: - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testCoroParallel_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testCoroSerial_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - setResponseContext(response.context); - - callback_test(_ret); - - } - catch(std::exception &ex) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - setResponseContext(response.context); - - callback_testCoroParallel(_ret, sOut); - - } - catch(std::exception &ex) - { - callback_testCoroParallel_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testCoroParallel_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - setResponseContext(response.context); - - callback_testCoroSerial(_ret, sOut); - - } - catch(std::exception &ex) - { - callback_testCoroSerial_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testCoroSerial_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - protected: - map _mRspContext; - }; - typedef tars::TC_AutoPtr BServantCoroPrxCallbackPtr; - - /* proxy for client */ - class BServantProxy : public tars::ServantProxy - { - public: - typedef map TARS_CONTEXT; - tars::Int32 test(const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"test", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - return _ret; - } - - void async_test(BServantPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback); - } - - void coro_test(BServantCoroPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback, true); - } - - tars::Int32 testCoroParallel(const std::string & sIn,std::string &sOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - _os.write(sOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(sOut, 2, true); - return _ret; - } - - void async_testCoroParallel(BServantPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus, callback); - } - - void coro_testCoroParallel(BServantCoroPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus, callback, true); - } - - tars::Int32 testCoroSerial(const std::string & sIn,std::string &sOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - _os.write(sOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(sOut, 2, true); - return _ret; - } - - void async_testCoroSerial(BServantPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus, callback); - } - - void coro_testCoroSerial(BServantCoroPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus, callback, true); - } - - BServantProxy* tars_hash(int64_t key) - { - return (BServantProxy*)ServantProxy::tars_hash(key); - } - - BServantProxy* tars_consistent_hash(int64_t key) - { - return (BServantProxy*)ServantProxy::tars_consistent_hash(key); - } - - BServantProxy* tars_set_timeout(int msecond) - { - return (BServantProxy*)ServantProxy::tars_set_timeout(msecond); - } - - static const char* tars_prxname() { return "BServantProxy"; } - }; - typedef tars::TC_AutoPtr BServantPrx; - - /* servant for server */ - class BServant : public tars::Servant - { - public: - virtual ~BServant(){} - virtual tars::Int32 test(tars::TarsCurrentPtr current) = 0; - static void async_response_test(tars::TarsCurrentPtr current, tars::Int32 _ret) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testCoroParallel(const std::string & sIn,std::string &sOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testCoroParallel(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(sOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testCoroSerial(const std::string & sIn,std::string &sOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testCoroSerial(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(sOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - public: - int onDispatch(tars::TarsCurrentPtr _current, vector &_sResponseBuffer) - { - static ::std::string __Test__BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__Test__BServant_all, __Test__BServant_all+3, _current->getFuncName()); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __Test__BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - } - else - { - } - tars::Int32 _ret = test(_current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - std::string sIn; - std::string sOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("sIn", sIn); - tarsAttr.getByDefault("sOut", sOut, sOut); - } - else - { - _is.read(sIn, 1, true); - _is.read(sOut, 2, false); - } - tars::Int32 _ret = testCoroParallel(sIn,sOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(sOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - std::string sIn; - std::string sOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("sIn", sIn); - tarsAttr.getByDefault("sOut", sOut, sOut); - } - else - { - _is.read(sIn, 1, true); - _is.read(sOut, 2, false); - } - tars::Int32 _ret = testCoroSerial(sIn,sOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(sOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - }; - - -} - - - -#endif diff --git a/examples/CoroutineDemo/testCoro/BServant.tars b/examples/CoroutineDemo/testCoro/BServant.tars deleted file mode 100644 index 7c83e51..0000000 --- a/examples/CoroutineDemo/testCoro/BServant.tars +++ /dev/null @@ -1,15 +0,0 @@ - -module Test -{ - -interface BServant -{ - int test(); - - int testCoroSerial(string sIn, out string sOut); - - int testCoroParallel(string sIn, out string sOut); - -}; - -}; diff --git a/examples/CoroutineDemo/testCoro/CMakeLists.txt b/examples/CoroutineDemo/testCoro/CMakeLists.txt index 0191251..add2c2b 100644 --- a/examples/CoroutineDemo/testCoro/CMakeLists.txt +++ b/examples/CoroutineDemo/testCoro/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("testCoro") \ No newline at end of file +build_tars_server("testCoro" "AServer BServer") \ No newline at end of file diff --git a/examples/CoroutineDemo/testCoro/main.cpp b/examples/CoroutineDemo/testCoro/main.cpp index edfecc5..7ea1d07 100644 --- a/examples/CoroutineDemo/testCoro/main.cpp +++ b/examples/CoroutineDemo/testCoro/main.cpp @@ -27,7 +27,7 @@ using namespace tars; class TestCoroutine : public Coroutine { public: - TestCoroutine(int iNum, const string &sObj); + TestCoroutine(int iNum); ~TestCoroutine() {} @@ -35,17 +35,16 @@ public: private: int _num; - string _sObj; Communicator _comm; BServantPrx _prx; }; -TestCoroutine::TestCoroutine(int iNum, const string &sObj) +TestCoroutine::TestCoroutine(int iNum) : _num(iNum) -, _sObj(sObj) { - _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000"); - _comm.stringToProxy(_sObj, _prx); + // _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000"); + _prx = _comm.stringToProxy("Test.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100"); + // _comm.stringToProxy(_sObj, _prx); } void TestCoroutine::handle() @@ -85,18 +84,17 @@ void TestCoroutine::handle() int main(int argc,char ** argv) { - if(argc != 3) + if(argc != 2) { - cout << "usage: " << argv[0] << " CallTimes sObj" << endl; + cout << "usage: " << argv[0] << " CallTimes " << endl; return -1; } tars::Int32 iNum = TC_Common::strto(string(argv[1])); - string sObj = string(argv[2]); - - TestCoroutine testCoro(iNum, sObj); + TestCoroutine testCoro(iNum); + //start 10 co testCoro.setCoroInfo(10, 128, 128*1024); testCoro.start(); diff --git a/examples/CoroutineDemo/testCoro/makefile b/examples/CoroutineDemo/testCoro/makefile deleted file mode 100644 index 432ad49..0000000 --- a/examples/CoroutineDemo/testCoro/makefile +++ /dev/null @@ -1,12 +0,0 @@ - -#----------------------------------------------------------------------- -APP := Test -TARGET := testCoro -CONFIG := -STRIP_FLAG:= N - -INCLUDE += - -#----------------------------------------------------------------------- -include /usr/local/tars/cpp/makefile/makefile.tars -#----------------------------------------------------------------------- diff --git a/examples/CoroutineDemo/testParallelCoro/BServant.h b/examples/CoroutineDemo/testParallelCoro/BServant.h deleted file mode 100644 index 7947ea3..0000000 --- a/examples/CoroutineDemo/testParallelCoro/BServant.h +++ /dev/null @@ -1,687 +0,0 @@ -// ********************************************************************** -// This file was generated by a TARS parser! -// TARS version 1.4.0. -// ********************************************************************** - -#ifndef __BSERVANT_H_ -#define __BSERVANT_H_ - -#include -#include -#include -#include "tup/Tars.h" -#include "tup/TarsJson.h" -using namespace std; -#include "servant/ServantProxy.h" -#include "servant/Servant.h" - - -namespace Test -{ - - /* callback of async proxy for client */ - class BServantPrxCallback: public tars::ServantProxyCallback - { - public: - virtual ~BServantPrxCallback(){} - virtual void callback_test(tars::Int32 ret) - { throw std::runtime_error("callback_test() override incorrect."); } - virtual void callback_test_exception(tars::Int32 ret) - { throw std::runtime_error("callback_test_exception() override incorrect."); } - - virtual void callback_testCoroParallel(tars::Int32 ret, const std::string& sOut) - { throw std::runtime_error("callback_testCoroParallel() override incorrect."); } - virtual void callback_testCoroParallel_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testCoroParallel_exception() override incorrect."); } - - virtual void callback_testCoroSerial(tars::Int32 ret, const std::string& sOut) - { throw std::runtime_error("callback_testCoroSerial() override incorrect."); } - virtual void callback_testCoroSerial_exception(tars::Int32 ret) - { throw std::runtime_error("callback_testCoroSerial_exception() override incorrect."); } - - public: - virtual const map & getResponseContext() const - { - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - if(!pCbtd->getContextValid()) - { - throw TC_Exception("cann't get response context"); - } - return pCbtd->getResponseContext(); - } - - public: - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testCoroParallel_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testCoroSerial_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_test(_ret); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testCoroParallel(_ret, sOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - CallbackThreadData * pCbtd = CallbackThreadData::getData(); - assert(pCbtd != NULL); - - pCbtd->setResponseContext(response.context); - - callback_testCoroSerial(_ret, sOut); - - pCbtd->delResponseContext(); - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - }; - typedef tars::TC_AutoPtr BServantPrxCallbackPtr; - - /* callback of coroutine async proxy for client */ - class BServantCoroPrxCallback: public BServantPrxCallback - { - public: - virtual ~BServantCoroPrxCallback(){} - public: - virtual const map & getResponseContext() const { return _mRspContext; } - - virtual void setResponseContext(const map &mContext) { _mRspContext = mContext; } - - public: - virtual int onDispatchResponse(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testCoroParallel_exception(response.iRet); - - return response.iRet; - - } - case 2: - { - callback_testCoroSerial_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__BServant_all, __BServant_all+3, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - setResponseContext(response.context); - - callback_test(_ret); - - } - catch(std::exception &ex) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_test_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - setResponseContext(response.context); - - callback_testCoroParallel(_ret, sOut); - - } - catch(std::exception &ex) - { - callback_testCoroParallel_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testCoroParallel_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - - _is.setBuffer(response.sBuffer); - try - { - tars::Int32 _ret; - _is.read(_ret, 0, true); - - std::string sOut; - _is.read(sOut, 2, true); - setResponseContext(response.context); - - callback_testCoroSerial(_ret, sOut); - - } - catch(std::exception &ex) - { - callback_testCoroSerial_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - catch(...) - { - callback_testCoroSerial_exception(tars::TARSCLIENTDECODEERR); - - return tars::TARSCLIENTDECODEERR; - } - - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - protected: - map _mRspContext; - }; - typedef tars::TC_AutoPtr BServantCoroPrxCallbackPtr; - - /* proxy for client */ - class BServantProxy : public tars::ServantProxy - { - public: - typedef map TARS_CONTEXT; - tars::Int32 test(const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"test", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - return _ret; - } - - void async_test(BServantPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback); - } - - void coro_test(BServantCoroPrxCallbackPtr callback,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback, true); - } - - tars::Int32 testCoroParallel(const std::string & sIn,std::string &sOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - _os.write(sOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(sOut, 2, true); - return _ret; - } - - void async_testCoroParallel(BServantPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus, callback); - } - - void coro_testCoroParallel(BServantCoroPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroParallel", _os, context, _mStatus, callback, true); - } - - tars::Int32 testCoroSerial(const std::string & sIn,std::string &sOut,const map &context = TARS_CONTEXT(),map * pResponseContext = NULL) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - _os.write(sOut, 2); - std::map _mStatus; - shared_ptr rep = tars_invoke(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus); - if(pResponseContext) - { - pResponseContext->swap(rep->context); - } - - tars::TarsInputStream _is; - _is.setBuffer(rep->sBuffer); - tars::Int32 _ret; - _is.read(_ret, 0, true); - _is.read(sOut, 2, true); - return _ret; - } - - void async_testCoroSerial(BServantPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus, callback); - } - - void coro_testCoroSerial(BServantCoroPrxCallbackPtr callback,const std::string &sIn,const map& context = TARS_CONTEXT()) - { - tars::TarsOutputStream _os; - _os.write(sIn, 1); - std::map _mStatus; - tars_invoke_async(tars::TARSNORMAL,"testCoroSerial", _os, context, _mStatus, callback, true); - } - - BServantProxy* tars_hash(int64_t key) - { - return (BServantProxy*)ServantProxy::tars_hash(key); - } - - BServantProxy* tars_consistent_hash(int64_t key) - { - return (BServantProxy*)ServantProxy::tars_consistent_hash(key); - } - - BServantProxy* tars_set_timeout(int msecond) - { - return (BServantProxy*)ServantProxy::tars_set_timeout(msecond); - } - - static const char* tars_prxname() { return "BServantProxy"; } - }; - typedef tars::TC_AutoPtr BServantPrx; - - /* servant for server */ - class BServant : public tars::Servant - { - public: - virtual ~BServant(){} - virtual tars::Int32 test(tars::TarsCurrentPtr current) = 0; - static void async_response_test(tars::TarsCurrentPtr current, tars::Int32 _ret) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testCoroParallel(const std::string & sIn,std::string &sOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testCoroParallel(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(sOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - virtual tars::Int32 testCoroSerial(const std::string & sIn,std::string &sOut,tars::TarsCurrentPtr current) = 0; - static void async_response_testCoroSerial(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sOut) - { - if (current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - - vector sTupResponseBuffer; - tarsAttr.encode(sTupResponseBuffer); - current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - - _os.write(sOut, 2); - - current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer()); - } - } - - public: - int onDispatch(tars::TarsCurrentPtr _current, vector &_sResponseBuffer) - { - static ::std::string __Test__BServant_all[]= - { - "test", - "testCoroParallel", - "testCoroSerial" - }; - - pair r = equal_range(__Test__BServant_all, __Test__BServant_all+3, _current->getFuncName()); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __Test__BServant_all) - { - case 0: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - } - else - { - } - tars::Int32 _ret = test(_current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 1: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - std::string sIn; - std::string sOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("sIn", sIn); - tarsAttr.getByDefault("sOut", sOut, sOut); - } - else - { - _is.read(sIn, 1, true); - _is.read(sOut, 2, false); - } - tars::Int32 _ret = testCoroParallel(sIn,sOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(sOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - case 2: - { - tars::TarsInputStream _is; - _is.setBuffer(_current->getRequestBuffer()); - std::string sIn; - std::string sOut; - if (_current->getRequestVersion() == TUPVERSION) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.decode(_current->getRequestBuffer()); - tarsAttr.get("sIn", sIn); - tarsAttr.getByDefault("sOut", sOut, sOut); - } - else - { - _is.read(sIn, 1, true); - _is.read(sOut, 2, false); - } - tars::Int32 _ret = testCoroSerial(sIn,sOut, _current); - if(_current->isResponse()) - { - if (_current->getRequestVersion() == TUPVERSION ) - { - UniAttribute tarsAttr; - tarsAttr.setVersion(_current->getRequestVersion()); - tarsAttr.put("", _ret); - tarsAttr.put("sOut", sOut); - tarsAttr.encode(_sResponseBuffer); - } - else - { - tars::TarsOutputStream _os; - _os.write(_ret, 0); - _os.write(sOut, 2); - _os.swap(_sResponseBuffer); - } - } - return tars::TARSSERVERSUCCESS; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - }; - - -} - - - -#endif diff --git a/examples/CoroutineDemo/testParallelCoro/BServant.tars b/examples/CoroutineDemo/testParallelCoro/BServant.tars deleted file mode 100644 index 7c83e51..0000000 --- a/examples/CoroutineDemo/testParallelCoro/BServant.tars +++ /dev/null @@ -1,15 +0,0 @@ - -module Test -{ - -interface BServant -{ - int test(); - - int testCoroSerial(string sIn, out string sOut); - - int testCoroParallel(string sIn, out string sOut); - -}; - -}; diff --git a/examples/CoroutineDemo/testParallelCoro/CMakeLists.txt b/examples/CoroutineDemo/testParallelCoro/CMakeLists.txt index 4264be7..35ce160 100644 --- a/examples/CoroutineDemo/testParallelCoro/CMakeLists.txt +++ b/examples/CoroutineDemo/testParallelCoro/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("testParallelCoro") \ No newline at end of file +build_tars_server("testParallelCoro" "AServer BServer") \ No newline at end of file diff --git a/examples/CoroutineDemo/testParallelCoro/main.cpp b/examples/CoroutineDemo/testParallelCoro/main.cpp index fc38cce..6d8a8b6 100644 --- a/examples/CoroutineDemo/testParallelCoro/main.cpp +++ b/examples/CoroutineDemo/testParallelCoro/main.cpp @@ -57,140 +57,12 @@ public: }; typedef tars::TC_AutoPtr BServantCoroCallbackPtr; -// //自定义协程类 -// class CoroutineClass : public TC_Thread -// { -// public: -// /** -// * 构造函数 -// */ -// CoroutineClass(); - -// /** -// * 析构函数 -// */ -// virtual ~CoroutineClass(); - -// /** -// * 返回0,代表成功,-1,表示失败 -// */ -// int registerFunc(const vector< std::function > &vFunc); - -// /** -// * 线程初始化 -// */ -// virtual void initialize() {} - -// /** -// * 线程处理方法 -// */ -// virtual void run(); - -// /** -// * 停止线程 -// */ -// void terminate(); - -// protected: -// /** -// * 线程已经启动, 进入具体协程处理前调用 -// */ -// virtual void startCoro() {} - -// /** -// * 线程马上要退出时调用 -// */ -// virtual void stopCoro() {} - -// /** -// * 具体的处理逻辑 -// */ -// virtual void handleCoro(); - -// protected: -// CoroutineScheduler *_coroSched; -// uint32_t _iPoolSize; -// size_t _iStackSize; -// vector > _vFunc; -// }; - -// CoroutineClass::CoroutineClass() -// : _coroSched(NULL) -// , _iPoolSize(1024) -// , _iStackSize(128*1024) -// { -// } - -// CoroutineClass::~CoroutineClass() -// { -// if(isAlive()) -// { -// terminate(); - -// getThreadControl().join(); -// } -// } - -// int CoroutineClass::registerFunc(const vector< std::function > &vFunc) -// { -// if(vFunc.size() > _iPoolSize || vFunc.size() <= 0) -// { -// return -1; -// } - -// _vFunc = vFunc; - -// return 0; -// } - -// void CoroutineClass::run() -// { -// initialize(); - -// startCoro(); - -// handleCoro(); - -// stopCoro(); -// } - -// void CoroutineClass::terminate() -// { -// if(_coroSched) -// { -// _coroSched->terminate(); -// } -// } - -// void CoroutineClass::handleCoro() -// { -// _coroSched = new CoroutineScheduler(); - -// _coroSched->init(_iPoolSize, _iStackSize); - -// ServantProxyThreadData * pSptd = ServantProxyThreadData::getData(); - -// assert(pSptd != NULL); - -// pSptd->_sched = _coroSched; - -// for(size_t i = 0; i < _vFunc.size(); ++i) -// { -// _coroSched->createCoroutine(_vFunc[i]); -// } - -// _coroSched->run(); - -// delete _coroSched; -// _coroSched = NULL; -// } - //////////////////////////////////////////// //继承框架的协程类 class TestCoroutine : public Coroutine { public: - TestCoroutine(int iNum, const string &sObj); + TestCoroutine(int iNum); ~TestCoroutine() {} @@ -198,17 +70,16 @@ public: private: int _num; - string _sObj; Communicator _comm; BServantPrx _prx; }; -TestCoroutine::TestCoroutine(int iNum, const string &sObj) +TestCoroutine::TestCoroutine(int iNum) : _num(iNum) -, _sObj(sObj) { - _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000"); - _comm.stringToProxy(_sObj, _prx); + // _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000"); + _prx = _comm.stringToProxy("Test.BServer.BServantObj@tcp -h 127.0.0.1 -p 9100"); + // _comm.stringToProxy(_sObj, _prx); } void TestCoroutine::handle() @@ -232,8 +103,8 @@ void TestCoroutine::handle() coroWhenAll(sharedPtr); - cout << "ret1:" << cb1->_sOut << "|ret2:" << cb2->_sOut << endl; - ; + // cout << "ret1:" << cb1->_sOut << "|ret2:" << cb2->_sOut << endl; + if(cb1->_iRet == 0 && cb2->_iRet == 0 && cb1->_iException == 0 && cb2->_iException == 0) { ++sum; @@ -254,17 +125,15 @@ void TestCoroutine::handle() int main(int argc,char ** argv) { - if(argc != 3) + if(argc != 2) { - cout << "usage: " << argv[0] << " CallTimes sObj" << endl; + cout << "usage: " << argv[0] << " CallTimes " << endl; return -1; } tars::Int32 iNum = TC_Common::strto(string(argv[1])); - string sObj = string(argv[2]); - - TestCoroutine testCoro(iNum, sObj); + TestCoroutine testCoro(iNum); testCoro.setCoroInfo(10, 128, 128*1024); diff --git a/examples/CoroutineDemo/testParallelCoro/makefile b/examples/CoroutineDemo/testParallelCoro/makefile deleted file mode 100644 index 432ad49..0000000 --- a/examples/CoroutineDemo/testParallelCoro/makefile +++ /dev/null @@ -1,12 +0,0 @@ - -#----------------------------------------------------------------------- -APP := Test -TARGET := testCoro -CONFIG := -STRIP_FLAG:= N - -INCLUDE += - -#----------------------------------------------------------------------- -include /usr/local/tars/cpp/makefile/makefile.tars -#----------------------------------------------------------------------- diff --git a/examples/HttpDemo/HttpClient/CMakeLists.txt b/examples/HttpDemo/HttpClient/CMakeLists.txt index 2aae31b..37cf276 100644 --- a/examples/HttpDemo/HttpClient/CMakeLists.txt +++ b/examples/HttpDemo/HttpClient/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("HttpClient") \ No newline at end of file +build_tars_server("HttpClient" "HttpServer") \ No newline at end of file diff --git a/examples/HttpDemo/HttpServer/CMakeLists.txt b/examples/HttpDemo/HttpServer/CMakeLists.txt index 0e21ae5..8e915cb 100644 --- a/examples/HttpDemo/HttpServer/CMakeLists.txt +++ b/examples/HttpDemo/HttpServer/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("HttpServer") \ No newline at end of file +build_tars_server("HttpServer" "") \ No newline at end of file diff --git a/examples/PushDemo/PushClient/CMakeLists.txt b/examples/PushDemo/PushClient/CMakeLists.txt index ab9804c..2833332 100644 --- a/examples/PushDemo/PushClient/CMakeLists.txt +++ b/examples/PushDemo/PushClient/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("PushClient") \ No newline at end of file +build_tars_server("PushClient" "PushServer") \ No newline at end of file diff --git a/examples/PushDemo/PushClient/Makefile b/examples/PushDemo/PushClient/Makefile deleted file mode 100644 index 8358cf4..0000000 --- a/examples/PushDemo/PushClient/Makefile +++ /dev/null @@ -1,13 +0,0 @@ -APP := Test -TARGET := TestPushClient -MFLAGS := -CONFIG := -STRIP_FLAG := N -TARS2CPP_FLAG:= - -INCLUDE += -LIB += - -#----------------------------------------------------------------------- -include /usr/local/tars/cpp/makefile/makefile.tars -#----------------------------------------------------------------------- diff --git a/examples/PushDemo/PushServer/CMakeLists.txt b/examples/PushDemo/PushServer/CMakeLists.txt index 5607f98..35fb9b9 100644 --- a/examples/PushDemo/PushServer/CMakeLists.txt +++ b/examples/PushDemo/PushServer/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("PushServer") \ No newline at end of file +build_tars_server("PushServer" "") \ No newline at end of file diff --git a/examples/PushDemo/PushServer/makefile b/examples/PushDemo/PushServer/makefile deleted file mode 100644 index 7464e88..0000000 --- a/examples/PushDemo/PushServer/makefile +++ /dev/null @@ -1,17 +0,0 @@ - -#----------------------------------------------------------------------- - -APP := Test -TARGET := TestPushServer -CONFIG := -STRIP_FLAG := N -TARS2CPP_FLAG:= - -INCLUDE += -LIB += - -#----------------------------------------------------------------------- - -include /usr/local/tars/cpp/makefile/makefile.tars - -#----------------------------------------------------------------------- diff --git a/examples/QuickStartDemo/HelloServer/Client/CMakeLists.txt b/examples/QuickStartDemo/HelloServer/Client/CMakeLists.txt index 13d9f3f..0162eb8 100644 --- a/examples/QuickStartDemo/HelloServer/Client/CMakeLists.txt +++ b/examples/QuickStartDemo/HelloServer/Client/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("QuickStartDemoClient") \ No newline at end of file +build_tars_server("QuickStartDemoClient" "QuickStartDemo") \ No newline at end of file diff --git a/examples/QuickStartDemo/HelloServer/Client/main.cpp b/examples/QuickStartDemo/HelloServer/Client/main.cpp index c30a630..bfeb4dd 100644 --- a/examples/QuickStartDemo/HelloServer/Client/main.cpp +++ b/examples/QuickStartDemo/HelloServer/Client/main.cpp @@ -210,25 +210,6 @@ struct TupCallback : public ServantProxyCallback return 0; } - // virtual int onDispatch(ReqMessagePtr msg) - // { - // callback_count++; - - // TarsUniPacket<> rsp; - - // rsp.decode(package.sBuffer.data(), package.sBuffer.size()); - - // int ret = rsp.get(""); - // string sRsp = rsp.get("sRsp"); - // if(cur == count-1) - // { - // int64_t cost = TC_Common::now2us() - start; - // cout << "TupCallback count:" << count << ", " << cost << " us, avg:" << 1.*cost/count << "us" << endl; - // } - - // return 0; - // } - int64_t start; int cur; int count; diff --git a/examples/QuickStartDemo/HelloServer/Server/CMakeLists.txt b/examples/QuickStartDemo/HelloServer/Server/CMakeLists.txt index eec8189..e44bc9a 100644 --- a/examples/QuickStartDemo/HelloServer/Server/CMakeLists.txt +++ b/examples/QuickStartDemo/HelloServer/Server/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("QuickStartDemo") +build_tars_server("QuickStartDemo" "") diff --git a/examples/QuickStartDemo/HelloServer/Server/Hello.h b/examples/QuickStartDemo/HelloServer/Server/Hello.h index 5c5a31d..cc7bba9 100644 --- a/examples/QuickStartDemo/HelloServer/Server/Hello.h +++ b/examples/QuickStartDemo/HelloServer/Server/Hello.h @@ -156,36 +156,6 @@ namespace TestApp "testHello" }; - pair r = equal_range(__Hello_all, __Hello_all+2, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __Hello_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testHello_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __Hello_all[]= - { - "test", - "testHello" - }; - pair r = equal_range(__Hello_all, __Hello_all+2, request.sFuncName); if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; switch(r.first - __Hello_all) @@ -258,6 +228,36 @@ namespace TestApp return tars::TARSSERVERNOFUNCERR; } + virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) + { + static ::std::string __Hello_all[]= + { + "test", + "testHello" + }; + + pair r = equal_range(__Hello_all, __Hello_all+2, request.sFuncName); + if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; + switch(r.first - __Hello_all) + { + case 0: + { + callback_test_exception(response.iRet); + + return response.iRet; + + } + case 1: + { + callback_testHello_exception(response.iRet); + + return response.iRet; + + } + } + return tars::TARSSERVERNOFUNCERR; + } + protected: map _mRspContext; }; diff --git a/examples/QuickStartDemo/ProxyServer/Client/CMakeLists.txt b/examples/QuickStartDemo/ProxyServer/Client/CMakeLists.txt index 2c609ca..1a9a5a0 100644 --- a/examples/QuickStartDemo/ProxyServer/Client/CMakeLists.txt +++ b/examples/QuickStartDemo/ProxyServer/Client/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("ProxyServerClient") \ No newline at end of file +build_tars_server("ProxyServerClient" "ProxyServer") \ No newline at end of file diff --git a/examples/QuickStartDemo/ProxyServer/Client/main.cpp b/examples/QuickStartDemo/ProxyServer/Client/main.cpp index b204c49..87ba4c8 100644 --- a/examples/QuickStartDemo/ProxyServer/Client/main.cpp +++ b/examples/QuickStartDemo/ProxyServer/Client/main.cpp @@ -29,16 +29,23 @@ int main(int argc,char ** argv) try { ProxyPrx prx; - comm.stringToProxy("TestApp.ProxyServer.ProxyObj@tcp -h 10.208.139.242 -p 10007" , prx); + comm.stringToProxy("TestApp.ProxyServer.ProxyObj@tcp -h 127.0.0.1 -p 9200" , prx); try { - string sReq("hello"); - string sRsp(""); + int i = 1000; + while(i-- >= 0) + { + string sReq("hello"); + string sRsp(""); - int iRet = prx->testProxy(sReq, sRsp); + int iRet = prx->testProxy(sReq, sRsp); - cout<<"iRet:"< r = equal_range(__Proxy_all, __Proxy_all+2, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __Proxy_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testProxy_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __Proxy_all[]= - { - "test", - "testProxy" - }; - pair r = equal_range(__Proxy_all, __Proxy_all+2, request.sFuncName); if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; switch(r.first - __Proxy_all) @@ -258,6 +228,36 @@ namespace TestApp return tars::TARSSERVERNOFUNCERR; } + virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) + { + static ::std::string __Proxy_all[]= + { + "test", + "testProxy" + }; + + pair r = equal_range(__Proxy_all, __Proxy_all+2, request.sFuncName); + if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; + switch(r.first - __Proxy_all) + { + case 0: + { + callback_test_exception(response.iRet); + + return response.iRet; + + } + case 1: + { + callback_testProxy_exception(response.iRet); + + return response.iRet; + + } + } + return tars::TARSSERVERNOFUNCERR; + } + protected: map _mRspContext; }; diff --git a/examples/QuickStartDemo/ProxyServer/Server/ProxyImp.cpp b/examples/QuickStartDemo/ProxyServer/Server/ProxyImp.cpp index 5cc0fc4..62cd452 100644 --- a/examples/QuickStartDemo/ProxyServer/Server/ProxyImp.cpp +++ b/examples/QuickStartDemo/ProxyServer/Server/ProxyImp.cpp @@ -50,7 +50,7 @@ void ProxyImp::initialize() //initialize servant here: //... - _prx = Application::getCommunicator()->stringToProxy("TestApp.HelloServer.HelloObj"); + _prx = Application::getCommunicator()->stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 8999"); } ////////////////////////////////////////////////////// diff --git a/examples/QuickStartDemo/ProxyServer/Server/makefile b/examples/QuickStartDemo/ProxyServer/Server/makefile deleted file mode 100644 index 7138201..0000000 --- a/examples/QuickStartDemo/ProxyServer/Server/makefile +++ /dev/null @@ -1,17 +0,0 @@ - -#----------------------------------------------------------------------- - -APP := TestApp -TARGET := ProxyServer -CONFIG := -STRIP_FLAG:= N -TARS2CPP_FLAG:= - -INCLUDE += -LIB += - -#----------------------------------------------------------------------- -include /home/tarsproto/TestApp/HelloServer/HelloServer.mk -include /usr/local/tars/cpp/makefile/makefile.tars - -#----------------------------------------------------------------------- diff --git a/examples/StressDemo/TarsStressClient/CMakeLists.txt b/examples/StressDemo/TarsStressClient/CMakeLists.txt index 3874e8c..ae9ebb0 100644 --- a/examples/StressDemo/TarsStressClient/CMakeLists.txt +++ b/examples/StressDemo/TarsStressClient/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("TarsStressClient") \ No newline at end of file +build_tars_server("TarsStressClient" "TarsStressServer") \ No newline at end of file diff --git a/examples/StressDemo/TarsStressServer/CMakeLists.txt b/examples/StressDemo/TarsStressServer/CMakeLists.txt index a5ff52a..8481df3 100644 --- a/examples/StressDemo/TarsStressServer/CMakeLists.txt +++ b/examples/StressDemo/TarsStressServer/CMakeLists.txt @@ -1 +1 @@ -build_tars_server("TarsStressServer") \ No newline at end of file +build_tars_server("TarsStressServer" "") \ No newline at end of file diff --git a/examples/StressDemo/TarsStressServer/Stress.h b/examples/StressDemo/TarsStressServer/Stress.h index 837b68f..42958d4 100644 --- a/examples/StressDemo/TarsStressServer/Stress.h +++ b/examples/StressDemo/TarsStressServer/Stress.h @@ -156,36 +156,6 @@ namespace Test "testStr" }; - pair r = equal_range(__Stress_all, __Stress_all+2, request.sFuncName); - if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; - switch(r.first - __Stress_all) - { - case 0: - { - callback_test_exception(response.iRet); - - return response.iRet; - - } - case 1: - { - callback_testStr_exception(response.iRet); - - return response.iRet; - - } - } - return tars::TARSSERVERNOFUNCERR; - } - - virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) - { - static ::std::string __Stress_all[]= - { - "test", - "testStr" - }; - pair r = equal_range(__Stress_all, __Stress_all+2, request.sFuncName); if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; switch(r.first - __Stress_all) @@ -258,6 +228,36 @@ namespace Test return tars::TARSSERVERNOFUNCERR; } + virtual int onDispatchException(const tars::RequestPacket &request, const tars::ResponsePacket &response) + { + static ::std::string __Stress_all[]= + { + "test", + "testStr" + }; + + pair r = equal_range(__Stress_all, __Stress_all+2, request.sFuncName); + if(r.first == r.second) return tars::TARSSERVERNOFUNCERR; + switch(r.first - __Stress_all) + { + case 0: + { + callback_test_exception(response.iRet); + + return response.iRet; + + } + case 1: + { + callback_testStr_exception(response.iRet); + + return response.iRet; + + } + } + return tars::TARSSERVERNOFUNCERR; + } + protected: map _mRspContext; }; diff --git a/examples/scripts/run-http.sh b/examples/scripts/run-http.sh index f4583ad..bb89c4d 100644 --- a/examples/scripts/run-http.sh +++ b/examples/scripts/run-http.sh @@ -18,4 +18,6 @@ echo "client: ${WORKDIR}/../bin/HttpClient" ${WORKDIR}/../bin/HttpClient 2 10000 +killall -9 HttpServer + diff --git a/examples/scripts/run-quick-start.sh b/examples/scripts/run-quick-start.sh index d2697a3..9f132b5 100644 --- a/examples/scripts/run-quick-start.sh +++ b/examples/scripts/run-quick-start.sh @@ -22,4 +22,11 @@ ${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=async --thread=2 -- ${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=synctup --thread=2 --buffersize=100 --netthread=2 -${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=asynctup --thread=2 --buffersize=100 --netthread=2 \ No newline at end of file +${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=asynctup --thread=2 --buffersize=100 --netthread=2 + +${WORKDIR}/../bin/ProxyServer --config=${WORKDIR}/../../examples/QuickStartDemo/ProxyServer/Server/config.conf & +${WORKDIR}/../bin/ProxyServerClient + + +killall -9 QuickStartDemo +killall -9 ProxyServer \ No newline at end of file diff --git a/servant/libservant/Application.cpp b/servant/libservant/Application.cpp index 61b8595..86bc82e 100644 --- a/servant/libservant/Application.cpp +++ b/servant/libservant/Application.cpp @@ -232,8 +232,6 @@ void Application::waitForShutdown() _epollServer->waitForShutdown(); - // waitForQuit(); - destroyApp(); TarsRemoteNotify::getInstance()->report("stop", true); @@ -241,10 +239,6 @@ void Application::waitForShutdown() void Application::terminate() { - // if(_epollServer) - // { - // _epollServer->terminate(); - // } if (_epollServer && !_epollServer->isTerminate()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); //稍微休息一下, 让当前处理包能够回复 diff --git a/servant/libservant/AsyncProcThread.cpp b/servant/libservant/AsyncProcThread.cpp index a2f28bc..376402e 100644 --- a/servant/libservant/AsyncProcThread.cpp +++ b/servant/libservant/AsyncProcThread.cpp @@ -25,7 +25,6 @@ namespace tars AsyncProcThread::AsyncProcThread(size_t iQueueCap, bool merge) : _terminate(false), _iQueueCap(iQueueCap), _merge(merge) { - // _msgQueue = new ReqInfoQueue(iQueueCap); _msgQueue = new TC_CasQueue(); if(!_merge) @@ -48,7 +47,7 @@ AsyncProcThread::~AsyncProcThread() void AsyncProcThread::terminate() { if(!_merge) { - Lock lock(*this); + TC_ThreadLock::Lock lock(*this); _terminate = true; @@ -85,19 +84,15 @@ void AsyncProcThread::run() { ReqMessage * msg; - //异步请求回来的响应包处理 - if(_msgQueue->empty()) - { - TC_ThreadLock::Lock lock(*this); - if(_msgQueue->empty()) { - timedWait(1000); - } - } - if (_msgQueue->pop_front(msg)) { callback(msg); } + else + { + TC_ThreadLock::Lock lock(*this); + timedWait(1000); + } } } diff --git a/servant/libservant/Communicator.cpp b/servant/libservant/Communicator.cpp index 9b3cb68..5bf4915 100644 --- a/servant/libservant/Communicator.cpp +++ b/servant/libservant/Communicator.cpp @@ -271,6 +271,33 @@ void Communicator::initialize() _clientThreadNum = MAX_CLIENT_THREAD_NUM; } + //异步线程数 + _asyncThreadNum = TC_Common::strto(getProperty("asyncthread", "3")); + + if(_asyncThreadNum == 0) + { + _asyncThreadNum = 3; + } + + if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM) + { + _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM; + } + + bool merge = TC_Common::strto(getProperty("mergenetasync", "0")); + + //异步队列的大小 + size_t iAsyncQueueCap = TC_Common::strto(getProperty("asyncqueuecap", "10-000")); + if(iAsyncQueueCap < 10000) + { + iAsyncQueueCap = 10000; + } + + //第一个通信器才去启动回调线程 + for (size_t i = 0; i < _asyncThreadNum; ++i) { + _asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge)); + } + //stat总是有对象, 保证getStat返回的对象总是有效 _statReport = new StatReport(_clientThreadNum); @@ -280,13 +307,18 @@ void Communicator::initialize() _communicatorEpoll[i]->start(); } + //异步队列数目上报 + string moduleName = getProperty("modulename", ""); + if(!moduleName.empty()) + { + _reportAsyncQueue= getStatReport()->createPropertyReport(moduleName + ".asyncqueue", PropertyReport::avg()); + } + //初始化统计上报接口 string statObj = getProperty("stat", ""); string propertyObj = getProperty("property", ""); - string moduleName = getProperty("modulename", ""); - int iReportInterval = TC_Common::strto(getProperty("report-interval", "60000")); int iReportTimeout = TC_Common::strto(getProperty("report-timeout", "5000")); @@ -333,11 +365,40 @@ vector Communicator::getEndpoint4All(const string & objName) return pServantProxy->getEndpoint4All(); } +void Communicator::doStat() +{ + if(_reportAsyncQueue) + { + size_t n = 0; + for(size_t i = 0;i < _asyncThreadNum; ++i) + { + n = n + _asyncThread[i]->getSize(); + } + _reportAsyncQueue->report(n); + } + +} + +void Communicator::pushAsyncThreadQueue(ReqMessage * msg) +{ + //先不考虑每个线程队列数目不一致的情况 + _asyncThread[_asyncSeq]->push_back(msg); + _asyncSeq ++; + + if(_asyncSeq == _asyncThreadNum) + { + _asyncSeq = 0; + } +} + void Communicator::terminate() { { TC_LockT lock(*this); + if (_terminating) + return; + _terminating = true; if(_initialized) @@ -351,6 +412,22 @@ void Communicator::terminate() { _statReport->terminate(); } + + for(size_t i = 0;i < _asyncThreadNum; ++i) + { + if(_asyncThread[i]) + { + if (_asyncThread[i]->isAlive()) + { + _asyncThread[i]->terminate(); + _asyncThread[i]->getThreadControl().join(); + } + + delete _asyncThread[i]; + _asyncThread[i] = NULL; + } + } + _asyncThread.clear(); } } diff --git a/servant/libservant/CommunicatorEpoll.cpp b/servant/libservant/CommunicatorEpoll.cpp index 7ae0983..3c754f6 100755 --- a/servant/libservant/CommunicatorEpoll.cpp +++ b/servant/libservant/CommunicatorEpoll.cpp @@ -24,28 +24,19 @@ using namespace std; namespace tars { -vector CommunicatorEpoll::_asyncThread; -PropertyReport * CommunicatorEpoll::_reportAsyncQueue = NULL; - CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq) : _communicator(pCommunicator) , _terminate(false) , _nextTime(0) , _nextStatTime(0) , _objectProxyFactory(NULL) -, _asyncThreadNum(3) -, _asyncSeq(0) +// , _asyncThreadNum(3) +// , _asyncSeq(0) , _netThreadSeq(netThreadSeq) // , _reportAsyncQueue(NULL) , _noSendQueueLimit(1000) -// , _waitTimeout(100) , _timeoutCheckInterval(100) { - // _ep.create(1024); - - // _shutdown.createSocket(); - // _ep.add(_shutdown.getfd(), 0, EPOLLIN); - _ep.create(1024); _terminateFDInfo.notify.init(&_ep); @@ -55,19 +46,6 @@ CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThre //ObjectProxyFactory 对象 _objectProxyFactory = new ObjectProxyFactory(this); - //异步线程数 - _asyncThreadNum = TC_Common::strto(pCommunicator->getProperty("asyncthread", "3")); - - if(_asyncThreadNum == 0) - { - _asyncThreadNum = 3; - } - - if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM) - { - _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM; - } - //节点队列未发送请求的大小限制 _noSendQueueLimit = TC_Common::strto(pCommunicator->getProperty("nosendqueuelimit", "100000")); if(_noSendQueueLimit < 1000) @@ -75,13 +53,6 @@ CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThre _noSendQueueLimit = 1000; } - //异步队列的大小 - size_t iAsyncQueueCap = TC_Common::strto(pCommunicator->getProperty("asyncqueuecap", "10-000")); - if(iAsyncQueueCap < 10000) - { - iAsyncQueueCap = 10000; - } - // //epollwait的超时时间 // _waitTimeout = TC_Common::strto(pCommunicator->getProperty("epollwaittimeout", "100")); // if(_waitTimeout < 1) @@ -96,69 +67,59 @@ CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThre _timeoutCheckInterval = 1; } - bool merge = TC_Common::strto(pCommunicator->getProperty("mergenetasync", "0")); - for(size_t i = 0;i < MAX_CLIENT_NOTIFYEVENT_NUM;++i) { _notify[i] = NULL; } - if(isFirstNetThread()) { - //第一个通信器才去启动回调线程 - for (size_t i = 0; i < _asyncThreadNum; ++i) { - _asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge)); - } + // if(isFirstNetThread()) { - //异步队列数目上报 - string moduleName = pCommunicator->getProperty("modulename", ""); - if(!moduleName.empty()) - { - PropertyReportPtr asyncQueuePtr = pCommunicator->getStatReport()->createPropertyReport(moduleName + ".asyncqueue", PropertyReport::avg()); - _reportAsyncQueue = asyncQueuePtr.get(); - } - } + // //异步线程数 + // _asyncThreadNum = TC_Common::strto(pCommunicator->getProperty("asyncthread", "3")); + // if(_asyncThreadNum == 0) + // { + // _asyncThreadNum = 3; + // } - // //创建异步线程 - // for(size_t i = 0; i < _asyncThreadNum; ++i) - // { - // _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap); - // _asyncThread[i]->start(); + // if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM) + // { + // _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM; + // } + + // //第一个通信器才去启动回调线程 + // for (size_t i = 0; i < _asyncThreadNum; ++i) { + // _asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge)); + // } + + // //异步队列数目上报 + // string moduleName = pCommunicator->getProperty("modulename", ""); + // if(!moduleName.empty()) + // { + // PropertyReportPtr asyncQueuePtr = pCommunicator->getStatReport()->createPropertyReport(moduleName + ".asyncqueue", PropertyReport::avg()); + // _reportAsyncQueue = asyncQueuePtr.get(); + // } // } - - // //初始化请求的事件通知 - // for(size_t i = 0; i < MAX_CLIENT_NOTIFYEVENT_NUM; ++i) - // { - // _notify[i].bValid = false; - // } - - // //异步队列数目上报 - // string moduleName = pCommunicator->getProperty("modulename", ""); - // if(!moduleName.empty()) - // { - // PropertyReportPtr asyncQueuePtr = pCommunicator->getStatReport()->createPropertyReport(moduleName + ".asyncqueue"+TC_Common::tostr(netThreadSeq), PropertyReport::avg()); - // _reportAsyncQueue = asyncQueuePtr.get(); - // } } CommunicatorEpoll::~CommunicatorEpoll() { - if(isFirstNetThread()) { - for(size_t i = 0;i < _asyncThreadNum; ++i) - { - if(_asyncThread[i]) - { - if (_asyncThread[i]->isAlive()) - { - _asyncThread[i]->terminate(); - _asyncThread[i]->getThreadControl().join(); - } + // if(isFirstNetThread()) { + // for(size_t i = 0;i < _asyncThreadNum; ++i) + // { + // if(_asyncThread[i]) + // { + // if (_asyncThread[i]->isAlive()) + // { + // _asyncThread[i]->terminate(); + // _asyncThread[i]->getThreadControl().join(); + // } - delete _asyncThread[i]; - _asyncThread[i] = NULL; - } - } - } + // delete _asyncThread[i]; + // _asyncThread[i] = NULL; + // } + // } + // } for(size_t i = 0;i < MAX_CLIENT_NOTIFYEVENT_NUM;++i) { @@ -181,7 +142,6 @@ void CommunicatorEpoll::terminate() _terminate = true; //通知epoll响应 _terminateFDInfo.notify.notify(); - // _ep.mod(_shutdown.getfd(), 0, EPOLLOUT); } ObjectProxy * CommunicatorEpoll::getObjectProxy(const string & sObjectProxyName,const string& setName) @@ -219,22 +179,6 @@ void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue) } _notify[iSeq]->notify.notify(); - // if(_notify[iSeq].bValid) - // { - // _ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN); - // assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue); - // } - // else - // { - // _notify[iSeq].stFDInfo.iType = FDInfo::ET_C_NOTIFY; - // _notify[iSeq].stFDInfo.p = (void*)msgQueue; - // _notify[iSeq].stFDInfo.fd = _notify[iSeq].eventFd; - // _notify[iSeq].stFDInfo.iSeq = iSeq; - // _notify[iSeq].notify.createSocket(); - // _notify[iSeq].bValid = true; - - // _ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN); - // } } void CommunicatorEpoll::notifyDel(size_t iSeq) @@ -244,11 +188,6 @@ void CommunicatorEpoll::notifyDel(size_t iSeq) { _notify[iSeq]->notify.notify(); } - - // if(_notify[iSeq].bValid && NULL != _notify[iSeq].stFDInfo.p) - // { - // _ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN); - // } } @@ -275,16 +214,6 @@ void CommunicatorEpoll::handleInputImp(Transceiver * pTransceiver) } pTransceiver->doResponse(); - - // list done; - // if(pTransceiver->doResponse(done) > 0) - // { - // list::iterator it = done.begin(); - // for (; it != done.end(); ++it) - // { - // pTransceiver->getAdapterProxy()->finishInvoke(*it); - // } - // } } void CommunicatorEpoll::handleOutputImp(Transceiver * pTransceiver) @@ -394,7 +323,6 @@ void CommunicatorEpoll::handle(FDInfo * pFDInfo, const epoll_event &ev) Transceiver *pTransceiver = (Transceiver*)pFDInfo->p; //先收包 - // if (events & EPOLLIN) if(TC_Epoller::readEvent(ev)) { try @@ -412,7 +340,6 @@ void CommunicatorEpoll::handle(FDInfo * pFDInfo, const epoll_event &ev) } //发包 - // if (events & EPOLLOUT) if(TC_Epoller::writeEvent(ev)) { try @@ -430,7 +357,6 @@ void CommunicatorEpoll::handle(FDInfo * pFDInfo, const epoll_event &ev) } //连接出错 直接关闭连接 - // if(events & EPOLLERR) if(TC_Epoller::errorEvent(ev)) { try @@ -471,11 +397,6 @@ void CommunicatorEpoll::doTimeout() for(size_t i = 0; i < _objectProxyFactory->getObjNum(); ++i) { - // const vector & vAdapterProxy=_objectProxyFactory->getObjectProxy(i)->getAdapters(); - // for(size_t iAdapter=0;iAdapterdoTimeout(); - // } _objectProxyFactory->getObjectProxy(i)->doTimeout(); } } @@ -490,16 +411,18 @@ void CommunicatorEpoll::doStat() _nextStatTime = iNow + 10; if(isFirstNetThread()) { - //异步队列长度上报 - if(_reportAsyncQueue) - { - size_t n = 0; - for(size_t i = 0;i < _asyncThreadNum; ++i) - { - n = n + _asyncThread[i]->getSize(); - } - _reportAsyncQueue->report(n); - } + + _communicator->doStat(); + // //异步队列长度上报 + // if(_reportAsyncQueue) + // { + // size_t n = 0; + // for(size_t i = 0;i < _asyncThreadNum; ++i) + // { + // n = n + _asyncThread[i]->getSize(); + // } + // _reportAsyncQueue->report(n); + // } } StatReport::MapStatMicMsg mStatMicMsg; @@ -507,12 +430,6 @@ void CommunicatorEpoll::doStat() for(size_t i = 0;i < _objectProxyFactory->getObjNum(); ++i) { _objectProxyFactory->getObjectProxy(i)->mergeStat(mStatMicMsg); - - // const vector & vAdapterProxy = _objectProxyFactory->getObjectProxy(i)->getAdapters(); - // for(size_t iAdapter = 0;iAdapter < vAdapterProxy.size(); ++iAdapter) - // { - // vAdapterProxy[iAdapter]->doStat(mStatMicMsg); - // } } //有数据才上报 @@ -525,14 +442,15 @@ void CommunicatorEpoll::doStat() void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg) { - //先不考虑每个线程队列数目不一致的情况 - _asyncThread[_asyncSeq]->push_back(msg); - _asyncSeq ++; + _communicator->pushAsyncThreadQueue(msg); + // //先不考虑每个线程队列数目不一致的情况 + // _asyncThread[_asyncSeq]->push_back(msg); + // _asyncSeq ++; - if(_asyncSeq == _asyncThreadNum) - { - _asyncSeq = 0; - } + // if(_asyncSeq == _asyncThreadNum) + // { + // _asyncSeq = 0; + // } } void CommunicatorEpoll::run() @@ -562,7 +480,6 @@ void CommunicatorEpoll::run() const epoll_event& ev = _ep.get(i); uint64_t data = TC_Epoller::getU64(ev); - // uint64_t data = ev.data.u64; if(data == 0) { diff --git a/servant/libservant/CoroutineScheduler.cpp b/servant/libservant/CoroutineScheduler.cpp index 0f8b510..f84c90d 100644 --- a/servant/libservant/CoroutineScheduler.cpp +++ b/servant/libservant/CoroutineScheduler.cpp @@ -39,99 +39,6 @@ namespace tars { -// //////////////////////////////////////////////////////// -// std::size_t pagesize() -// { -// static std::size_t size = ::sysconf( _SC_PAGESIZE); -// return size; -// } - -// rlimit stacksize_limit_() -// { -// rlimit limit; - -// const int result = ::getrlimit( RLIMIT_STACK, & limit); -// assert( 0 == result); - -// return limit; -// } - -// rlimit stacksize_limit() -// { -// static rlimit limit = stacksize_limit_(); -// return limit; -// } - -// std::size_t page_count( std::size_t stacksize) -// { -// return static_cast< std::size_t >( std::ceil(static_cast< float >(stacksize) / pagesize() ) ); -// } - -// bool standard_stack_allocator::is_stack_unbound() -// { -// return RLIM_INFINITY == stacksize_limit().rlim_max; -// } - -// std::size_t standard_stack_allocator::default_stacksize() -// { -// std::size_t size = 8 * minimum_stacksize(); -// if ( is_stack_unbound() ) return size; - -// assert( maximum_stacksize() >= minimum_stacksize() ); -// return maximum_stacksize() == size ? size : (std::min)( size, maximum_stacksize() ); -// } - -// std::size_t standard_stack_allocator::minimum_stacksize() -// { -// return 8 * 1024 + sizeof(fcontext_t) + 15; -// } - -// std::size_t standard_stack_allocator::maximum_stacksize() -// { -// assert( ! is_stack_unbound() ); -// return static_cast< std::size_t >( stacksize_limit().rlim_max); -// } - -// int standard_stack_allocator::allocate( stack_context & ctx, std::size_t size) -// { -// assert( minimum_stacksize() <= size); -// assert( is_stack_unbound() || ( maximum_stacksize() >= size) ); - -// const std::size_t pages( page_count( size) + 1); -// const std::size_t size_( pages * pagesize() ); -// assert( 0 < size && 0 < size_); - -// void * limit = ::mmap( 0, size_, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); - -// if (limit == (void *) -1) -// { -// TLOGERROR("[TARS][[standard_stack_allocator::allocate memory failed]" << endl); -// return -1; -// } - -// std::memset( limit, '\0', size_); - -// const int result( ::mprotect( limit, pagesize(), PROT_NONE) ); -// assert( 0 == result); - -// ctx.size = size_; -// ctx.sp = static_cast< char * >( limit) + ctx.size; - -// return 0; -// } - -// void standard_stack_allocator::deallocate( stack_context & ctx) -// { -// assert( ctx.sp); -// assert( minimum_stacksize() <= ctx.size); -// assert( is_stack_unbound() || ( maximum_stacksize() >= ctx.size) ); - -// void * limit = static_cast< char * >( ctx.sp) - ctx.size; - -// ::munmap( limit, ctx.size); -// } - - #if TARGET_PLATFORM_WINDOWS // x86_64 diff --git a/servant/servant/AsyncProcThread.h b/servant/servant/AsyncProcThread.h index 46a15eb..cf7dc0a 100644 --- a/servant/servant/AsyncProcThread.h +++ b/servant/servant/AsyncProcThread.h @@ -27,7 +27,7 @@ namespace tars /** * 异步回调后的处理线程 */ -class AsyncProcThread : public TC_Thread, public TC_HandleBase, public TC_ThreadLock +class AsyncProcThread : public TC_Thread, public TC_ThreadLock { public: /** @@ -75,7 +75,6 @@ private: * 异步队列 */ TC_CasQueue * _msgQueue; - // ReqInfoQueue * _msgQueue; /** * 队列流量控制 diff --git a/servant/servant/Communicator.h b/servant/servant/Communicator.h index 394a818..01710e1 100644 --- a/servant/servant/Communicator.h +++ b/servant/servant/Communicator.h @@ -246,6 +246,18 @@ protected: */ ServantProxy * getServantProxy(const string& objectName,const string& setName=""); + /** + * 数据加入到异步线程队列里面 + * @return + */ + void pushAsyncThreadQueue(ReqMessage * msg); + + /** + * 上报统计事件 + * @return + */ + void doStat(); + /** * 框架内部需要直接访问通信器的类 */ @@ -309,6 +321,26 @@ protected: */ int64_t _minTimeout; + /* + * 异步线程数组 + */ + //异步线程(跨通信器共享) + vector _asyncThread;//[MAX_THREAD_NUM]; + + /* + * 异步队列的统计上报的对象 + */ + PropertyReportPtr _reportAsyncQueue; + + /* + * 异步线程数目 + */ + size_t _asyncThreadNum; + /* + * 分发给异步线程的索引seq + */ + size_t _asyncSeq; + #ifdef _USE_OPENTRACKING public: struct TraceManager:public TC_HandleBase{ diff --git a/servant/servant/CommunicatorEpoll.h b/servant/servant/CommunicatorEpoll.h index 2df9991..c8ac81f 100644 --- a/servant/servant/CommunicatorEpoll.h +++ b/servant/servant/CommunicatorEpoll.h @@ -82,34 +82,6 @@ struct FDInfo class CommunicatorEpoll : public TC_Thread ,public TC_ThreadRecMutex { public: - // struct NotifyInfo - // { - // /** - // * 构造函数 - // */ - // NotifyInfo() - // : eventFd(-1) - // , bValid(false) - // { - // } - - // /** - // * 析构函数 - // */ - // ~NotifyInfo() - // { - // } - - - // FDInfo stFDInfo; //通知FD信息 - - // TC_Socket notify; //通知fd - - // int eventFd; //eventfd,目前未使用 - - // bool bValid; //是否有效 - // }; - /** * 构造函数 */ @@ -194,14 +166,9 @@ public: void delFd(int fd,FDInfo * info, uint32_t events); void modFd(int fd,FDInfo * info, uint32_t events); - /** - * 通知事件过来 - * @param fd - */ - // void notify(size_t iSeq,ReqInfoQueue * pReqQueue); /** - * 通知删除事件过来 + * 通知事件过来 * @param iSeq */ void notify(size_t iSeq,ReqInfoQueue * pReqQueue); @@ -252,19 +219,7 @@ protected: */ Communicator * _communicator; - // bool _notifySignal[MAX_CLIENT_NOTIFYEVENT_NUM]; - FDInfo* _notify[MAX_CLIENT_NOTIFYEVENT_NUM]; - /* - * 请求事件通知数组 - */ - // NotifyInfo _notify[MAX_CLIENT_NOTIFYEVENT_NUM]; - - /* - * 关闭线程请求的事件通知 - */ - // TC_Socket _shutdown; - /* * 线程是否终止 */ @@ -292,27 +247,26 @@ protected: */ ObjectProxyFactory * _objectProxyFactory; - /* - * 异步线程数组 - */ - // AsyncProcThread * _asyncThread[MAX_CLIENT_ASYNCTHREAD_NUM]; - //异步线程(跨通信器共享) - static vector _asyncThread;//[MAX_THREAD_NUM]; + // /* + // * 异步线程数组 + // */ + // //异步线程(跨通信器共享) + // vector _asyncThread;//[MAX_THREAD_NUM]; - /* - * 异步队列的统计上报的对象 - */ - static PropertyReport * _reportAsyncQueue; + // /* + // * 异步队列的统计上报的对象 + // */ + // PropertyReport * _reportAsyncQueue; - /* - * 异步线程数目 - */ - size_t _asyncThreadNum; + // /* + // * 异步线程数目 + // */ + // size_t _asyncThreadNum; /* * 分发给异步线程的索引seq */ - size_t _asyncSeq; + // size_t _asyncSeq; /* * 网络线程的id号 @@ -323,12 +277,7 @@ protected: * 节点ip队列未发送请求的大小限制 */ size_t _noSendQueueLimit; - - /* - * epoll wait的超时时间 - */ - // int64_t _waitTimeout; - + /* * 超时的检查时间间隔 */ diff --git a/servant/servant/ServantProxy.h b/servant/servant/ServantProxy.h index cd30f8c..bfa623d 100644 --- a/servant/servant/ServantProxy.h +++ b/servant/servant/ServantProxy.h @@ -195,8 +195,7 @@ public: { TC_LockT lock(_mutex); - vRet = _vReqMessage; - _vReqMessage.clear(); + vRet.swap(_vReqMessage); } return vRet; diff --git a/tools/tars2cpp/tars2cpp.cpp b/tools/tars2cpp/tars2cpp.cpp index 9c659ef..12de890 100644 --- a/tools/tars2cpp/tars2cpp.cpp +++ b/tools/tars2cpp/tars2cpp.cpp @@ -1217,7 +1217,7 @@ string Tars2Cpp::generateDispatchExceptionAsync(const OperationPtr& pPtr, const // return s.str(); // } -string Tars2Cpp::generateDispatchCoroResponseAsync(const OperationPtr& pPtr, const string& cn) const +string Tars2Cpp::generateDispatchCoroExceptionAsync(const OperationPtr& pPtr, const string& cn) const { ostringstream s; @@ -1232,7 +1232,7 @@ string Tars2Cpp::generateDispatchCoroResponseAsync(const OperationPtr& pPtr, con } -string Tars2Cpp::generateDispatchCoroExceptionAsync(const OperationPtr& pPtr, const string& cn) const +string Tars2Cpp::generateDispatchCoroResponseAsync(const OperationPtr& pPtr, const string& cn) const { ostringstream s; diff --git a/util/src/tc_epoll_server.cpp b/util/src/tc_epoll_server.cpp index 252794e..6e38025 100644 --- a/util/src/tc_epoll_server.cpp +++ b/util/src/tc_epoll_server.cpp @@ -3075,14 +3075,14 @@ void TC_EpollServer::waitForShutdown() } int64_t iLastCheckTime = TNOWMS; - while (!_bTerminate) - { - try - { + // while (!_bTerminate) + // { + // try + // { //循环监听网路连接请求 while (!_bTerminate) { - int iEvNum = _epoller.wait(300); + int iEvNum = _epoller.wait(3000); if (_bTerminate) break; @@ -3136,18 +3136,22 @@ void TC_EpollServer::waitForShutdown() { error("run exception:" + string(ex.what())); } + catch (...) + { + error("run exception"); + } } } - } - catch (exception &ex) - { - error(string("TC_EpollServer::waitForShutdown error : ") + ex.what()); - } - catch (...) - { - error("TC_EpollServer::waitForShutdown unknown error"); - } - } + // } + // catch (exception &ex) + // { + // error(string("TC_EpollServer::waitForShutdown error : ") + ex.what()); + // } + // catch (...) + // { + // error("TC_EpollServer::waitForShutdown unknown error"); + // } + // } for (size_t i = 0; i < _netThreads.size(); ++i) {