From dded31f1226d722df3672e533519da3c8f238a94 Mon Sep 17 00:00:00 2001 From: ruanshudong Date: Sun, 2 Feb 2020 20:35:17 +0800 Subject: [PATCH] add config, run-push run-co, fix initialize bug --- cmake/BuildTarsServer.cmake | 20 +++ examples/CMakeLists.txt | 5 + examples/CoroutineDemo/AServer/config.conf | 70 +++++++++++ examples/CoroutineDemo/BServer/config.conf | 72 +++++++++++ .../PushDemo/PushClient/TestRecvThread.cpp | 115 ++++++------------ examples/PushDemo/PushClient/TestRecvThread.h | 13 +- examples/PushDemo/PushClient/main.cpp | 14 ++- .../PushServer/TestPushServantImp.cpp | 2 +- .../PushDemo/PushServer/TestPushServer.cpp | 40 +++++- .../PushDemo/PushServer/TestPushThread.cpp | 21 +--- examples/PushDemo/PushServer/TestPushThread.h | 3 +- examples/PushDemo/PushServer/config.conf | 70 +++++++++++ .../HelloServer/Client/main.cpp | 40 ------ .../ProxyServer/Client/main.cpp | 7 +- .../ProxyServer/Server/config.conf | 70 +++++++++++ examples/scripts/run-co.sh | 43 +++++++ examples/scripts/run-push.sh | 24 ++++ examples/scripts/run-quick-start.sh | 12 +- servant/libservant/AdapterProxy.cpp | 4 +- servant/libservant/Application.cpp | 3 - servant/libservant/AsyncProcThread.cpp | 2 +- servant/libservant/Communicator.cpp | 14 +-- util/src/tc_thread.cpp | 115 ------------------ 23 files changed, 488 insertions(+), 291 deletions(-) create mode 100755 examples/CoroutineDemo/AServer/config.conf create mode 100755 examples/CoroutineDemo/BServer/config.conf create mode 100755 examples/PushDemo/PushServer/config.conf create mode 100755 examples/QuickStartDemo/ProxyServer/Server/config.conf create mode 100644 examples/scripts/run-co.sh create mode 100644 examples/scripts/run-push.sh diff --git a/cmake/BuildTarsServer.cmake b/cmake/BuildTarsServer.cmake index 4486c2a..b90be6b 100644 --- a/cmake/BuildTarsServer.cmake +++ b/cmake/BuildTarsServer.cmake @@ -54,4 +54,24 @@ macro(build_tars_server MODULE DEPS) target_link_libraries(${MODULE} tarsservant tarsutil) + SET(MODULE-TGZ "${CMAKE_BINARY_DIR}/${MODULE}.tgz") + SET(RUN_DEPLOY_COMMAND_FILE "${PROJECT_BINARY_DIR}/run-deploy-${MODULE}.cmake") + FILE(WRITE ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR}/deploy/${MODULE})\n") + IF(WIN32) + message(${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${CMAKE_BUILD_TYPE}/${MODULE}.exe) + FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${CMAKE_BUILD_TYPE}/${MODULE}.exe ${PROJECT_BINARY_DIR}/deploy/${MODULE}/)\n") + FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(WORKING_DIRECTORY ${PROJECT_BINARY_DIR}/deploy/ \n COMMAND ${CMAKE_COMMAND} -E tar czfv ${MODULE-TGZ} ${MODULE})\n") + ELSE() + FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${MODULE} ${PROJECT_BINARY_DIR}/deploy/${MODULE}/)\n") + FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(WORKING_DIRECTORY ${PROJECT_BINARY_DIR}/deploy/ \n COMMAND ${CMAKE_COMMAND} -E tar czfv ${MODULE-TGZ} ${MODULE})\n") + ENDIF() + + #执行命令 + add_custom_command(OUTPUT ${MODULE-TGZ} + DEPENDS ${MODULE} + COMMAND ${CMAKE_COMMAND} -P ${RUN_DEPLOY_COMMAND_FILE} + COMMENT "call ${RUN_DEPLOY_COMMAND_FILE}") + + add_custom_target(${MODULE}-tar DEPENDS ${MODULE-TGZ}) + endmacro() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 40011fb..5d4f3e4 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -20,3 +20,8 @@ add_custom_target(run-co DEPENDS CoroutineDemoAServer CoroutineDemoBServer CoroutineDemoClient testCoro testParallelCoro COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-co.sh COMMENT "call run co") + +add_custom_target(run-push + DEPENDS PushServer PushClient + COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-push.sh + COMMENT "call run push") diff --git a/examples/CoroutineDemo/AServer/config.conf b/examples/CoroutineDemo/AServer/config.conf new file mode 100755 index 0000000..3ec2c4f --- /dev/null +++ b/examples/CoroutineDemo/AServer/config.conf @@ -0,0 +1,70 @@ + + + + #tarsregistry locator + locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890 + #max invoke timeout + max-invoke-timeout = 5000 + #refresh endpoint interval + refresh-endpoint-interval = 10000 + #stat obj + stat = tars.tarsstat.StatObj + #max send queue length limit + sendqueuelimit = 100000 + #async queue length limit + asyncqueuecap = 100000 + #async callback thread num + asyncthread = 3 + #net thread + netthread = 1 + #merge net and sync thread + mergenetasync = 0 + #module name + modulename = Test.AServer + + + + #not cout + closecout = 0 + #app name + app = Test + #server name + server = AServer + #path + basepath = ./ + datapath = ./ + #log path + logpath = ./ + #merge net and imp thread + mergenetimp = 0 + #local ip, for tarsnode +# local = tcp -h 127.0.0.1 -p 15001 -t 10000 + + #tarsnode +# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000 + #config obj +# config = tars.tarsconfig.ConfigObj + #notify obj +# notify = tars.tarsconfig.NotifyObj + #log obj +# log = tars.tarslog.LogObj + + + #ip:port:timeout + endpoint = tcp -h 127.0.0.1 -p 9000 -t 10000 + #allow ip + allow = + #max connection num + maxconns = 4096 + #imp thread num + threads = 5 + #servant + servant = Test.AServer.AServantObj + #queue capacity + queuecap = 1000000 + #tars protocol + protocol = tars + + + + diff --git a/examples/CoroutineDemo/BServer/config.conf b/examples/CoroutineDemo/BServer/config.conf new file mode 100755 index 0000000..58cdd8d --- /dev/null +++ b/examples/CoroutineDemo/BServer/config.conf @@ -0,0 +1,72 @@ + + + + #tarsregistry locator + locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890 + #max invoke timeout + max-invoke-timeout = 5000 + #refresh endpoint interval + refresh-endpoint-interval = 10000 + #stat obj + stat = tars.tarsstat.StatObj + #max send queue length limit + sendqueuelimit = 100000 + #async queue length limit + asyncqueuecap = 100000 + #async callback thread num + asyncthread = 3 + #net thread + netthread = 1 + #merge net and sync thread + mergenetasync = 0 + #module name + modulename = Test.BServer + + + + #not cout + closecout = 0 + #app name + app = Test + #server name + server = BServer + #path + basepath = ./ + datapath = ./ + #log path + logpath = ./ + #merge net and imp thread + mergenetimp = 0 + #opencoroutine + opencoroutine = 1 + #local ip, for tarsnode +# local = tcp -h 127.0.0.1 -p 15001 -t 10000 + + #tarsnode +# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000 + #config obj +# config = tars.tarsconfig.ConfigObj + #notify obj +# notify = tars.tarsconfig.NotifyObj + #log obj +# log = tars.tarslog.LogObj + + + #ip:port:timeout + endpoint = tcp -h 127.0.0.1 -p 9100 -t 10000 + #allow ip + allow = + #max connection num + maxconns = 4096 + #imp thread num + threads = 5 + #servant + servant = Test.BServer.BServantObj + #queue capacity + queuecap = 1000000 + #tars protocol + protocol = tars + + + + diff --git a/examples/PushDemo/PushClient/TestRecvThread.cpp b/examples/PushDemo/PushClient/TestRecvThread.cpp index 8b895f6..68bf2f8 100644 --- a/examples/PushDemo/PushClient/TestRecvThread.cpp +++ b/examples/PushDemo/PushClient/TestRecvThread.cpp @@ -5,7 +5,7 @@ /* 响应包解码函数,根据特定格式解码从服务端收到的数据,解析为ResponsePacket */ -static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, ResponsePacket& done) +static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp) { size_t len = sizeof(tars::Int32); @@ -26,7 +26,7 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response iHeaderLen = ntohl(iHeaderLen); //做一下保护,长度大于M - if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int)) + if (iHeaderLen > 100000 || iHeaderLen < (int)sizeof(unsigned int)) { throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen)); } @@ -37,101 +37,58 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response return TC_NetWorkBuffer::PACKET_LESS; } + in.moveHeader(sizeof(iHeaderLen)); + tars::Int32 iRequestId = 0; string sRequestId; - in.getHeader(sizeof(iHeaderLen), sRequestId); + in.getHeader(sizeof(iRequestId), sRequestId); + in.moveHeader(sizeof(iRequestId)); - ResponsePacket rsp; rsp.iRequestId = ntohl(*((unsigned int *)(sRequestId.c_str()))); - in.getHeader(iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId), rsp.sBuffer); + len = iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId); + in.getHeader(len, rsp.sBuffer); + in.moveHeader(len); return TC_NetWorkBuffer::PACKET_FULL; } /* 请求包编码函数,本函数的打包格式为 - 整个包长度(字节)+iRequestId(字节)+包内容 + 整个包长度(4字节)+iRequestId(4字节)+包内容 */ -static void pushRequest(const RequestPacket& request, shared_ptr& buff) +static void pushRequest(const RequestPacket& request, shared_ptr& sbuff) { - // unsigned int net_bufflength = htonl(request.sBuffer.size()+8); - // unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength); + unsigned int net_bufflength = htonl(request.sBuffer.size()+8); + unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength); - // buff = ""; - // for (int i = 0; i<4; ++i) - // { - // buff += *bufflengthptr++; - // } + vector buffer; + buffer.resize(request.sBuffer.size()+8); - // unsigned int netrequestId = htonl(request.iRequestId); - // unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId); + memcpy(buffer.data(), bufflengthptr, sizeof(unsigned int)); - // for (int i = 0; i<4; ++i) - // { - // buff += *netrequestIdptr++; - // } + unsigned int netrequestId = htonl(request.iRequestId); + unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId); - // string tmp; - // tmp.assign((const char*)(&request.sBuffer[0]), request.sBuffer.size()); - // buff+=tmp; + memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int)); + memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size()); - TarsOutputStream os; - - tars::Int32 iHeaderLen = 0; - tars::Int32 iRequestId = request.iRequestId; - -// 先预留8个字节长度 - os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen)); - os.writeBuf((const char *)&iRequestId, sizeof(iRequestId)); - - request.writeTo(os); - - buff->swap(os.getByteBuffer()); - - // assert(buff->length() >= 4); - - iHeaderLen = htonl((int)(buff->length())); - - memcpy((void*)buff->buffer(), (const char *)&iHeaderLen, sizeof(iHeaderLen)); + sbuff->addBuffer(buffer); } static void printResult(int iRequestId, const string &sResponseStr) { - cout << "request id: " << iRequestId << endl; - cout << "response str: " << sResponseStr << endl; + cout << "request id: " << iRequestId << ", response str: " << sResponseStr << endl; } + static void printPushInfo(const string &sResponseStr) { cout << "push message: " << sResponseStr << endl; } -// int TestPushCallBack::onRequestException(int iRet) -// { -// return 0; -// } - -// int TestPushCallBack::onRequestResponse(const tars::RequestPacket& request, const tars::ResponsePacket& response) -// { -// string sRet; -// cout << "sBuffer: " << response.sBuffer.size() << endl; -// sRet.assign(&(response.sBuffer[0]), response.sBuffer.size()); -// printResult(response.iRequestId, sRet); -// return 0; -// } - -// int TestPushCallBack::onPushResponse(const tars::ResponsePacket& response) -// { -// string sRet; -// sRet.assign(&(response.sBuffer[0]), response.sBuffer.size()); -// printPushInfo(sRet); -// return 0; -// } - int TestPushCallBack::onDispatch(ReqMessagePtr msg) { if(msg->request.sFuncName == "printResult") { string sRet; - cout << "sBuffer: " << msg->response->sBuffer.size() << endl; sRet.assign(&(msg->response->sBuffer[0]), msg->response->sBuffer.size()); printResult(msg->request.iRequestId, sRet); return 0; @@ -150,12 +107,11 @@ int TestPushCallBack::onDispatch(ReqMessagePtr msg) return -3; } -RecvThread::RecvThread():_bTerminate(false) +RecvThread::RecvThread(int second):_second(second), _bTerminate(false) { - string sObjName = "Test.TestPushServer.TestPushServantObj"; - string sObjHost = "tcp -h 10.120.129.226 -t 60000 -p 10099"; + string sObjName = "TestApp.PushServer.TestPushServantObj@tcp -h 127.0.0.1 -t 60000 -p 9300"; - _prx = _comm.stringToProxy(sObjName+"@"+sObjHost); + _prx = _comm.stringToProxy(sObjName); ProxyProtocol prot; prot.requestFunc = pushRequest; @@ -164,15 +120,6 @@ RecvThread::RecvThread():_bTerminate(false) _prx->tars_set_protocol(prot); } -void RecvThread::terminate() -{ - _bTerminate = true; - { - tars::TC_ThreadLock::Lock sync(*this); - notifyAll(); - } -} - void RecvThread::run(void) { TestPushCallBackPtr cbPush = new TestPushCallBack(); @@ -180,6 +127,8 @@ void RecvThread::run(void) string buf("heartbeat"); + time_t n = TNOW; + while(!_bTerminate) { { @@ -198,9 +147,15 @@ void RecvThread::run(void) } } + if(TNOW - n >= _second) + { + _bTerminate = true; + break; + } + { TC_ThreadLock::Lock sync(*this); - timedWait(5000); + timedWait(500); } } } diff --git a/examples/PushDemo/PushClient/TestRecvThread.h b/examples/PushDemo/PushClient/TestRecvThread.h index 1049711..b88becd 100644 --- a/examples/PushDemo/PushClient/TestRecvThread.h +++ b/examples/PushDemo/PushClient/TestRecvThread.h @@ -9,26 +9,17 @@ public: virtual int onDispatch(ReqMessagePtr msg); }; -// class TestPushCallBack : public tars::PushCallback -// { -// public: -// virtual int onRequestException(int iRet); -// virtual int onRequestResponse(const tars::RequestPacket& request, const tars::ResponsePacket& response); -// virtual int onPushResponse(const tars::ResponsePacket& response); -// }; - - typedef tars::TC_AutoPtr TestPushCallBackPtr; class RecvThread : public TC_Thread, public TC_ThreadLock { public: - RecvThread(); + RecvThread(int second); virtual void run(); - void terminate(); private: + int _second; bool _bTerminate; Communicator _comm; diff --git a/examples/PushDemo/PushClient/main.cpp b/examples/PushDemo/PushClient/main.cpp index 613f245..4c67422 100644 --- a/examples/PushDemo/PushClient/main.cpp +++ b/examples/PushDemo/PushClient/main.cpp @@ -9,13 +9,17 @@ int main(int argc,char**argv) { try { - RecvThread thread; + int second = 5; + + if(argc > 1) + second = TC_Common::strto(argv[1]); + + if(second <=0 ) + second = 1; + + RecvThread thread(second); thread.start(); - int c; - while((c = getchar()) != 'q'); - - thread.terminate(); thread.getThreadControl().join(); } catch(std::exception&e) diff --git a/examples/PushDemo/PushServer/TestPushServantImp.cpp b/examples/PushDemo/PushServer/TestPushServantImp.cpp index 1d8dd71..cabefcd 100644 --- a/examples/PushDemo/PushServer/TestPushServantImp.cpp +++ b/examples/PushDemo/PushServer/TestPushServantImp.cpp @@ -30,7 +30,7 @@ int TestPushServantImp::doRequest(tars::TarsCurrentPtr current, vector& re LOG->debug() << "connect ip: " << current->getIp() << endl; } (PushUser::mapMutex).unlock(); - //返回给客户端它自己请求的数据包,即原包返回 + //返回给客户端它自己请求的数据包,即原包返回(4字节长度+4字节requestid+buffer) const vector& request = current->getRequestBuffer(); response = request; diff --git a/examples/PushDemo/PushServer/TestPushServer.cpp b/examples/PushDemo/PushServer/TestPushServer.cpp index 5024578..3551e25 100644 --- a/examples/PushDemo/PushServer/TestPushServer.cpp +++ b/examples/PushDemo/PushServer/TestPushServer.cpp @@ -7,6 +7,42 @@ TestPushServer g_app; ///////////////////////////////////////////////////////////////// +static TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer &in, vector &out) +{ + size_t len = sizeof(tars::Int32); + + if (in.getBufferLength() < len) + { + return TC_NetWorkBuffer::PACKET_LESS; + } + + string header; + in.getHeader(len, header); + + assert(header.size() == len); + + tars::Int32 iHeaderLen = 0; + + ::memcpy(&iHeaderLen, header.c_str(), sizeof(tars::Int32)); + + iHeaderLen = ntohl(iHeaderLen); + + if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int)) + { + throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen)); + } + + if (in.getBufferLength() < (uint32_t)iHeaderLen) + { + return TC_NetWorkBuffer::PACKET_LESS; + } + + in.getHeader(iHeaderLen, out); + in.moveHeader(iHeaderLen); + + return TC_NetWorkBuffer::PACKET_FULL; +} + void TestPushServer::initialize() { @@ -15,11 +51,11 @@ TestPushServer::initialize() addServant(ServerConfig::Application + "." + ServerConfig::ServerName + ".TestPushServantObj"); - addServantProtocol("Test.TestPushServer.TestPushServantObj", TC_NetWorkBuffer::parseBinary4<8, 1024*1024*10>); + addServantProtocol("TestApp.PushServer.TestPushServantObj", parse); pushThread.start(); - } + ///////////////////////////////////////////////////////////////// void TestPushServer::destroyApp() diff --git a/examples/PushDemo/PushServer/TestPushThread.cpp b/examples/PushDemo/PushServer/TestPushThread.cpp index 5c9edd4..28c6b1b 100644 --- a/examples/PushDemo/PushServer/TestPushThread.cpp +++ b/examples/PushDemo/PushServer/TestPushThread.cpp @@ -38,30 +38,21 @@ void PushInfoThread::setPushInfo(const string &sInfo) //定期向客户push消息 void PushInfoThread::run(void) { - time_t iNow; - setPushInfo("hello world"); while (!_bTerminate) { - iNow = TC_TimeProvider::getInstance()->getNow(); - - if(iNow - _tLastPushTime > _tInterval) + (PushUser::mapMutex).lock(); + for(map::iterator it = (PushUser::pushUser).begin(); it != (PushUser::pushUser).end(); ++it) { - _tLastPushTime = iNow; - - (PushUser::mapMutex).lock(); - for(map::iterator it = (PushUser::pushUser).begin(); it != (PushUser::pushUser).end(); ++it) - { - (it->second)->sendResponse(_sPushInfo.c_str(), _sPushInfo.size()); - LOG->debug() << "sendResponse: " << _sPushInfo.size() <second)->sendResponse(_sPushInfo.c_str(), _sPushInfo.size()); + LOG->debug() << "sendResponse: " << _sPushInfo.size() < + + + #tarsregistry locator + locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890 + #max invoke timeout + max-invoke-timeout = 5000 + #refresh endpoint interval + refresh-endpoint-interval = 10000 + #stat obj + stat = tars.tarsstat.StatObj + #max send queue length limit + sendqueuelimit = 100000 + #async queue length limit + asyncqueuecap = 100000 + #async callback thread num + asyncthread = 3 + #net thread + netthread = 1 + #merge net and sync thread + mergenetasync = 0 + #module name + modulename = TestApp.PushServer + + + + #not cout + closecout = 0 + #app name + app = TestApp + #server name + server = PushServer + #path + basepath = ./ + datapath = ./ + #log path + logpath = ./ + #merge net and imp thread + mergenetimp = 0 + #local ip, for tarsnode +# local = tcp -h 127.0.0.1 -p 15001 -t 10000 + + #tarsnode +# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000 + #config obj +# config = tars.tarsconfig.ConfigObj + #notify obj +# notify = tars.tarsconfig.NotifyObj + #log obj +# log = tars.tarslog.LogObj + + + #ip:port:timeout + endpoint = tcp -h 127.0.0.1 -p 9300 -t 10000 + #allow ip + allow = + #max connection num + maxconns = 4096 + #imp thread num + threads = 5 + #servant + servant = TestApp.PushServer.TestPushServantObj + #queue capacity + queuecap = 1000000 + #tars protocol + protocol = not_tars + + + + diff --git a/examples/QuickStartDemo/HelloServer/Client/main.cpp b/examples/QuickStartDemo/HelloServer/Client/main.cpp index bfeb4dd..8b37dde 100644 --- a/examples/QuickStartDemo/HelloServer/Client/main.cpp +++ b/examples/QuickStartDemo/HelloServer/Client/main.cpp @@ -359,43 +359,3 @@ int main(int argc, char *argv[]) return 0; } - -// int main(int argc,char ** argv) -// { -// Communicator comm; - -// TarsRollLogger::getInstance()->logger()->setLogLevel(TarsRollLogger::TARS_LOG); -// try -// { -// HelloPrx prx; -// comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 8999" , prx); - -// try -// { -// string sReq("hello"); -// string sRsp(""); - -// int iRet = prx->testHello(sReq, sRsp); -// cout<<"iRet:"<= 0) { string sReq("hello"); @@ -45,7 +45,8 @@ int main(int argc,char ** argv) assert(sReq == sRsp); } - // cout<<"iRet:"< + + + #tarsregistry locator + locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890 + #max invoke timeout + max-invoke-timeout = 5000 + #refresh endpoint interval + refresh-endpoint-interval = 10000 + #stat obj + stat = tars.tarsstat.StatObj + #max send queue length limit + sendqueuelimit = 100000 + #async queue length limit + asyncqueuecap = 100000 + #async callback thread num + asyncthread = 3 + #net thread + netthread = 1 + #merge net and sync thread + mergenetasync = 0 + #module name + modulename = TestApp.ProxyServer + + + + #not cout + closecout = 0 + #app name + app = TestApp + #server name + server = ProxyServer + #path + basepath = ./ + datapath = ./ + #log path + logpath = ./ + #merge net and imp thread + mergenetimp = 0 + #local ip, for tarsnode +# local = tcp -h 127.0.0.1 -p 15001 -t 10000 + + #tarsnode +# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000 + #config obj +# config = tars.tarsconfig.ConfigObj + #notify obj +# notify = tars.tarsconfig.NotifyObj + #log obj +# log = tars.tarslog.LogObj + + + #ip:port:timeout + endpoint = tcp -h 127.0.0.1 -p 9200 -t 10000 + #allow ip + allow = + #max connection num + maxconns = 4096 + #imp thread num + threads = 5 + #servant + servant = TestApp.ProxyServer.ProxyObj + #queue capacity + queuecap = 1000000 + #tars protocol + protocol = tars + + + + diff --git a/examples/scripts/run-co.sh b/examples/scripts/run-co.sh new file mode 100644 index 0000000..faea578 --- /dev/null +++ b/examples/scripts/run-co.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +echo "run-co.sh" + +WORKDIR=$(pwd) + +echo ${WORKDIR} + +killall -9 CoroutineDemoAServer + +echo "start server: ${WORKDIR}/../bin/CoroutineDemoAServer --config=${WORKDIR}/../../examples/CoroutineDemo/AServer/config.conf &" + +${WORKDIR}/../bin/CoroutineDemoAServer --config=${WORKDIR}/../../examples/CoroutineDemo/AServer/config.conf & + +#------------------------------------------------------------------------------------------------------- + +killall -9 CoroutineDemoBServer + +echo "start server: ${WORKDIR}/../bin/CoroutineDemoBServer --config=${WORKDIR}/../../examples/CoroutineDemo/BServer/config.conf &" + +${WORKDIR}/../bin/CoroutineDemoBServer --config=${WORKDIR}/../../examples/CoroutineDemo/BServer/config.conf & + + +#------------------------------------------------------------------------------------------------------- +sleep 3 + + +echo "client: ${WORKDIR}/../bin/CoroutineDemoClient" + +${WORKDIR}/../bin/CoroutineDemoClient 2 10000 0 + +${WORKDIR}/../bin/CoroutineDemoClient 2 10000 1 + +${WORKDIR}/../bin/testCoro 1000 + +${WORKDIR}/../bin/testParallelCoro 1000 + +#------------------------------------------------------------------------------------------------------- + +#killall -9 CoroutineDemoAServer +#killall -9 CoroutineDemoBServer + + diff --git a/examples/scripts/run-push.sh b/examples/scripts/run-push.sh new file mode 100644 index 0000000..a3b568a --- /dev/null +++ b/examples/scripts/run-push.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +echo "run-co.sh" + +WORKDIR=$(pwd) + +echo ${WORKDIR} + +killall -9 PushServer + +echo "start server: ${WORKDIR}/../bin/PushServer --config=${WORKDIR}/../../examples/PushDemo/PushServer/config.conf &" + +${WORKDIR}/../bin/PushServer --config=${WORKDIR}/../../examples/PushDemo/PushServer/config.conf & + +sleep 3 + +#------------------------------------------------------------------------------------------------------- + +echo "client: ${WORKDIR}/../bin/PushClient" + +${WORKDIR}/../bin/PushClient 5 + + + diff --git a/examples/scripts/run-quick-start.sh b/examples/scripts/run-quick-start.sh index 9f132b5..b61930a 100644 --- a/examples/scripts/run-quick-start.sh +++ b/examples/scripts/run-quick-start.sh @@ -6,11 +6,12 @@ WORKDIR=$(pwd) echo ${WORKDIR} -killall -9 QuickStartDemo +killall -9 QuickStartDemo ProxyServer echo "start server: ${WORKDIR}/../bin/QuickStartDemo --config=${WORKDIR}/../../examples/QuickStartDemo/HelloServer/Server/config.conf &" ${WORKDIR}/../bin/QuickStartDemo --config=${WORKDIR}/../../examples/QuickStartDemo/HelloServer/Server/config.conf & +${WORKDIR}/../bin/ProxyServer --config=${WORKDIR}/../../examples/QuickStartDemo/ProxyServer/Server/config.conf & sleep 3 @@ -24,9 +25,12 @@ ${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=synctup --thread=2 ${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=asynctup --thread=2 --buffersize=100 --netthread=2 -${WORKDIR}/../bin/ProxyServer --config=${WORKDIR}/../../examples/QuickStartDemo/ProxyServer/Server/config.conf & +echo "client: ${WORKDIR}/../bin/ProxyServerClient" + ${WORKDIR}/../bin/ProxyServerClient +sleep 2 + +killall -9 ProxyServer QuickStartDemo + -killall -9 QuickStartDemo -killall -9 ProxyServer \ No newline at end of file diff --git a/servant/libservant/AdapterProxy.cpp b/servant/libservant/AdapterProxy.cpp index dc4f9e3..330f700 100755 --- a/servant/libservant/AdapterProxy.cpp +++ b/servant/libservant/AdapterProxy.cpp @@ -520,8 +520,8 @@ void AdapterProxy::finishInvoke(shared_ptr & rsp) ReqMessage * msg = NULL; //requestid 为0 是push消息 - // if(rsp->iRequestId == 0) - if (rsp->cPacketType == TARSPUSH) + if(rsp->iRequestId == 0 || rsp->cPacketType == TARSPUSH) + // if (rsp->cPacketType == TARSPUSH) { if(!_objectProxy->getPushCallback()) { diff --git a/servant/libservant/Application.cpp b/servant/libservant/Application.cpp index 86bc82e..58b9f39 100644 --- a/servant/libservant/Application.cpp +++ b/servant/libservant/Application.cpp @@ -743,9 +743,6 @@ void Application::main(const TC_Option &option) //绑定对象和端口 bindAdapter(adapters); - //业务应用的初始化 - initialize(); - //输出所有adapter outAllAdapter(cout); diff --git a/servant/libservant/AsyncProcThread.cpp b/servant/libservant/AsyncProcThread.cpp index 376402e..6c9443a 100644 --- a/servant/libservant/AsyncProcThread.cpp +++ b/servant/libservant/AsyncProcThread.cpp @@ -64,7 +64,7 @@ void AsyncProcThread::push_back(ReqMessage * msg) else { if(_msgQueue->size() >= _iQueueCap) { - TLOGERROR("[TARS][AsyncProcThread::push_back] async_queue full." << endl); + TLOGERROR("[TARS][AsyncProcThread::push_back] async_queue full:" << _msgQueue->size() << ">=" << _iQueueCap << endl); delete msg; } else diff --git a/servant/libservant/Communicator.cpp b/servant/libservant/Communicator.cpp index 5bf4915..f06fe04 100644 --- a/servant/libservant/Communicator.cpp +++ b/servant/libservant/Communicator.cpp @@ -287,7 +287,7 @@ void Communicator::initialize() bool merge = TC_Common::strto(getProperty("mergenetasync", "0")); //异步队列的大小 - size_t iAsyncQueueCap = TC_Common::strto(getProperty("asyncqueuecap", "10-000")); + size_t iAsyncQueueCap = TC_Common::strto(getProperty("asyncqueuecap", "100000")); if(iAsyncQueueCap < 10000) { iAsyncQueueCap = 10000; @@ -382,13 +382,13 @@ void Communicator::doStat() void Communicator::pushAsyncThreadQueue(ReqMessage * msg) { //先不考虑每个线程队列数目不一致的情况 - _asyncThread[_asyncSeq]->push_back(msg); - _asyncSeq ++; + _asyncThread[(++_asyncSeq)%_asyncThreadNum]->push_back(msg); + // _asyncSeq ++; - if(_asyncSeq == _asyncThreadNum) - { - _asyncSeq = 0; - } + // if(_asyncSeq == _asyncThreadNum) + // { + // _asyncSeq = 0; + // } } void Communicator::terminate() diff --git a/util/src/tc_thread.cpp b/util/src/tc_thread.cpp index d56b934..84708b9 100644 --- a/util/src/tc_thread.cpp +++ b/util/src/tc_thread.cpp @@ -64,7 +64,6 @@ TC_Thread::TC_Thread() : _running(false), _th(NULL) TC_Thread::~TC_Thread() { - if(_th != NULL) { //如果资源没有被detach或者被join,则自己释放 @@ -150,119 +149,5 @@ size_t TC_Thread::CURRENT_THREADID() return threadId; } -// TC_ThreadControl::TC_ThreadControl(pthread_t thread) : _thread(thread) -// { -// } - -// TC_ThreadControl::TC_ThreadControl() : _thread(pthread_self()) -// { -// } - -// void TC_ThreadControl::join() -// { -// if(pthread_self() == _thread) -// { -// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] can't be called in the same thread"); -// } - -// void* ignore = 0; -// int rc = pthread_join(_thread, &ignore); -// if(rc != 0) -// { -// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] pthread_join error ", rc); -// } -// } - -// void TC_ThreadControl::detach() -// { -// if(pthread_self() == _thread) -// { -// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] can't be called in the same thread"); -// } - -// int rc = pthread_detach(_thread); -// if(rc != 0) -// { -// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] pthread_join error", rc); -// } -// } - -// pthread_t TC_ThreadControl::id() const -// { -// return _thread; -// } - -// void TC_ThreadControl::sleep(long millsecond) -// { -// struct timespec ts; -// ts.tv_sec = millsecond / 1000; -// ts.tv_nsec = (millsecond % 1000)*1000000; -// nanosleep(&ts, 0); -// } - -// void TC_ThreadControl::yield() -// { -// sched_yield(); -// } - -// TC_Thread::TC_Thread() : _running(false),_tid(-1) -// { -// } - -// void TC_Thread::threadEntry(TC_Thread *pThread) -// { -// pThread->_running = true; - -// { -// TC_ThreadLock::Lock sync(pThread->_lock); -// pThread->_lock.notifyAll(); -// } - -// try -// { -// pThread->run(); -// } -// catch(...) -// { -// pThread->_running = false; -// throw; -// } -// pThread->_running = false; -// } - -// TC_ThreadControl TC_Thread::start() -// { -// TC_ThreadLock::Lock sync(_lock); - -// if(_running) -// { -// throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread has start"); -// } - -// int ret = pthread_create(&_tid, -// 0, -// (void *(*)(void *))&threadEntry, -// (void *)this); - -// if(ret != 0) -// { -// throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread start error", ret); -// } - -// _lock.wait(); - -// return TC_ThreadControl(_tid); -// } - -// TC_ThreadControl TC_Thread::getThreadControl() const -// { -// return TC_ThreadControl(_tid); -// } - -// bool TC_Thread::isAlive() const -// { -// return _running; -// } - }