diff --git a/cmake/BuildTarsServer.cmake b/cmake/BuildTarsServer.cmake
index 4486c2a..b90be6b 100644
--- a/cmake/BuildTarsServer.cmake
+++ b/cmake/BuildTarsServer.cmake
@@ -54,4 +54,24 @@ macro(build_tars_server MODULE DEPS)
target_link_libraries(${MODULE} tarsservant tarsutil)
+ SET(MODULE-TGZ "${CMAKE_BINARY_DIR}/${MODULE}.tgz")
+ SET(RUN_DEPLOY_COMMAND_FILE "${PROJECT_BINARY_DIR}/run-deploy-${MODULE}.cmake")
+ FILE(WRITE ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR}/deploy/${MODULE})\n")
+ IF(WIN32)
+ message(${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${CMAKE_BUILD_TYPE}/${MODULE}.exe)
+ FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${CMAKE_BUILD_TYPE}/${MODULE}.exe ${PROJECT_BINARY_DIR}/deploy/${MODULE}/)\n")
+ FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(WORKING_DIRECTORY ${PROJECT_BINARY_DIR}/deploy/ \n COMMAND ${CMAKE_COMMAND} -E tar czfv ${MODULE-TGZ} ${MODULE})\n")
+ ELSE()
+ FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${MODULE} ${PROJECT_BINARY_DIR}/deploy/${MODULE}/)\n")
+ FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(WORKING_DIRECTORY ${PROJECT_BINARY_DIR}/deploy/ \n COMMAND ${CMAKE_COMMAND} -E tar czfv ${MODULE-TGZ} ${MODULE})\n")
+ ENDIF()
+
+ #执行命令
+ add_custom_command(OUTPUT ${MODULE-TGZ}
+ DEPENDS ${MODULE}
+ COMMAND ${CMAKE_COMMAND} -P ${RUN_DEPLOY_COMMAND_FILE}
+ COMMENT "call ${RUN_DEPLOY_COMMAND_FILE}")
+
+ add_custom_target(${MODULE}-tar DEPENDS ${MODULE-TGZ})
+
endmacro()
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 40011fb..5d4f3e4 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -20,3 +20,8 @@ add_custom_target(run-co
DEPENDS CoroutineDemoAServer CoroutineDemoBServer CoroutineDemoClient testCoro testParallelCoro
COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-co.sh
COMMENT "call run co")
+
+add_custom_target(run-push
+ DEPENDS PushServer PushClient
+ COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-push.sh
+ COMMENT "call run push")
diff --git a/examples/CoroutineDemo/AServer/config.conf b/examples/CoroutineDemo/AServer/config.conf
new file mode 100755
index 0000000..3ec2c4f
--- /dev/null
+++ b/examples/CoroutineDemo/AServer/config.conf
@@ -0,0 +1,70 @@
+
+
+
+ #tarsregistry locator
+ locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
+ #max invoke timeout
+ max-invoke-timeout = 5000
+ #refresh endpoint interval
+ refresh-endpoint-interval = 10000
+ #stat obj
+ stat = tars.tarsstat.StatObj
+ #max send queue length limit
+ sendqueuelimit = 100000
+ #async queue length limit
+ asyncqueuecap = 100000
+ #async callback thread num
+ asyncthread = 3
+ #net thread
+ netthread = 1
+ #merge net and sync thread
+ mergenetasync = 0
+ #module name
+ modulename = Test.AServer
+
+
+
+ #not cout
+ closecout = 0
+ #app name
+ app = Test
+ #server name
+ server = AServer
+ #path
+ basepath = ./
+ datapath = ./
+ #log path
+ logpath = ./
+ #merge net and imp thread
+ mergenetimp = 0
+ #local ip, for tarsnode
+# local = tcp -h 127.0.0.1 -p 15001 -t 10000
+
+ #tarsnode
+# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
+ #config obj
+# config = tars.tarsconfig.ConfigObj
+ #notify obj
+# notify = tars.tarsconfig.NotifyObj
+ #log obj
+# log = tars.tarslog.LogObj
+
+
+ #ip:port:timeout
+ endpoint = tcp -h 127.0.0.1 -p 9000 -t 10000
+ #allow ip
+ allow =
+ #max connection num
+ maxconns = 4096
+ #imp thread num
+ threads = 5
+ #servant
+ servant = Test.AServer.AServantObj
+ #queue capacity
+ queuecap = 1000000
+ #tars protocol
+ protocol = tars
+
+
+
+
diff --git a/examples/CoroutineDemo/BServer/config.conf b/examples/CoroutineDemo/BServer/config.conf
new file mode 100755
index 0000000..58cdd8d
--- /dev/null
+++ b/examples/CoroutineDemo/BServer/config.conf
@@ -0,0 +1,72 @@
+
+
+
+ #tarsregistry locator
+ locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
+ #max invoke timeout
+ max-invoke-timeout = 5000
+ #refresh endpoint interval
+ refresh-endpoint-interval = 10000
+ #stat obj
+ stat = tars.tarsstat.StatObj
+ #max send queue length limit
+ sendqueuelimit = 100000
+ #async queue length limit
+ asyncqueuecap = 100000
+ #async callback thread num
+ asyncthread = 3
+ #net thread
+ netthread = 1
+ #merge net and sync thread
+ mergenetasync = 0
+ #module name
+ modulename = Test.BServer
+
+
+
+ #not cout
+ closecout = 0
+ #app name
+ app = Test
+ #server name
+ server = BServer
+ #path
+ basepath = ./
+ datapath = ./
+ #log path
+ logpath = ./
+ #merge net and imp thread
+ mergenetimp = 0
+ #opencoroutine
+ opencoroutine = 1
+ #local ip, for tarsnode
+# local = tcp -h 127.0.0.1 -p 15001 -t 10000
+
+ #tarsnode
+# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
+ #config obj
+# config = tars.tarsconfig.ConfigObj
+ #notify obj
+# notify = tars.tarsconfig.NotifyObj
+ #log obj
+# log = tars.tarslog.LogObj
+
+
+ #ip:port:timeout
+ endpoint = tcp -h 127.0.0.1 -p 9100 -t 10000
+ #allow ip
+ allow =
+ #max connection num
+ maxconns = 4096
+ #imp thread num
+ threads = 5
+ #servant
+ servant = Test.BServer.BServantObj
+ #queue capacity
+ queuecap = 1000000
+ #tars protocol
+ protocol = tars
+
+
+
+
diff --git a/examples/PushDemo/PushClient/TestRecvThread.cpp b/examples/PushDemo/PushClient/TestRecvThread.cpp
index 8b895f6..68bf2f8 100644
--- a/examples/PushDemo/PushClient/TestRecvThread.cpp
+++ b/examples/PushDemo/PushClient/TestRecvThread.cpp
@@ -5,7 +5,7 @@
/*
响应包解码函数,根据特定格式解码从服务端收到的数据,解析为ResponsePacket
*/
-static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, ResponsePacket& done)
+static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
size_t len = sizeof(tars::Int32);
@@ -26,7 +26,7 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response
iHeaderLen = ntohl(iHeaderLen);
//做一下保护,长度大于M
- if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int))
+ if (iHeaderLen > 100000 || iHeaderLen < (int)sizeof(unsigned int))
{
throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
}
@@ -37,101 +37,58 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response
return TC_NetWorkBuffer::PACKET_LESS;
}
+ in.moveHeader(sizeof(iHeaderLen));
+
tars::Int32 iRequestId = 0;
string sRequestId;
- in.getHeader(sizeof(iHeaderLen), sRequestId);
+ in.getHeader(sizeof(iRequestId), sRequestId);
+ in.moveHeader(sizeof(iRequestId));
- ResponsePacket rsp;
rsp.iRequestId = ntohl(*((unsigned int *)(sRequestId.c_str())));
- in.getHeader(iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId), rsp.sBuffer);
+ len = iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId);
+ in.getHeader(len, rsp.sBuffer);
+ in.moveHeader(len);
return TC_NetWorkBuffer::PACKET_FULL;
}
/*
请求包编码函数,本函数的打包格式为
- 整个包长度(字节)+iRequestId(字节)+包内容
+ 整个包长度(4字节)+iRequestId(4字节)+包内容
*/
-static void pushRequest(const RequestPacket& request, shared_ptr& buff)
+static void pushRequest(const RequestPacket& request, shared_ptr& sbuff)
{
- // unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
- // unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
+ unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
+ unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
- // buff = "";
- // for (int i = 0; i<4; ++i)
- // {
- // buff += *bufflengthptr++;
- // }
+ vector buffer;
+ buffer.resize(request.sBuffer.size()+8);
- // unsigned int netrequestId = htonl(request.iRequestId);
- // unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
+ memcpy(buffer.data(), bufflengthptr, sizeof(unsigned int));
- // for (int i = 0; i<4; ++i)
- // {
- // buff += *netrequestIdptr++;
- // }
+ unsigned int netrequestId = htonl(request.iRequestId);
+ unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
- // string tmp;
- // tmp.assign((const char*)(&request.sBuffer[0]), request.sBuffer.size());
- // buff+=tmp;
+ memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int));
+ memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size());
- TarsOutputStream os;
-
- tars::Int32 iHeaderLen = 0;
- tars::Int32 iRequestId = request.iRequestId;
-
-// 先预留8个字节长度
- os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen));
- os.writeBuf((const char *)&iRequestId, sizeof(iRequestId));
-
- request.writeTo(os);
-
- buff->swap(os.getByteBuffer());
-
- // assert(buff->length() >= 4);
-
- iHeaderLen = htonl((int)(buff->length()));
-
- memcpy((void*)buff->buffer(), (const char *)&iHeaderLen, sizeof(iHeaderLen));
+ sbuff->addBuffer(buffer);
}
static void printResult(int iRequestId, const string &sResponseStr)
{
- cout << "request id: " << iRequestId << endl;
- cout << "response str: " << sResponseStr << endl;
+ cout << "request id: " << iRequestId << ", response str: " << sResponseStr << endl;
}
+
static void printPushInfo(const string &sResponseStr)
{
cout << "push message: " << sResponseStr << endl;
}
-// int TestPushCallBack::onRequestException(int iRet)
-// {
-// return 0;
-// }
-
-// int TestPushCallBack::onRequestResponse(const tars::RequestPacket& request, const tars::ResponsePacket& response)
-// {
-// string sRet;
-// cout << "sBuffer: " << response.sBuffer.size() << endl;
-// sRet.assign(&(response.sBuffer[0]), response.sBuffer.size());
-// printResult(response.iRequestId, sRet);
-// return 0;
-// }
-
-// int TestPushCallBack::onPushResponse(const tars::ResponsePacket& response)
-// {
-// string sRet;
-// sRet.assign(&(response.sBuffer[0]), response.sBuffer.size());
-// printPushInfo(sRet);
-// return 0;
-// }
-
int TestPushCallBack::onDispatch(ReqMessagePtr msg)
{
if(msg->request.sFuncName == "printResult")
{
string sRet;
- cout << "sBuffer: " << msg->response->sBuffer.size() << endl;
sRet.assign(&(msg->response->sBuffer[0]), msg->response->sBuffer.size());
printResult(msg->request.iRequestId, sRet);
return 0;
@@ -150,12 +107,11 @@ int TestPushCallBack::onDispatch(ReqMessagePtr msg)
return -3;
}
-RecvThread::RecvThread():_bTerminate(false)
+RecvThread::RecvThread(int second):_second(second), _bTerminate(false)
{
- string sObjName = "Test.TestPushServer.TestPushServantObj";
- string sObjHost = "tcp -h 10.120.129.226 -t 60000 -p 10099";
+ string sObjName = "TestApp.PushServer.TestPushServantObj@tcp -h 127.0.0.1 -t 60000 -p 9300";
- _prx = _comm.stringToProxy(sObjName+"@"+sObjHost);
+ _prx = _comm.stringToProxy(sObjName);
ProxyProtocol prot;
prot.requestFunc = pushRequest;
@@ -164,15 +120,6 @@ RecvThread::RecvThread():_bTerminate(false)
_prx->tars_set_protocol(prot);
}
-void RecvThread::terminate()
-{
- _bTerminate = true;
- {
- tars::TC_ThreadLock::Lock sync(*this);
- notifyAll();
- }
-}
-
void RecvThread::run(void)
{
TestPushCallBackPtr cbPush = new TestPushCallBack();
@@ -180,6 +127,8 @@ void RecvThread::run(void)
string buf("heartbeat");
+ time_t n = TNOW;
+
while(!_bTerminate)
{
{
@@ -198,9 +147,15 @@ void RecvThread::run(void)
}
}
+ if(TNOW - n >= _second)
+ {
+ _bTerminate = true;
+ break;
+ }
+
{
TC_ThreadLock::Lock sync(*this);
- timedWait(5000);
+ timedWait(500);
}
}
}
diff --git a/examples/PushDemo/PushClient/TestRecvThread.h b/examples/PushDemo/PushClient/TestRecvThread.h
index 1049711..b88becd 100644
--- a/examples/PushDemo/PushClient/TestRecvThread.h
+++ b/examples/PushDemo/PushClient/TestRecvThread.h
@@ -9,26 +9,17 @@ public:
virtual int onDispatch(ReqMessagePtr msg);
};
-// class TestPushCallBack : public tars::PushCallback
-// {
-// public:
-// virtual int onRequestException(int iRet);
-// virtual int onRequestResponse(const tars::RequestPacket& request, const tars::ResponsePacket& response);
-// virtual int onPushResponse(const tars::ResponsePacket& response);
-// };
-
-
typedef tars::TC_AutoPtr TestPushCallBackPtr;
class RecvThread : public TC_Thread, public TC_ThreadLock
{
public:
- RecvThread();
+ RecvThread(int second);
virtual void run();
- void terminate();
private:
+ int _second;
bool _bTerminate;
Communicator _comm;
diff --git a/examples/PushDemo/PushClient/main.cpp b/examples/PushDemo/PushClient/main.cpp
index 613f245..4c67422 100644
--- a/examples/PushDemo/PushClient/main.cpp
+++ b/examples/PushDemo/PushClient/main.cpp
@@ -9,13 +9,17 @@ int main(int argc,char**argv)
{
try
{
- RecvThread thread;
+ int second = 5;
+
+ if(argc > 1)
+ second = TC_Common::strto(argv[1]);
+
+ if(second <=0 )
+ second = 1;
+
+ RecvThread thread(second);
thread.start();
- int c;
- while((c = getchar()) != 'q');
-
- thread.terminate();
thread.getThreadControl().join();
}
catch(std::exception&e)
diff --git a/examples/PushDemo/PushServer/TestPushServantImp.cpp b/examples/PushDemo/PushServer/TestPushServantImp.cpp
index 1d8dd71..cabefcd 100644
--- a/examples/PushDemo/PushServer/TestPushServantImp.cpp
+++ b/examples/PushDemo/PushServer/TestPushServantImp.cpp
@@ -30,7 +30,7 @@ int TestPushServantImp::doRequest(tars::TarsCurrentPtr current, vector& re
LOG->debug() << "connect ip: " << current->getIp() << endl;
}
(PushUser::mapMutex).unlock();
- //返回给客户端它自己请求的数据包,即原包返回
+ //返回给客户端它自己请求的数据包,即原包返回(4字节长度+4字节requestid+buffer)
const vector& request = current->getRequestBuffer();
response = request;
diff --git a/examples/PushDemo/PushServer/TestPushServer.cpp b/examples/PushDemo/PushServer/TestPushServer.cpp
index 5024578..3551e25 100644
--- a/examples/PushDemo/PushServer/TestPushServer.cpp
+++ b/examples/PushDemo/PushServer/TestPushServer.cpp
@@ -7,6 +7,42 @@ TestPushServer g_app;
/////////////////////////////////////////////////////////////////
+static TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer &in, vector &out)
+{
+ size_t len = sizeof(tars::Int32);
+
+ if (in.getBufferLength() < len)
+ {
+ return TC_NetWorkBuffer::PACKET_LESS;
+ }
+
+ string header;
+ in.getHeader(len, header);
+
+ assert(header.size() == len);
+
+ tars::Int32 iHeaderLen = 0;
+
+ ::memcpy(&iHeaderLen, header.c_str(), sizeof(tars::Int32));
+
+ iHeaderLen = ntohl(iHeaderLen);
+
+ if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int))
+ {
+ throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
+ }
+
+ if (in.getBufferLength() < (uint32_t)iHeaderLen)
+ {
+ return TC_NetWorkBuffer::PACKET_LESS;
+ }
+
+ in.getHeader(iHeaderLen, out);
+ in.moveHeader(iHeaderLen);
+
+ return TC_NetWorkBuffer::PACKET_FULL;
+}
+
void
TestPushServer::initialize()
{
@@ -15,11 +51,11 @@ TestPushServer::initialize()
addServant(ServerConfig::Application + "." + ServerConfig::ServerName + ".TestPushServantObj");
- addServantProtocol("Test.TestPushServer.TestPushServantObj", TC_NetWorkBuffer::parseBinary4<8, 1024*1024*10>);
+ addServantProtocol("TestApp.PushServer.TestPushServantObj", parse);
pushThread.start();
-
}
+
/////////////////////////////////////////////////////////////////
void
TestPushServer::destroyApp()
diff --git a/examples/PushDemo/PushServer/TestPushThread.cpp b/examples/PushDemo/PushServer/TestPushThread.cpp
index 5c9edd4..28c6b1b 100644
--- a/examples/PushDemo/PushServer/TestPushThread.cpp
+++ b/examples/PushDemo/PushServer/TestPushThread.cpp
@@ -38,30 +38,21 @@ void PushInfoThread::setPushInfo(const string &sInfo)
//定期向客户push消息
void PushInfoThread::run(void)
{
- time_t iNow;
-
setPushInfo("hello world");
while (!_bTerminate)
{
- iNow = TC_TimeProvider::getInstance()->getNow();
-
- if(iNow - _tLastPushTime > _tInterval)
+ (PushUser::mapMutex).lock();
+ for(map::iterator it = (PushUser::pushUser).begin(); it != (PushUser::pushUser).end(); ++it)
{
- _tLastPushTime = iNow;
-
- (PushUser::mapMutex).lock();
- for(map::iterator it = (PushUser::pushUser).begin(); it != (PushUser::pushUser).end(); ++it)
- {
- (it->second)->sendResponse(_sPushInfo.c_str(), _sPushInfo.size());
- LOG->debug() << "sendResponse: " << _sPushInfo.size() <second)->sendResponse(_sPushInfo.c_str(), _sPushInfo.size());
+ LOG->debug() << "sendResponse: " << _sPushInfo.size() <
+
+
+ #tarsregistry locator
+ locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
+ #max invoke timeout
+ max-invoke-timeout = 5000
+ #refresh endpoint interval
+ refresh-endpoint-interval = 10000
+ #stat obj
+ stat = tars.tarsstat.StatObj
+ #max send queue length limit
+ sendqueuelimit = 100000
+ #async queue length limit
+ asyncqueuecap = 100000
+ #async callback thread num
+ asyncthread = 3
+ #net thread
+ netthread = 1
+ #merge net and sync thread
+ mergenetasync = 0
+ #module name
+ modulename = TestApp.PushServer
+
+
+
+ #not cout
+ closecout = 0
+ #app name
+ app = TestApp
+ #server name
+ server = PushServer
+ #path
+ basepath = ./
+ datapath = ./
+ #log path
+ logpath = ./
+ #merge net and imp thread
+ mergenetimp = 0
+ #local ip, for tarsnode
+# local = tcp -h 127.0.0.1 -p 15001 -t 10000
+
+ #tarsnode
+# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
+ #config obj
+# config = tars.tarsconfig.ConfigObj
+ #notify obj
+# notify = tars.tarsconfig.NotifyObj
+ #log obj
+# log = tars.tarslog.LogObj
+
+
+ #ip:port:timeout
+ endpoint = tcp -h 127.0.0.1 -p 9300 -t 10000
+ #allow ip
+ allow =
+ #max connection num
+ maxconns = 4096
+ #imp thread num
+ threads = 5
+ #servant
+ servant = TestApp.PushServer.TestPushServantObj
+ #queue capacity
+ queuecap = 1000000
+ #tars protocol
+ protocol = not_tars
+
+
+
+
diff --git a/examples/QuickStartDemo/HelloServer/Client/main.cpp b/examples/QuickStartDemo/HelloServer/Client/main.cpp
index bfeb4dd..8b37dde 100644
--- a/examples/QuickStartDemo/HelloServer/Client/main.cpp
+++ b/examples/QuickStartDemo/HelloServer/Client/main.cpp
@@ -359,43 +359,3 @@ int main(int argc, char *argv[])
return 0;
}
-
-// int main(int argc,char ** argv)
-// {
-// Communicator comm;
-
-// TarsRollLogger::getInstance()->logger()->setLogLevel(TarsRollLogger::TARS_LOG);
-// try
-// {
-// HelloPrx prx;
-// comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 8999" , prx);
-
-// try
-// {
-// string sReq("hello");
-// string sRsp("");
-
-// int iRet = prx->testHello(sReq, sRsp);
-// cout<<"iRet:"<= 0)
{
string sReq("hello");
@@ -45,7 +45,8 @@ int main(int argc,char ** argv)
assert(sReq == sRsp);
}
- // cout<<"iRet:"<
+
+
+ #tarsregistry locator
+ locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
+ #max invoke timeout
+ max-invoke-timeout = 5000
+ #refresh endpoint interval
+ refresh-endpoint-interval = 10000
+ #stat obj
+ stat = tars.tarsstat.StatObj
+ #max send queue length limit
+ sendqueuelimit = 100000
+ #async queue length limit
+ asyncqueuecap = 100000
+ #async callback thread num
+ asyncthread = 3
+ #net thread
+ netthread = 1
+ #merge net and sync thread
+ mergenetasync = 0
+ #module name
+ modulename = TestApp.ProxyServer
+
+
+
+ #not cout
+ closecout = 0
+ #app name
+ app = TestApp
+ #server name
+ server = ProxyServer
+ #path
+ basepath = ./
+ datapath = ./
+ #log path
+ logpath = ./
+ #merge net and imp thread
+ mergenetimp = 0
+ #local ip, for tarsnode
+# local = tcp -h 127.0.0.1 -p 15001 -t 10000
+
+ #tarsnode
+# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
+ #config obj
+# config = tars.tarsconfig.ConfigObj
+ #notify obj
+# notify = tars.tarsconfig.NotifyObj
+ #log obj
+# log = tars.tarslog.LogObj
+
+
+ #ip:port:timeout
+ endpoint = tcp -h 127.0.0.1 -p 9200 -t 10000
+ #allow ip
+ allow =
+ #max connection num
+ maxconns = 4096
+ #imp thread num
+ threads = 5
+ #servant
+ servant = TestApp.ProxyServer.ProxyObj
+ #queue capacity
+ queuecap = 1000000
+ #tars protocol
+ protocol = tars
+
+
+
+
diff --git a/examples/scripts/run-co.sh b/examples/scripts/run-co.sh
new file mode 100644
index 0000000..faea578
--- /dev/null
+++ b/examples/scripts/run-co.sh
@@ -0,0 +1,43 @@
+#!/bin/bash
+
+echo "run-co.sh"
+
+WORKDIR=$(pwd)
+
+echo ${WORKDIR}
+
+killall -9 CoroutineDemoAServer
+
+echo "start server: ${WORKDIR}/../bin/CoroutineDemoAServer --config=${WORKDIR}/../../examples/CoroutineDemo/AServer/config.conf &"
+
+${WORKDIR}/../bin/CoroutineDemoAServer --config=${WORKDIR}/../../examples/CoroutineDemo/AServer/config.conf &
+
+#-------------------------------------------------------------------------------------------------------
+
+killall -9 CoroutineDemoBServer
+
+echo "start server: ${WORKDIR}/../bin/CoroutineDemoBServer --config=${WORKDIR}/../../examples/CoroutineDemo/BServer/config.conf &"
+
+${WORKDIR}/../bin/CoroutineDemoBServer --config=${WORKDIR}/../../examples/CoroutineDemo/BServer/config.conf &
+
+
+#-------------------------------------------------------------------------------------------------------
+sleep 3
+
+
+echo "client: ${WORKDIR}/../bin/CoroutineDemoClient"
+
+${WORKDIR}/../bin/CoroutineDemoClient 2 10000 0
+
+${WORKDIR}/../bin/CoroutineDemoClient 2 10000 1
+
+${WORKDIR}/../bin/testCoro 1000
+
+${WORKDIR}/../bin/testParallelCoro 1000
+
+#-------------------------------------------------------------------------------------------------------
+
+#killall -9 CoroutineDemoAServer
+#killall -9 CoroutineDemoBServer
+
+
diff --git a/examples/scripts/run-push.sh b/examples/scripts/run-push.sh
new file mode 100644
index 0000000..a3b568a
--- /dev/null
+++ b/examples/scripts/run-push.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+echo "run-co.sh"
+
+WORKDIR=$(pwd)
+
+echo ${WORKDIR}
+
+killall -9 PushServer
+
+echo "start server: ${WORKDIR}/../bin/PushServer --config=${WORKDIR}/../../examples/PushDemo/PushServer/config.conf &"
+
+${WORKDIR}/../bin/PushServer --config=${WORKDIR}/../../examples/PushDemo/PushServer/config.conf &
+
+sleep 3
+
+#-------------------------------------------------------------------------------------------------------
+
+echo "client: ${WORKDIR}/../bin/PushClient"
+
+${WORKDIR}/../bin/PushClient 5
+
+
+
diff --git a/examples/scripts/run-quick-start.sh b/examples/scripts/run-quick-start.sh
index 9f132b5..b61930a 100644
--- a/examples/scripts/run-quick-start.sh
+++ b/examples/scripts/run-quick-start.sh
@@ -6,11 +6,12 @@ WORKDIR=$(pwd)
echo ${WORKDIR}
-killall -9 QuickStartDemo
+killall -9 QuickStartDemo ProxyServer
echo "start server: ${WORKDIR}/../bin/QuickStartDemo --config=${WORKDIR}/../../examples/QuickStartDemo/HelloServer/Server/config.conf &"
${WORKDIR}/../bin/QuickStartDemo --config=${WORKDIR}/../../examples/QuickStartDemo/HelloServer/Server/config.conf &
+${WORKDIR}/../bin/ProxyServer --config=${WORKDIR}/../../examples/QuickStartDemo/ProxyServer/Server/config.conf &
sleep 3
@@ -24,9 +25,12 @@ ${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=synctup --thread=2
${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=asynctup --thread=2 --buffersize=100 --netthread=2
-${WORKDIR}/../bin/ProxyServer --config=${WORKDIR}/../../examples/QuickStartDemo/ProxyServer/Server/config.conf &
+echo "client: ${WORKDIR}/../bin/ProxyServerClient"
+
${WORKDIR}/../bin/ProxyServerClient
+sleep 2
+
+killall -9 ProxyServer QuickStartDemo
+
-killall -9 QuickStartDemo
-killall -9 ProxyServer
\ No newline at end of file
diff --git a/servant/libservant/AdapterProxy.cpp b/servant/libservant/AdapterProxy.cpp
index dc4f9e3..330f700 100755
--- a/servant/libservant/AdapterProxy.cpp
+++ b/servant/libservant/AdapterProxy.cpp
@@ -520,8 +520,8 @@ void AdapterProxy::finishInvoke(shared_ptr & rsp)
ReqMessage * msg = NULL;
//requestid 为0 是push消息
- // if(rsp->iRequestId == 0)
- if (rsp->cPacketType == TARSPUSH)
+ if(rsp->iRequestId == 0 || rsp->cPacketType == TARSPUSH)
+ // if (rsp->cPacketType == TARSPUSH)
{
if(!_objectProxy->getPushCallback())
{
diff --git a/servant/libservant/Application.cpp b/servant/libservant/Application.cpp
index 86bc82e..58b9f39 100644
--- a/servant/libservant/Application.cpp
+++ b/servant/libservant/Application.cpp
@@ -743,9 +743,6 @@ void Application::main(const TC_Option &option)
//绑定对象和端口
bindAdapter(adapters);
- //业务应用的初始化
- initialize();
-
//输出所有adapter
outAllAdapter(cout);
diff --git a/servant/libservant/AsyncProcThread.cpp b/servant/libservant/AsyncProcThread.cpp
index 376402e..6c9443a 100644
--- a/servant/libservant/AsyncProcThread.cpp
+++ b/servant/libservant/AsyncProcThread.cpp
@@ -64,7 +64,7 @@ void AsyncProcThread::push_back(ReqMessage * msg)
else {
if(_msgQueue->size() >= _iQueueCap)
{
- TLOGERROR("[TARS][AsyncProcThread::push_back] async_queue full." << endl);
+ TLOGERROR("[TARS][AsyncProcThread::push_back] async_queue full:" << _msgQueue->size() << ">=" << _iQueueCap << endl);
delete msg;
}
else
diff --git a/servant/libservant/Communicator.cpp b/servant/libservant/Communicator.cpp
index 5bf4915..f06fe04 100644
--- a/servant/libservant/Communicator.cpp
+++ b/servant/libservant/Communicator.cpp
@@ -287,7 +287,7 @@ void Communicator::initialize()
bool merge = TC_Common::strto(getProperty("mergenetasync", "0"));
//异步队列的大小
- size_t iAsyncQueueCap = TC_Common::strto(getProperty("asyncqueuecap", "10-000"));
+ size_t iAsyncQueueCap = TC_Common::strto(getProperty("asyncqueuecap", "100000"));
if(iAsyncQueueCap < 10000)
{
iAsyncQueueCap = 10000;
@@ -382,13 +382,13 @@ void Communicator::doStat()
void Communicator::pushAsyncThreadQueue(ReqMessage * msg)
{
//先不考虑每个线程队列数目不一致的情况
- _asyncThread[_asyncSeq]->push_back(msg);
- _asyncSeq ++;
+ _asyncThread[(++_asyncSeq)%_asyncThreadNum]->push_back(msg);
+ // _asyncSeq ++;
- if(_asyncSeq == _asyncThreadNum)
- {
- _asyncSeq = 0;
- }
+ // if(_asyncSeq == _asyncThreadNum)
+ // {
+ // _asyncSeq = 0;
+ // }
}
void Communicator::terminate()
diff --git a/util/src/tc_thread.cpp b/util/src/tc_thread.cpp
index d56b934..84708b9 100644
--- a/util/src/tc_thread.cpp
+++ b/util/src/tc_thread.cpp
@@ -64,7 +64,6 @@ TC_Thread::TC_Thread() : _running(false), _th(NULL)
TC_Thread::~TC_Thread()
{
-
if(_th != NULL)
{
//如果资源没有被detach或者被join,则自己释放
@@ -150,119 +149,5 @@ size_t TC_Thread::CURRENT_THREADID()
return threadId;
}
-// TC_ThreadControl::TC_ThreadControl(pthread_t thread) : _thread(thread)
-// {
-// }
-
-// TC_ThreadControl::TC_ThreadControl() : _thread(pthread_self())
-// {
-// }
-
-// void TC_ThreadControl::join()
-// {
-// if(pthread_self() == _thread)
-// {
-// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] can't be called in the same thread");
-// }
-
-// void* ignore = 0;
-// int rc = pthread_join(_thread, &ignore);
-// if(rc != 0)
-// {
-// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] pthread_join error ", rc);
-// }
-// }
-
-// void TC_ThreadControl::detach()
-// {
-// if(pthread_self() == _thread)
-// {
-// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] can't be called in the same thread");
-// }
-
-// int rc = pthread_detach(_thread);
-// if(rc != 0)
-// {
-// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] pthread_join error", rc);
-// }
-// }
-
-// pthread_t TC_ThreadControl::id() const
-// {
-// return _thread;
-// }
-
-// void TC_ThreadControl::sleep(long millsecond)
-// {
-// struct timespec ts;
-// ts.tv_sec = millsecond / 1000;
-// ts.tv_nsec = (millsecond % 1000)*1000000;
-// nanosleep(&ts, 0);
-// }
-
-// void TC_ThreadControl::yield()
-// {
-// sched_yield();
-// }
-
-// TC_Thread::TC_Thread() : _running(false),_tid(-1)
-// {
-// }
-
-// void TC_Thread::threadEntry(TC_Thread *pThread)
-// {
-// pThread->_running = true;
-
-// {
-// TC_ThreadLock::Lock sync(pThread->_lock);
-// pThread->_lock.notifyAll();
-// }
-
-// try
-// {
-// pThread->run();
-// }
-// catch(...)
-// {
-// pThread->_running = false;
-// throw;
-// }
-// pThread->_running = false;
-// }
-
-// TC_ThreadControl TC_Thread::start()
-// {
-// TC_ThreadLock::Lock sync(_lock);
-
-// if(_running)
-// {
-// throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread has start");
-// }
-
-// int ret = pthread_create(&_tid,
-// 0,
-// (void *(*)(void *))&threadEntry,
-// (void *)this);
-
-// if(ret != 0)
-// {
-// throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread start error", ret);
-// }
-
-// _lock.wait();
-
-// return TC_ThreadControl(_tid);
-// }
-
-// TC_ThreadControl TC_Thread::getThreadControl() const
-// {
-// return TC_ThreadControl(_tid);
-// }
-
-// bool TC_Thread::isAlive() const
-// {
-// return _running;
-// }
-
}