add config files, add run-push and run-co targets, fix initialize bug

This commit is contained in:
ruanshudong 2020-02-02 20:35:17 +08:00
parent cfd4d576c7
commit dded31f122
23 changed files with 488 additions and 291 deletions

View File

@ -54,4 +54,24 @@ macro(build_tars_server MODULE DEPS)
target_link_libraries(${MODULE} tarsservant tarsutil)
SET(MODULE-TGZ "${CMAKE_BINARY_DIR}/${MODULE}.tgz")
SET(RUN_DEPLOY_COMMAND_FILE "${PROJECT_BINARY_DIR}/run-deploy-${MODULE}.cmake")
FILE(WRITE ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR}/deploy/${MODULE})\n")
IF(WIN32)
message(${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${CMAKE_BUILD_TYPE}/${MODULE}.exe)
FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${CMAKE_BUILD_TYPE}/${MODULE}.exe ${PROJECT_BINARY_DIR}/deploy/${MODULE}/)\n")
FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(WORKING_DIRECTORY ${PROJECT_BINARY_DIR}/deploy/ \n COMMAND ${CMAKE_COMMAND} -E tar czfv ${MODULE-TGZ} ${MODULE})\n")
ELSE()
FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${MODULE} ${PROJECT_BINARY_DIR}/deploy/${MODULE}/)\n")
FILE(APPEND ${RUN_DEPLOY_COMMAND_FILE} "EXECUTE_PROCESS(WORKING_DIRECTORY ${PROJECT_BINARY_DIR}/deploy/ \n COMMAND ${CMAKE_COMMAND} -E tar czfv ${MODULE-TGZ} ${MODULE})\n")
ENDIF()
#
add_custom_command(OUTPUT ${MODULE-TGZ}
DEPENDS ${MODULE}
COMMAND ${CMAKE_COMMAND} -P ${RUN_DEPLOY_COMMAND_FILE}
COMMENT "call ${RUN_DEPLOY_COMMAND_FILE}")
add_custom_target(${MODULE}-tar DEPENDS ${MODULE-TGZ})
endmacro()

View File

@ -20,3 +20,8 @@ add_custom_target(run-co
DEPENDS CoroutineDemoAServer CoroutineDemoBServer CoroutineDemoClient testCoro testParallelCoro
COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-co.sh
COMMENT "call run co")
add_custom_target(run-push
DEPENDS PushServer PushClient
COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/scripts/run-push.sh
COMMENT "call run push")

View File

@ -0,0 +1,70 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
max-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = Test.AServer
</client>
<server>
#not cout
closecout = 0
#app name
app = Test
#server name
server = AServer
#path
basepath = ./
datapath = ./
#log path
logpath = ./
#merge net and imp thread
mergenetimp = 0
#local ip, for tarsnode
# local = tcp -h 127.0.0.1 -p 15001 -t 10000
#tarsnode
# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
#config obj
# config = tars.tarsconfig.ConfigObj
#notify obj
# notify = tars.tarsconfig.NotifyObj
#log obj
# log = tars.tarslog.LogObj
<AAdapter>
#ip:port:timeout
endpoint = tcp -h 127.0.0.1 -p 9000 -t 10000
#allow ip
allow =
#max connection num
maxconns = 4096
#imp thread num
threads = 5
#servant
servant = Test.AServer.AServantObj
#queue capacity
queuecap = 1000000
#tars protocol
protocol = tars
</AAdapter>
</server>
</application>
</tars>

View File

@ -0,0 +1,72 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
max-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = Test.BServer
</client>
<server>
#not cout
closecout = 0
#app name
app = Test
#server name
server = BServer
#path
basepath = ./
datapath = ./
#log path
logpath = ./
#merge net and imp thread
mergenetimp = 0
#opencoroutine
opencoroutine = 1
#local ip, for tarsnode
# local = tcp -h 127.0.0.1 -p 15001 -t 10000
#tarsnode
# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
#config obj
# config = tars.tarsconfig.ConfigObj
#notify obj
# notify = tars.tarsconfig.NotifyObj
#log obj
# log = tars.tarslog.LogObj
<BAdapter>
#ip:port:timeout
endpoint = tcp -h 127.0.0.1 -p 9100 -t 10000
#allow ip
allow =
#max connection num
maxconns = 4096
#imp thread num
threads = 5
#servant
servant = Test.BServer.BServantObj
#queue capacity
queuecap = 1000000
#tars protocol
protocol = tars
</BAdapter>
</server>
</application>
</tars>

View File

@ -5,7 +5,7 @@
/*
ResponsePacket
*/
static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, ResponsePacket& done)
static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp)
{
size_t len = sizeof(tars::Int32);
@ -26,7 +26,7 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response
iHeaderLen = ntohl(iHeaderLen);
//做一下保护,长度大于M
if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int))
if (iHeaderLen > 100000 || iHeaderLen < (int)sizeof(unsigned int))
{
throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
}
@ -37,101 +37,58 @@ static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, Response
return TC_NetWorkBuffer::PACKET_LESS;
}
in.moveHeader(sizeof(iHeaderLen));
tars::Int32 iRequestId = 0;
string sRequestId;
in.getHeader(sizeof(iHeaderLen), sRequestId);
in.getHeader(sizeof(iRequestId), sRequestId);
in.moveHeader(sizeof(iRequestId));
ResponsePacket rsp;
rsp.iRequestId = ntohl(*((unsigned int *)(sRequestId.c_str())));
in.getHeader(iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId), rsp.sBuffer);
len = iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId);
in.getHeader(len, rsp.sBuffer);
in.moveHeader(len);
return TC_NetWorkBuffer::PACKET_FULL;
}
/*
+iRequestId+
4+iRequestId4+
*/
static void pushRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffer::SendBuffer>& buff)
static void pushRequest(const RequestPacket& request, shared_ptr<TC_NetWorkBuffer::SendBuffer>& sbuff)
{
// unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
// unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
// buff = "";
// for (int i = 0; i<4; ++i)
// {
// buff += *bufflengthptr++;
// }
vector<char> buffer;
buffer.resize(request.sBuffer.size()+8);
// unsigned int netrequestId = htonl(request.iRequestId);
// unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
memcpy(buffer.data(), bufflengthptr, sizeof(unsigned int));
// for (int i = 0; i<4; ++i)
// {
// buff += *netrequestIdptr++;
// }
unsigned int netrequestId = htonl(request.iRequestId);
unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
// string tmp;
// tmp.assign((const char*)(&request.sBuffer[0]), request.sBuffer.size());
// buff+=tmp;
memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int));
memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size());
TarsOutputStream<BufferWriterVector> os;
tars::Int32 iHeaderLen = 0;
tars::Int32 iRequestId = request.iRequestId;
// 先预留8个字节长度
os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen));
os.writeBuf((const char *)&iRequestId, sizeof(iRequestId));
request.writeTo(os);
buff->swap(os.getByteBuffer());
// assert(buff->length() >= 4);
iHeaderLen = htonl((int)(buff->length()));
memcpy((void*)buff->buffer(), (const char *)&iHeaderLen, sizeof(iHeaderLen));
sbuff->addBuffer(buffer);
}
static void printResult(int iRequestId, const string &sResponseStr)
{
cout << "request id: " << iRequestId << endl;
cout << "response str: " << sResponseStr << endl;
cout << "request id: " << iRequestId << ", response str: " << sResponseStr << endl;
}
static void printPushInfo(const string &sResponseStr)
{
cout << "push message: " << sResponseStr << endl;
}
// int TestPushCallBack::onRequestException(int iRet)
// {
// return 0;
// }
// int TestPushCallBack::onRequestResponse(const tars::RequestPacket& request, const tars::ResponsePacket& response)
// {
// string sRet;
// cout << "sBuffer: " << response.sBuffer.size() << endl;
// sRet.assign(&(response.sBuffer[0]), response.sBuffer.size());
// printResult(response.iRequestId, sRet);
// return 0;
// }
// int TestPushCallBack::onPushResponse(const tars::ResponsePacket& response)
// {
// string sRet;
// sRet.assign(&(response.sBuffer[0]), response.sBuffer.size());
// printPushInfo(sRet);
// return 0;
// }
int TestPushCallBack::onDispatch(ReqMessagePtr msg)
{
if(msg->request.sFuncName == "printResult")
{
string sRet;
cout << "sBuffer: " << msg->response->sBuffer.size() << endl;
sRet.assign(&(msg->response->sBuffer[0]), msg->response->sBuffer.size());
printResult(msg->request.iRequestId, sRet);
return 0;
@ -150,12 +107,11 @@ int TestPushCallBack::onDispatch(ReqMessagePtr msg)
return -3;
}
RecvThread::RecvThread():_bTerminate(false)
RecvThread::RecvThread(int second):_second(second), _bTerminate(false)
{
string sObjName = "Test.TestPushServer.TestPushServantObj";
string sObjHost = "tcp -h 10.120.129.226 -t 60000 -p 10099";
string sObjName = "TestApp.PushServer.TestPushServantObj@tcp -h 127.0.0.1 -t 60000 -p 9300";
_prx = _comm.stringToProxy<ServantPrx>(sObjName+"@"+sObjHost);
_prx = _comm.stringToProxy<ServantPrx>(sObjName);
ProxyProtocol prot;
prot.requestFunc = pushRequest;
@ -164,15 +120,6 @@ RecvThread::RecvThread():_bTerminate(false)
_prx->tars_set_protocol(prot);
}
void RecvThread::terminate()
{
_bTerminate = true;
{
tars::TC_ThreadLock::Lock sync(*this);
notifyAll();
}
}
void RecvThread::run(void)
{
TestPushCallBackPtr cbPush = new TestPushCallBack();
@ -180,6 +127,8 @@ void RecvThread::run(void)
string buf("heartbeat");
time_t n = TNOW;
while(!_bTerminate)
{
{
@ -198,9 +147,15 @@ void RecvThread::run(void)
}
}
if(TNOW - n >= _second)
{
_bTerminate = true;
break;
}
{
TC_ThreadLock::Lock sync(*this);
timedWait(5000);
timedWait(500);
}
}
}

View File

@ -9,26 +9,17 @@ public:
virtual int onDispatch(ReqMessagePtr msg);
};
// class TestPushCallBack : public tars::PushCallback
// {
// public:
// virtual int onRequestException(int iRet);
// virtual int onRequestResponse(const tars::RequestPacket& request, const tars::ResponsePacket& response);
// virtual int onPushResponse(const tars::ResponsePacket& response);
// };
typedef tars::TC_AutoPtr<TestPushCallBack> TestPushCallBackPtr;
class RecvThread : public TC_Thread, public TC_ThreadLock
{
public:
RecvThread();
RecvThread(int second);
virtual void run();
void terminate();
private:
int _second;
bool _bTerminate;
Communicator _comm;

View File

@ -9,13 +9,17 @@ int main(int argc,char**argv)
{
try
{
RecvThread thread;
int second = 5;
if(argc > 1)
second = TC_Common::strto<int>(argv[1]);
if(second <=0 )
second = 1;
RecvThread thread(second);
thread.start();
int c;
while((c = getchar()) != 'q');
thread.terminate();
thread.getThreadControl().join();
}
catch(std::exception&e)

View File

@ -30,7 +30,7 @@ int TestPushServantImp::doRequest(tars::TarsCurrentPtr current, vector<char>& re
LOG->debug() << "connect ip: " << current->getIp() << endl;
}
(PushUser::mapMutex).unlock();
//返回给客户端它自己请求的数据包,即原包返回
//返回给客户端它自己请求的数据包,即原包返回(4字节长度+4字节requestid+buffer)
const vector<char>& request = current->getRequestBuffer();
response = request;

View File

@ -7,6 +7,42 @@ TestPushServer g_app;
/////////////////////////////////////////////////////////////////
static TC_NetWorkBuffer::PACKET_TYPE parse(TC_NetWorkBuffer &in, vector<char> &out)
{
size_t len = sizeof(tars::Int32);
if (in.getBufferLength() < len)
{
return TC_NetWorkBuffer::PACKET_LESS;
}
string header;
in.getHeader(len, header);
assert(header.size() == len);
tars::Int32 iHeaderLen = 0;
::memcpy(&iHeaderLen, header.c_str(), sizeof(tars::Int32));
iHeaderLen = ntohl(iHeaderLen);
if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int))
{
throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
}
if (in.getBufferLength() < (uint32_t)iHeaderLen)
{
return TC_NetWorkBuffer::PACKET_LESS;
}
in.getHeader(iHeaderLen, out);
in.moveHeader(iHeaderLen);
return TC_NetWorkBuffer::PACKET_FULL;
}
void
TestPushServer::initialize()
{
@ -15,11 +51,11 @@ TestPushServer::initialize()
addServant<TestPushServantImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".TestPushServantObj");
addServantProtocol("Test.TestPushServer.TestPushServantObj", TC_NetWorkBuffer::parseBinary4<8, 1024*1024*10>);
addServantProtocol("TestApp.PushServer.TestPushServantObj", parse);
pushThread.start();
}
/////////////////////////////////////////////////////////////////
void
TestPushServer::destroyApp()

View File

@ -38,30 +38,21 @@ void PushInfoThread::setPushInfo(const string &sInfo)
//定期向客户push消息
void PushInfoThread::run(void)
{
time_t iNow;
setPushInfo("hello world");
while (!_bTerminate)
{
iNow = TC_TimeProvider::getInstance()->getNow();
if(iNow - _tLastPushTime > _tInterval)
(PushUser::mapMutex).lock();
for(map<string, TarsCurrentPtr>::iterator it = (PushUser::pushUser).begin(); it != (PushUser::pushUser).end(); ++it)
{
_tLastPushTime = iNow;
(PushUser::mapMutex).lock();
for(map<string, TarsCurrentPtr>::iterator it = (PushUser::pushUser).begin(); it != (PushUser::pushUser).end(); ++it)
{
(it->second)->sendResponse(_sPushInfo.c_str(), _sPushInfo.size());
LOG->debug() << "sendResponse: " << _sPushInfo.size() <<endl;
}
(PushUser::mapMutex).unlock();
(it->second)->sendResponse(_sPushInfo.c_str(), _sPushInfo.size());
LOG->debug() << "sendResponse: " << _sPushInfo.size() <<endl;
}
(PushUser::mapMutex).unlock();
{
TC_ThreadLock::Lock sync(*this);
timedWait(1000);
timedWait(100);
}
}
}

View File

@ -13,7 +13,7 @@ public:
class PushInfoThread : public TC_Thread, public TC_ThreadLock
{
public:
PushInfoThread():_bTerminate(false),_tLastPushTime(0),_tInterval(10),_iId(0){}
PushInfoThread():_bTerminate(false),_tLastPushTime(0),_iId(0){}
virtual void run();
@ -24,7 +24,6 @@ public:
private:
bool _bTerminate;
time_t _tLastPushTime;
time_t _tInterval;
unsigned int _iId;
string _sPushInfo;
};

View File

@ -0,0 +1,70 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
max-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = TestApp.PushServer
</client>
<server>
#not cout
closecout = 0
#app name
app = TestApp
#server name
server = PushServer
#path
basepath = ./
datapath = ./
#log path
logpath = ./
#merge net and imp thread
mergenetimp = 0
#local ip, for tarsnode
# local = tcp -h 127.0.0.1 -p 15001 -t 10000
#tarsnode
# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
#config obj
# config = tars.tarsconfig.ConfigObj
#notify obj
# notify = tars.tarsconfig.NotifyObj
#log obj
# log = tars.tarslog.LogObj
<ProxyAdapter>
#ip:port:timeout
endpoint = tcp -h 127.0.0.1 -p 9300 -t 10000
#allow ip
allow =
#max connection num
maxconns = 4096
#imp thread num
threads = 5
#servant
servant = TestApp.PushServer.TestPushServantObj
#queue capacity
queuecap = 1000000
#tars protocol
protocol = not_tars
</ProxyAdapter>
</server>
</application>
</tars>

View File

@ -359,43 +359,3 @@ int main(int argc, char *argv[])
return 0;
}
// int main(int argc,char ** argv)
// {
// Communicator comm;
// TarsRollLogger::getInstance()->logger()->setLogLevel(TarsRollLogger::TARS_LOG);
// try
// {
// HelloPrx prx;
// comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 8999" , prx);
// try
// {
// string sReq("hello");
// string sRsp("");
// int iRet = prx->testHello(sReq, sRsp);
// cout<<"iRet:"<<iRet<<" sReq:"<<sReq<<" sRsp:"<<sRsp<<endl;
// }
// catch(exception &ex)
// {
// cerr << "ex:" << ex.what() << endl;
// }
// catch(...)
// {
// cerr << "unknown exception." << endl;
// }
// }
// catch(exception& e)
// {
// cerr << "exception:" << e.what() << endl;
// }
// catch (...)
// {
// cerr << "unknown exception." << endl;
// }
// return 0;
// }

View File

@ -30,10 +30,10 @@ int main(int argc,char ** argv)
{
ProxyPrx prx;
comm.stringToProxy("TestApp.ProxyServer.ProxyObj@tcp -h 127.0.0.1 -p 9200" , prx);
int64_t t = TC_Common::now2us();
try
{
int i = 1000;
int i = 10000;
while(i-- >= 0)
{
string sReq("hello");
@ -45,7 +45,8 @@ int main(int argc,char ** argv)
assert(sReq == sRsp);
}
// cout<<"iRet:"<<iRet<<" sReq:"<<sReq<<" sRsp:"<<sRsp<<endl;
int64_t cost = TC_Common::now2us() - t;
cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/10000 << "us" << endl;
}
catch(exception &ex)

View File

@ -0,0 +1,70 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
max-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = TestApp.ProxyServer
</client>
<server>
#not cout
closecout = 0
#app name
app = TestApp
#server name
server = ProxyServer
#path
basepath = ./
datapath = ./
#log path
logpath = ./
#merge net and imp thread
mergenetimp = 0
#local ip, for tarsnode
# local = tcp -h 127.0.0.1 -p 15001 -t 10000
#tarsnode
# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
#config obj
# config = tars.tarsconfig.ConfigObj
#notify obj
# notify = tars.tarsconfig.NotifyObj
#log obj
# log = tars.tarslog.LogObj
<ProxyAdapter>
#ip:port:timeout
endpoint = tcp -h 127.0.0.1 -p 9200 -t 10000
#allow ip
allow =
#max connection num
maxconns = 4096
#imp thread num
threads = 5
#servant
servant = TestApp.ProxyServer.ProxyObj
#queue capacity
queuecap = 1000000
#tars protocol
protocol = tars
</ProxyAdapter>
</server>
</application>
</tars>

View File

@ -0,0 +1,43 @@
#!/bin/bash
echo "run-co.sh"
WORKDIR=$(pwd)
echo ${WORKDIR}
killall -9 CoroutineDemoAServer
echo "start server: ${WORKDIR}/../bin/CoroutineDemoAServer --config=${WORKDIR}/../../examples/CoroutineDemo/AServer/config.conf &"
${WORKDIR}/../bin/CoroutineDemoAServer --config=${WORKDIR}/../../examples/CoroutineDemo/AServer/config.conf &
#-------------------------------------------------------------------------------------------------------
killall -9 CoroutineDemoBServer
echo "start server: ${WORKDIR}/../bin/CoroutineDemoBServer --config=${WORKDIR}/../../examples/CoroutineDemo/BServer/config.conf &"
${WORKDIR}/../bin/CoroutineDemoBServer --config=${WORKDIR}/../../examples/CoroutineDemo/BServer/config.conf &
#-------------------------------------------------------------------------------------------------------
sleep 3
echo "client: ${WORKDIR}/../bin/CoroutineDemoClient"
${WORKDIR}/../bin/CoroutineDemoClient 2 10000 0
${WORKDIR}/../bin/CoroutineDemoClient 2 10000 1
${WORKDIR}/../bin/testCoro 1000
${WORKDIR}/../bin/testParallelCoro 1000
#-------------------------------------------------------------------------------------------------------
#killall -9 CoroutineDemoAServer
#killall -9 CoroutineDemoBServer

View File

@ -0,0 +1,24 @@
#!/bin/bash
echo "run-co.sh"
WORKDIR=$(pwd)
echo ${WORKDIR}
killall -9 PushServer
echo "start server: ${WORKDIR}/../bin/PushServer --config=${WORKDIR}/../../examples/PushDemo/PushServer/config.conf &"
${WORKDIR}/../bin/PushServer --config=${WORKDIR}/../../examples/PushDemo/PushServer/config.conf &
sleep 3
#-------------------------------------------------------------------------------------------------------
echo "client: ${WORKDIR}/../bin/PushClient"
${WORKDIR}/../bin/PushClient 5

View File

@ -6,11 +6,12 @@ WORKDIR=$(pwd)
echo ${WORKDIR}
killall -9 QuickStartDemo
killall -9 QuickStartDemo ProxyServer
echo "start server: ${WORKDIR}/../bin/QuickStartDemo --config=${WORKDIR}/../../examples/QuickStartDemo/HelloServer/Server/config.conf &"
${WORKDIR}/../bin/QuickStartDemo --config=${WORKDIR}/../../examples/QuickStartDemo/HelloServer/Server/config.conf &
${WORKDIR}/../bin/ProxyServer --config=${WORKDIR}/../../examples/QuickStartDemo/ProxyServer/Server/config.conf &
sleep 3
@ -24,9 +25,12 @@ ${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=synctup --thread=2
${WORKDIR}/../bin/QuickStartDemoClient --count=100000 --call=asynctup --thread=2 --buffersize=100 --netthread=2
${WORKDIR}/../bin/ProxyServer --config=${WORKDIR}/../../examples/QuickStartDemo/ProxyServer/Server/config.conf &
echo "client: ${WORKDIR}/../bin/ProxyServerClient"
${WORKDIR}/../bin/ProxyServerClient
sleep 2
killall -9 ProxyServer QuickStartDemo
killall -9 QuickStartDemo
killall -9 ProxyServer

View File

@ -520,8 +520,8 @@ void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket> & rsp)
ReqMessage * msg = NULL;
//requestid 为0 是push消息
// if(rsp->iRequestId == 0)
if (rsp->cPacketType == TARSPUSH)
if(rsp->iRequestId == 0 || rsp->cPacketType == TARSPUSH)
// if (rsp->cPacketType == TARSPUSH)
{
if(!_objectProxy->getPushCallback())
{

View File

@ -743,9 +743,6 @@ void Application::main(const TC_Option &option)
//绑定对象和端口
bindAdapter(adapters);
//业务应用的初始化
initialize();
//输出所有adapter
outAllAdapter(cout);

View File

@ -64,7 +64,7 @@ void AsyncProcThread::push_back(ReqMessage * msg)
else {
if(_msgQueue->size() >= _iQueueCap)
{
TLOGERROR("[TARS][AsyncProcThread::push_back] async_queue full." << endl);
TLOGERROR("[TARS][AsyncProcThread::push_back] async_queue full:" << _msgQueue->size() << ">=" << _iQueueCap << endl);
delete msg;
}
else

View File

@ -287,7 +287,7 @@ void Communicator::initialize()
bool merge = TC_Common::strto<bool>(getProperty("mergenetasync", "0"));
//异步队列的大小
size_t iAsyncQueueCap = TC_Common::strto<size_t>(getProperty("asyncqueuecap", "10-000"));
size_t iAsyncQueueCap = TC_Common::strto<size_t>(getProperty("asyncqueuecap", "100000"));
if(iAsyncQueueCap < 10000)
{
iAsyncQueueCap = 10000;
@ -382,13 +382,13 @@ void Communicator::doStat()
void Communicator::pushAsyncThreadQueue(ReqMessage * msg)
{
//先不考虑每个线程队列数目不一致的情况
_asyncThread[_asyncSeq]->push_back(msg);
_asyncSeq ++;
_asyncThread[(++_asyncSeq)%_asyncThreadNum]->push_back(msg);
// _asyncSeq ++;
if(_asyncSeq == _asyncThreadNum)
{
_asyncSeq = 0;
}
// if(_asyncSeq == _asyncThreadNum)
// {
// _asyncSeq = 0;
// }
}
void Communicator::terminate()

View File

@ -64,7 +64,6 @@ TC_Thread::TC_Thread() : _running(false), _th(NULL)
TC_Thread::~TC_Thread()
{
if(_th != NULL)
{
//如果资源没有被detach或者被join则自己释放
@ -150,119 +149,5 @@ size_t TC_Thread::CURRENT_THREADID()
return threadId;
}
// TC_ThreadControl::TC_ThreadControl(pthread_t thread) : _thread(thread)
// {
// }
// TC_ThreadControl::TC_ThreadControl() : _thread(pthread_self())
// {
// }
// void TC_ThreadControl::join()
// {
// if(pthread_self() == _thread)
// {
// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] can't be called in the same thread");
// }
// void* ignore = 0;
// int rc = pthread_join(_thread, &ignore);
// if(rc != 0)
// {
// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] pthread_join error ", rc);
// }
// }
// void TC_ThreadControl::detach()
// {
// if(pthread_self() == _thread)
// {
// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] can't be called in the same thread");
// }
// int rc = pthread_detach(_thread);
// if(rc != 0)
// {
// throw TC_ThreadThreadControl_Exception("[TC_ThreadControl::join] pthread_join error", rc);
// }
// }
// pthread_t TC_ThreadControl::id() const
// {
// return _thread;
// }
// void TC_ThreadControl::sleep(long millsecond)
// {
// struct timespec ts;
// ts.tv_sec = millsecond / 1000;
// ts.tv_nsec = (millsecond % 1000)*1000000;
// nanosleep(&ts, 0);
// }
// void TC_ThreadControl::yield()
// {
// sched_yield();
// }
// TC_Thread::TC_Thread() : _running(false),_tid(-1)
// {
// }
// void TC_Thread::threadEntry(TC_Thread *pThread)
// {
// pThread->_running = true;
// {
// TC_ThreadLock::Lock sync(pThread->_lock);
// pThread->_lock.notifyAll();
// }
// try
// {
// pThread->run();
// }
// catch(...)
// {
// pThread->_running = false;
// throw;
// }
// pThread->_running = false;
// }
// TC_ThreadControl TC_Thread::start()
// {
// TC_ThreadLock::Lock sync(_lock);
// if(_running)
// {
// throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread has start");
// }
// int ret = pthread_create(&_tid,
// 0,
// (void *(*)(void *))&threadEntry,
// (void *)this);
// if(ret != 0)
// {
// throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread start error", ret);
// }
// _lock.wait();
// return TC_ThreadControl(_tid);
// }
// TC_ThreadControl TC_Thread::getThreadControl() const
// {
// return TC_ThreadControl(_tid);
// }
// bool TC_Thread::isAlive() const
// {
// return _running;
// }
}