1 udp in ipv6 bug

2 add tars.resource
3 fix rsp queue
4 add max buffer limit
5 send queue, data accumulate clear
6 Adjust cmake third library path
7 tc_network_buffer add iterator
8 optimize tc_http tc_http_async, improve http parser performance
This commit is contained in:
ruanshudong 2020-03-04 10:33:14 +08:00
parent 34c6435833
commit 21f58d250b
42 changed files with 3728 additions and 1219 deletions

View File

@ -73,6 +73,7 @@ if (TARS_PROTOBUF)
ExternalProject_Add(ADD_${LIB_PROTOBUF}
URL http://cdn.tarsyun.com/src/protobuf-cpp-3.11.3.tar.gz
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND cmake cmake -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/src/protobuf -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=ON
@ -90,6 +91,7 @@ if (TARS_PROTOBUF)
ExternalProject_Add(ADD_${LIB_PROTOBUF}
URL http://cdn.tarsyun.com/src/protobuf-cpp-3.11.3.tar.gz
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND cmake cmake -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/src/protobuf -DBUILD_SHARED_LIBS=OFF
@ -104,8 +106,8 @@ if (TARS_PROTOBUF)
endif ()
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/protobuf/lib DESTINATION .)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/protobuf/include/google DESTINATION include)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/protobuf/lib DESTINATION thirdparty)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/protobuf/include/google DESTINATION thirdparty/include)
add_dependencies(thirdparty ADD_${LIB_PROTOBUF})
@ -124,8 +126,8 @@ if (TARS_SSL)
set(LIB_CRYPTO "libcrypto")
ExternalProject_Add(ADD_${LIB_SSL}
DEPENDS ${LIB_ZLIB}
URL http://cdn.tarsyun.com/src/openssl-1.1.1d.tar.gz
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND perl Configure --prefix=${CMAKE_BINARY_DIR}/src/openssl VC-WIN64A no-asm
@ -142,8 +144,8 @@ if (TARS_SSL)
set(LIB_CRYPTO "crypto")
ExternalProject_Add(ADD_${LIB_SSL}
DEPENDS ${LIB_ZLIB}
URL http://cdn.tarsyun.com/src/openssl-1.1.1d.tar.gz
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND ./config --prefix=${CMAKE_BINARY_DIR}/src/openssl no-shared
@ -158,8 +160,8 @@ if (TARS_SSL)
endif ()
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/openssl/lib DESTINATION .)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/openssl/include/openssl DESTINATION include)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/openssl/lib DESTINATION thirdparty)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/openssl/include/openssl DESTINATION thirdparty/include)
add_dependencies(thirdparty ADD_${LIB_SSL})
endif ()
@ -175,6 +177,7 @@ if (TARS_MYSQL)
ExternalProject_Add(ADD_${LIB_MYSQL}
URL http://cdn.tarsyun.com/src/mysql-connector-c-6.1.11-src.zip
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND cmake . -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/src/mysql -DBUILD_CONFIG=mysql_release
@ -192,6 +195,7 @@ if (TARS_MYSQL)
ExternalProject_Add(ADD_${LIB_MYSQL}
URL http://cdn.tarsyun.com/src/mysql-connector-c-6.1.11-src.tar.gz
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND cmake . -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/src/mysql -DDEFAULT_CHARSET=utf8 -DDEFAULT_COLLATION=utf8_general_ci -DDISABLE_SHARED=1
@ -206,9 +210,9 @@ if (TARS_MYSQL)
endif ()
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/mysql/lib DESTINATION .)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/mysql/lib DESTINATION thirdparty)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/mysql/include/mysql DESTINATION include)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/mysql/include/mysql DESTINATION thirdparty/include)
add_dependencies(thirdparty ADD_${LIB_MYSQL})
endif ()
@ -228,6 +232,7 @@ if (TARS_HTTP2)
if (WIN32)
ExternalProject_Add(ADD_${LIB_HTTP2}
URL http://cdn.tarsyun.com/src/nghttp2-1.40.0.tar.gz
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND cmake . -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/src/nghttp2 -DENABLE_LIB_ONLY=ON -DENABLE_STATIC_LIB=ON
@ -243,6 +248,7 @@ if (TARS_HTTP2)
else ()
ExternalProject_Add(ADD_${LIB_HTTP2}
URL http://cdn.tarsyun.com/src/nghttp2-1.40.0.tar.gz
DOWNLOAD_DIR ${CMAKE_SOURCE_DIR}/download
PREFIX ${CMAKE_BINARY_DIR}
INSTALL_DIR ${CMAKE_SOURCE_DIR}
CONFIGURE_COMMAND cmake . -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/src/nghttp2 -DENABLE_LIB_ONLY=ON -DENABLE_STATIC_LIB=ON
@ -257,8 +263,8 @@ if (TARS_HTTP2)
endif ()
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/nghttp2/lib DESTINATION .)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/nghttp2/include/nghttp2 DESTINATION include)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/nghttp2/lib DESTINATION thirdparty)
INSTALL(DIRECTORY ${CMAKE_BINARY_DIR}/src/nghttp2/include/nghttp2 DESTINATION thirdparty/include)
add_dependencies(thirdparty ADD_${LIB_HTTP2})

View File

@ -12,6 +12,7 @@ endif()
add_subdirectory(PushDemo)
add_subdirectory(QuickStartDemo)
add_subdirectory(StressDemo)
add_subdirectory(UdpDemo)
set(WORKING_DIRECTORY ${tars-cpp_SOURCE_DIR})
@ -38,6 +39,13 @@ if(WIN32)
COMMAND examples/scripts/run-auth.bat ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run auth")
add_custom_target(run-udp
WORKING_DIRECTORY ${WORKING_DIRECTORY}
DEPENDS UdpServer UdpClient
USES_TERMINAL
COMMAND examples/scripts/run-udp.bat ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run udp")
if(TARS_HTTP2)
add_custom_target(run-http2
WORKING_DIRECTORY ${WORKING_DIRECTORY}
@ -117,10 +125,16 @@ else(WIN32)
COMMENT "call quick start")
add_custom_target(run-http
WORKING_DIRECTORY ${WORKING_DIRECTORY}
DEPENDS HttpServer HttpClient
COMMAND sh examples/scripts/run-http.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run http")
WORKING_DIRECTORY ${WORKING_DIRECTORY}
DEPENDS HttpServer HttpClient
COMMAND sh examples/scripts/run-http.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run http")
add_custom_target(run-udp
WORKING_DIRECTORY ${WORKING_DIRECTORY}
DEPENDS UdpServer UdpClient
COMMAND sh examples/scripts/run-udp.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${WORKING_DIRECTORY}
COMMENT "call run udp")
add_custom_target(run-auth
WORKING_DIRECTORY ${WORKING_DIRECTORY}

View File

@ -0,0 +1,5 @@
include_directories(Server)
add_subdirectory(Server)
add_subdirectory(Client)

View File

@ -0,0 +1 @@
build_tars_server(UdpClient UdpServer)

View File

@ -0,0 +1,28 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
sync-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = TestApp.UdpClient
</client>
</application>
</tars>

View File

@ -0,0 +1,243 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include <iostream>
#include "servant/Communicator.h"
#include "Hello.h"
#include "util/tc_option.h"
using namespace std;
using namespace tars;
using namespace TestApp;
Communicator* _comm;
static string helloObj = "TestApp.UdpServer.UdpObj@udp -h 127.0.0.1 -p 9016 -e 1";
static string ipv6Obj = "TestApp.UdpServer.Ipv6Obj@udp -h ::1 -p 25460";
struct Param
{
int count;
string call;
int thread;
int buffersize;
int netthread;
HelloPrx pPrx;
};
Param param;
std::atomic<int> request_count(0);
std::atomic<int> callback_count(0);
struct HelloCallback : public HelloPrxCallback
{
HelloCallback(int64_t t, int i, int c) : start(t), cur(i), count(c)
{
}
//call back
virtual void callback_testHello(int ret, const string &r)
{
assert(ret == 0);
callback_count++;
if(cur == count-1)
{
int64_t cost = TC_Common::now2us() - start;
cout << "callback_testHello count:" << count << ", " << cost << " us, avg:" << 1.*cost/count << "us" << endl;
}
}
virtual void callback_testHello_exception(tars::Int32 ret)
{
cout << "callback exception:" << ret << endl;
}
int64_t start;
int cur;
int count;
};
void syncCall(int c)
{
string buffer(param.buffersize, 'a');
int64_t t = TC_Common::now2us();
//发起远程调用
for (int i = 0; i < c; ++i)
{
string r;
try
{
param.pPrx->testHello(buffer, r);
}
catch(exception& e)
{
cout << "exception:" << e.what() << endl;
}
++callback_count;
}
int64_t cost = TC_Common::now2us() - t;
cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
}
void asyncCall(int c)
{
int64_t t = TC_Common::now2us();
string buffer(param.buffersize, 'a');
//发起远程调用
for (int i = 0; i < c;)
{
if(request_count - callback_count < 100) {
i++;
request_count++;
HelloPrxCallbackPtr p = new HelloCallback(t, i, c);
try {
param.pPrx->async_testHello(p, buffer);
}
catch (exception & e) {
cout << "exception:" << e.what() << endl;
}
}
else
{
TC_Common::msleep(10);
}
}
int64_t cost = TC_Common::now2us() - t;
cout << "asyncCall send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
}
int main(int argc, char *argv[])
{
try
{
if (argc < 6)
{
cout << "Usage:" << argv[0] << "--config=conf --count=1000 --call=[sync|async|syncv6|asyncv6] --thread=1 --buffersize=1000 --netthread=1" << endl;
return 0;
}
TC_Option option;
option.decode(argc, argv);
param.count = TC_Common::strto<int>(option.getValue("count"));
if(param.count <= 0) param.count = 1000;
param.buffersize = TC_Common::strto<int>(option.getValue("buffersize"));
if(param.buffersize <= 0) param.buffersize = 1000;
param.call = option.getValue("call");
if(param.call.empty()) param.call = "sync";
param.thread = TC_Common::strto<int>(option.getValue("thread"));
if(param.thread <= 0) param.thread = 1;
param.netthread = TC_Common::strto<int>(option.getValue("netthread"));
if(param.netthread <= 0) param.netthread = 1;
_comm = new Communicator();
TC_Config conf;
conf.parseFile(option.getValue("config"));
_comm->setProperty(conf);
// TafRollLogger::getInstance()->logger()->setLogLevel(6);
_comm->setProperty("sendqueuelimit", "1000000");
_comm->setProperty("asyncqueuecap", "1000000");
_comm->setProperty("netthread", TC_Common::tostr(param.netthread));
int64_t start = TC_Common::now2us();
std::function<void(int)> func;
if (param.call == "sync")
{
func = syncCall;
param.pPrx = _comm->stringToProxy<HelloPrx>(helloObj);
}
else if (param.call == "async")
{
func = asyncCall;
param.pPrx = _comm->stringToProxy<HelloPrx>(helloObj);
}
else if (param.call == "syncv6")
{
func = syncCall;
param.pPrx = _comm->stringToProxy<HelloPrx>(ipv6Obj);
}
else if (param.call == "asyncv6")
{
func = asyncCall;
param.pPrx = _comm->stringToProxy<HelloPrx>(ipv6Obj);
}
else
{
cout << "no func, exits" << endl;
exit(0);
}
param.pPrx->tars_connect_timeout(5000);
param.pPrx->tars_async_timeout(60*1000);
param.pPrx->tars_ping();
vector<std::thread*> vt;
for(int i = 0 ; i< param.thread; i++)
{
vt.push_back(new std::thread(func, param.count));
}
std::thread print([&]{while(callback_count != param.count * param.thread) {
cout << "Auth:" << param.call << " : ----------finish count:" << callback_count << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
};});
for(size_t i = 0 ; i< vt.size(); i++)
{
vt[i]->join();
delete vt[i];
}
cout << "(pid:" << std::this_thread::get_id() << ")"
<< "(count:" << param.count << ")"
<< "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
<< endl;
while(callback_count != param.count * param.thread) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
print.join();
cout << "Auth:" << param.call << " ----------finish count:" << callback_count << endl;
}
catch(exception &ex)
{
cout << ex.what() << endl;
}
cout << "main return." << endl;
return 0;
}

View File

@ -0,0 +1,2 @@
build_tars_server("UdpServer" "")

View File

@ -0,0 +1,471 @@
// **********************************************************************
// This file was generated by a TARS parser!
// TARS version 2.0.0.
// **********************************************************************
#ifndef __HELLO_H_
#define __HELLO_H_
#include <map>
#include <string>
#include <vector>
#include "tup/Tars.h"
#include "tup/TarsJson.h"
using namespace std;
#include "servant/ServantProxy.h"
#include "servant/Servant.h"
namespace TestApp
{
/* callback of async proxy for client */
class HelloPrxCallback: public tars::ServantProxyCallback
{
public:
virtual ~HelloPrxCallback(){}
virtual void callback_test(tars::Int32 ret)
{ throw std::runtime_error("callback_test() override incorrect."); }
virtual void callback_test_exception(tars::Int32 ret)
{ throw std::runtime_error("callback_test_exception() override incorrect."); }
virtual void callback_testHello(tars::Int32 ret, const std::string& sRsp)
{ throw std::runtime_error("callback_testHello() override incorrect."); }
virtual void callback_testHello_exception(tars::Int32 ret)
{ throw std::runtime_error("callback_testHello_exception() override incorrect."); }
public:
virtual const map<std::string, std::string> & getResponseContext() const
{
CallbackThreadData * pCbtd = CallbackThreadData::getData();
assert(pCbtd != NULL);
if(!pCbtd->getContextValid())
{
throw TC_Exception("cann't get response context");
}
return pCbtd->getResponseContext();
}
public:
virtual int onDispatch(tars::ReqMessagePtr msg)
{
static ::std::string __Hello_all[]=
{
"test",
"testHello"
};
pair<string*, string*> r = equal_range(__Hello_all, __Hello_all+2, string(msg->request.sFuncName));
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Hello_all)
{
case 0:
{
if (msg->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_test_exception(msg->response->iRet);
return msg->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(msg->response->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
CallbackThreadData * pCbtd = CallbackThreadData::getData();
assert(pCbtd != NULL);
pCbtd->setResponseContext(msg->response->context);
callback_test(_ret);
pCbtd->delResponseContext();
return tars::TARSSERVERSUCCESS;
}
case 1:
{
if (msg->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_testHello_exception(msg->response->iRet);
return msg->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(msg->response->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
std::string sRsp;
_is.read(sRsp, 2, true);
CallbackThreadData * pCbtd = CallbackThreadData::getData();
assert(pCbtd != NULL);
pCbtd->setResponseContext(msg->response->context);
callback_testHello(_ret, sRsp);
pCbtd->delResponseContext();
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
typedef tars::TC_AutoPtr<HelloPrxCallback> HelloPrxCallbackPtr;
/* callback of coroutine async proxy for client */
class HelloCoroPrxCallback: public HelloPrxCallback
{
public:
virtual ~HelloCoroPrxCallback(){}
public:
virtual const map<std::string, std::string> & getResponseContext() const { return _mRspContext; }
virtual void setResponseContext(const map<std::string, std::string> &mContext) { _mRspContext = mContext; }
public:
int onDispatch(tars::ReqMessagePtr msg)
{
static ::std::string __Hello_all[]=
{
"test",
"testHello"
};
pair<string*, string*> r = equal_range(__Hello_all, __Hello_all+2, string(msg->request.sFuncName));
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __Hello_all)
{
case 0:
{
if (msg->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_test_exception(msg->response->iRet);
return msg->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(msg->response->sBuffer);
try
{
tars::Int32 _ret;
_is.read(_ret, 0, true);
setResponseContext(msg->response->context);
callback_test(_ret);
}
catch(std::exception &ex)
{
callback_test_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
catch(...)
{
callback_test_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
return tars::TARSSERVERSUCCESS;
}
case 1:
{
if (msg->response->iRet != tars::TARSSERVERSUCCESS)
{
callback_testHello_exception(msg->response->iRet);
return msg->response->iRet;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(msg->response->sBuffer);
try
{
tars::Int32 _ret;
_is.read(_ret, 0, true);
std::string sRsp;
_is.read(sRsp, 2, true);
setResponseContext(msg->response->context);
callback_testHello(_ret, sRsp);
}
catch(std::exception &ex)
{
callback_testHello_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
catch(...)
{
callback_testHello_exception(tars::TARSCLIENTDECODEERR);
return tars::TARSCLIENTDECODEERR;
}
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
protected:
map<std::string, std::string> _mRspContext;
};
typedef tars::TC_AutoPtr<HelloCoroPrxCallback> HelloCoroPrxCallbackPtr;
/* proxy for client */
class HelloProxy : public tars::ServantProxy
{
public:
typedef map<string, string> TARS_CONTEXT;
tars::Int32 test(const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
shared_ptr<tars::ResponsePacket> rep = tars_invoke(tars::TARSNORMAL,"test", _os, context, _mStatus);
if(pResponseContext)
{
pResponseContext->swap(rep->context);
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(rep->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
return _ret;
}
void async_test(HelloPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback);
}
void coro_test(HelloCoroPrxCallbackPtr callback,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"test", _os, context, _mStatus, callback, true);
}
tars::Int32 testHello(const std::string & sReq,std::string &sRsp,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(sReq, 1);
_os.write(sRsp, 2);
std::map<string, string> _mStatus;
shared_ptr<tars::ResponsePacket> rep = tars_invoke(tars::TARSNORMAL,"testHello", _os, context, _mStatus);
if(pResponseContext)
{
pResponseContext->swap(rep->context);
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(rep->sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
_is.read(sRsp, 2, true);
return _ret;
}
void async_testHello(HelloPrxCallbackPtr callback,const std::string &sReq,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(sReq, 1);
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"testHello", _os, context, _mStatus, callback);
}
void coro_testHello(HelloCoroPrxCallbackPtr callback,const std::string &sReq,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(sReq, 1);
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"testHello", _os, context, _mStatus, callback, true);
}
HelloProxy* tars_hash(int64_t key)
{
return (HelloProxy*)ServantProxy::tars_hash(key);
}
HelloProxy* tars_consistent_hash(int64_t key)
{
return (HelloProxy*)ServantProxy::tars_consistent_hash(key);
}
HelloProxy* tars_set_timeout(int msecond)
{
return (HelloProxy*)ServantProxy::tars_set_timeout(msecond);
}
static const char* tars_prxname() { return "HelloProxy"; }
};
typedef tars::TC_AutoPtr<HelloProxy> HelloPrx;
/* servant for server */
class Hello : public tars::Servant
{
public:
virtual ~Hello(){}
virtual tars::Int32 test(tars::TarsCurrentPtr current) = 0;
static void async_response_test(tars::TarsCurrentPtr current, tars::Int32 _ret)
{
if (current->getRequestVersion() == TUPVERSION )
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(current->getRequestVersion());
tarsAttr.put("", _ret);
vector<char> sTupResponseBuffer;
tarsAttr.encode(sTupResponseBuffer);
current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer());
}
}
virtual tars::Int32 testHello(const std::string & sReq,std::string &sRsp,tars::TarsCurrentPtr current) = 0;
static void async_response_testHello(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sRsp)
{
if (current->getRequestVersion() == TUPVERSION )
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(current->getRequestVersion());
tarsAttr.put("", _ret);
tarsAttr.put("sRsp", sRsp);
vector<char> sTupResponseBuffer;
tarsAttr.encode(sTupResponseBuffer);
current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_os.write(sRsp, 2);
current->sendResponse(tars::TARSSERVERSUCCESS, _os.getByteBuffer());
}
}
public:
int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer)
{
static ::std::string __TestApp__Hello_all[]=
{
"test",
"testHello"
};
pair<string*, string*> r = equal_range(__TestApp__Hello_all, __TestApp__Hello_all+2, _current->getFuncName());
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __TestApp__Hello_all)
{
case 0:
{
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_current->getRequestBuffer());
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(_current->getRequestVersion());
tarsAttr.decode(_current->getRequestBuffer());
}
else
{
}
tars::Int32 _ret = test(_current);
if(_current->isResponse())
{
if (_current->getRequestVersion() == TUPVERSION )
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(_current->getRequestVersion());
tarsAttr.put("", _ret);
tarsAttr.encode(_sResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_os.swap(_sResponseBuffer);
}
}
return tars::TARSSERVERSUCCESS;
}
case 1:
{
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_current->getRequestBuffer());
std::string sReq;
std::string sRsp;
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(_current->getRequestVersion());
tarsAttr.decode(_current->getRequestBuffer());
tarsAttr.get("sReq", sReq);
tarsAttr.getByDefault("sRsp", sRsp, sRsp);
}
else
{
_is.read(sReq, 1, true);
_is.read(sRsp, 2, false);
}
tars::Int32 _ret = testHello(sReq,sRsp, _current);
if(_current->isResponse())
{
if (_current->getRequestVersion() == TUPVERSION )
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(_current->getRequestVersion());
tarsAttr.put("", _ret);
tarsAttr.put("sRsp", sRsp);
tarsAttr.encode(_sResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
_os.write(_ret, 0);
_os.write(sRsp, 2);
_os.swap(_sResponseBuffer);
}
}
return tars::TARSSERVERSUCCESS;
}
}
return tars::TARSSERVERNOFUNCERR;
}
};
}
#endif

View File

@ -0,0 +1,26 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
module TestApp
{
interface Hello
{
int test();
int testHello(string sReq, out string sRsp);
};
};

View File

@ -0,0 +1,43 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include "HelloImp.h"
#include "servant/Application.h"
using namespace std;
//////////////////////////////////////////////////////
void HelloImp::initialize()
{
//initialize servant here:
//...
}
//////////////////////////////////////////////////////
void HelloImp::destroy()
{
//destroy servant here:
//...
}
int HelloImp::testHello(const std::string &sReq, std::string &sRsp, tars::TarsCurrentPtr current)
{
// TLOGDEBUG("HelloImp::testHellosReq:"<<sReq<<endl);
// cout << sReq << endl;
sRsp = sReq;
return 0;
}

View File

@ -0,0 +1,53 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef _SSLImp_H_
#define _SSLImp_H_
#include "servant/Application.h"
#include "Hello.h"
/**
*
*
*/
class HelloImp : public TestApp::Hello
{
public:
/**
*
*/
virtual ~HelloImp() {}
/**
*
*/
virtual void initialize();
/**
*
*/
virtual void destroy();
/**
*
*/
virtual int test(tars::TarsCurrentPtr current) { return 0;};
virtual int testHello(const std::string &sReq, std::string &sRsp, tars::TarsCurrentPtr current);
};
/////////////////////////////////////////////////////
#endif

View File

@ -0,0 +1,60 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include "HelloServer.h"
#include "HelloImp.h"
using namespace std;
HelloServer g_app;
/////////////////////////////////////////////////////////////////
void
HelloServer::initialize()
{
//initialize application here:
//...
addServant<HelloImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".UdpObj");
addServant<HelloImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".Ipv6Obj");
}
/////////////////////////////////////////////////////////////////
void
HelloServer::destroyApp()
{
//destroy application here:
//...
}
/////////////////////////////////////////////////////////////////
int
main(int argc, char* argv[])
{
try
{
g_app.main(argc, argv);
g_app.waitForShutdown();
}
catch (std::exception& e)
{
cerr << "std::exception:" << e.what() << std::endl;
}
catch (...)
{
cerr << "unknown exception." << std::endl;
}
return -1;
}
/////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,50 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#ifndef _HelloServer_H_
#define _HelloServer_H_
#include <iostream>
#include "servant/Application.h"
using namespace tars;
/**
*
**/
class HelloServer : public Application
{
public:
/**
*
**/
virtual ~HelloServer() {};
/**
*
**/
virtual void initialize();
/**
*
**/
virtual void destroyApp();
};
extern HelloServer g_app;
////////////////////////////////////////////
#endif

View File

@ -0,0 +1,87 @@
<tars>
<application>
<client>
#tarsregistry locator
locator = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
#max invoke timeout
sync-invoke-timeout = 5000
#refresh endpoint interval
refresh-endpoint-interval = 10000
#stat obj
stat = tars.tarsstat.StatObj
#max send queue length limit
sendqueuelimit = 100000
#async queue length limit
asyncqueuecap = 100000
#async callback thread num
asyncthread = 3
#net thread
netthread = 1
#merge net and sync thread
mergenetasync = 0
#module name
modulename = TestApp.UdpServer
</client>
<server>
#not cout
closecout = 0
#app name
app = TestApp
#server name
server = UdpServer
#path
basepath = ./
datapath = ./
#log path
logpath = ./
#merge net and imp thread
mergenetimp = 0
#local ip, for tarsnode
# local = tcp -h 127.0.0.1 -p 15001 -t 10000
#tarsnode
# node = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
#config obj
# config = tars.tarsconfig.ConfigObj
#notify obj
# notify = tars.tarsconfig.NotifyObj
#log obj
# log = tars.tarslog.LogObj
<TestApp.UdpServer.UdpObjAdapter>
#ip:port:timeout
endpoint = udp -h 127.0.0.1 -p 9016 -t 10000 -e 1
#allow ip
allow =
#max connection num
maxconns = 4096
#imp thread num
threads = 5
#servant
servant = TestApp.UdpServer.UdpObj
#queue capacity
queuecap = 1000000
#tars protocol
protocol = tars
</TestApp.UdpServer.UdpObjAdapter>
<Ipv6Adapter>
#ip:port:timeout
endpoint = udp -h ::1 -p 25460 -t 10000
#允许的IP地址
allow =
#最大连接数
maxconns = 4096
#当前线程个数
threads = 5
#处理对象
servant = TestApp.UdpServer.Ipv6Obj
#队列最大包个数
queuecap = 1000000
</Ipv6Adapter>
</server>
</application>
</tars>

View File

@ -0,0 +1,30 @@
echo "run-auth.bat"
set EXE_PATH=%1
set SRC_PATH=%2
echo %EXE_PATH% %SRC_PATH%
taskkill /im UdpServer.exe /t /f
timeout /T 1
echo "start server: %EXE_PATH%/UdpServer.exe --config=%SRC_PATH%/examples/UdpDemo/Server/config.conf"
start /b %EXE_PATH%\\UdpServer.exe --config=%SRC_PATH%\\examples\\UdpDemo\\Server\\config.conf
timeout /T 3
echo "client: ${EXE_PATH}/UdpClient.exe"
%EXE_PATH%\\UdpClient.exe --count=10000 --thread=2 --call=sync --buffersize=1000 --netthread=1
%EXE_PATH%\\UdpClient.exe --count=10000 --thread=2 --call=async --buffersize=1000 --netthread=1
timeout /T 1
taskkill /im UdpServer.exe /t /f

View File

@ -0,0 +1,28 @@
#!/bin/bash
echo "run-auth.sh"
EXE_PATH=$1
SRC_PATH=$2
echo ${EXE_PATH} ${SRC_PATH}
killall -9 UdpServer
sleep 1
echo "start server: ${EXE_PATH}/UdpServer --config=${SRC_PATH}/examples/UdpDemo/Server/config.conf &"
${EXE_PATH}/UdpServer --config=${SRC_PATH}/examples/UdpDemo/Server/config.conf &
sleep 1
echo "client: ${EXE_PATH}/UdpClient"
${EXE_PATH}/UdpClient --config=${SRC_PATH}/examples/UdpDemo/Client/config.conf --count=10000 --thread=2 --call=sync --buffersize=1000 --netthread=1
${EXE_PATH}/UdpClient --config=${SRC_PATH}/examples/UdpDemo/Client/config.conf --count=10000 --thread=2 --call=async --buffersize=1000 --netthread=1
sleep 1
killall -9 UdpServer

View File

@ -75,7 +75,7 @@ vector<char> ProxyProtocol::http1Request(tars::RequestPacket& request, Transceiv
struct Http1Context
{
string buff;
// string buff;
TC_HttpResponse httpRsp;
};
@ -90,10 +90,10 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in,
in.setContextData(context, [=]{ delete context; });
}
context->buff.append(in.getBuffersString());
in.clearBuffers();
// context->buff.append(in.getBuffersString());
// in.clearBuffers();
if(context->httpRsp.incrementDecode(context->buff))
if(context->httpRsp.incrementDecode(in))
{
rsp.iRequestId = ((Transceiver*)(in.getConnection()))->getAdapterProxy()->getId();

View File

@ -84,6 +84,8 @@ bool ServerConfig::ManualListen = false; //手工启动监听端口
bool ServerConfig::MergeNetImp = false; //合并网络和处理线程
int ServerConfig::NetThread = 1; //servernet thread
bool ServerConfig::CloseCout = true;
int ServerConfig::BackPacketLimit = 0;
int ServerConfig::BackPacketMin = 1024;
#if TARS_SSL
std::string ServerConfig::CA;
@ -92,8 +94,6 @@ std::string ServerConfig::Key;
bool ServerConfig::VerifyClient = false;
#endif
#define OUT_LINE (TC_Common::outfill("", '-', 80))
#define OUT_LINE_LONG (TC_Common::outfill("", '=', 80))
///////////////////////////////////////////////////////////////////////////////////////////
TC_Config Application::_conf;
@ -157,14 +157,12 @@ void reportRspQueue(TC_EpollServer *epollServer)
{
iLastCheckTime = iNow;
vector<TC_EpollServer::NetThread*> vNetThread = epollServer->getNetThread();
unsigned int iNetThreadNum = epollServer->getNetThreadNum();
const vector<TC_EpollServer::BindAdapterPtr> &adapters = epollServer->getBindAdapters();
size_t n = 0;
for (size_t i = 0; i < iNetThreadNum; ++i)
for (size_t i = 0; i < adapters.size(); ++i)
{
n = n + vNetThread[i]->getSendRspSize();
n = n + adapters[i]->getSendBufferSize();
}
g_pReportRspQueue->report((int)n);
@ -419,14 +417,20 @@ bool Application::cmdConnections(const string& command, const string& params, st
os << TC_Common::outfill("conn-uid", ' ', 15)
<< TC_Common::outfill("ip:port", ' ', 25)
<< TC_Common::outfill("last-time", ' ', 25)
<< TC_Common::outfill("timeout", ' ', 10) << endl;
<< TC_Common::outfill("timeout", ' ', 10)
<< TC_Common::outfill("recvBufferSize", ' ', 30)
<< TC_Common::outfill("sendBufferSize", ' ', 30)
<< endl;
for (size_t i = 0; i < v.size(); i++)
{
os << TC_Common::outfill(TC_Common::tostr<uint32_t>(v[i].uid), ' ', 15)
<< TC_Common::outfill(v[i].ip + ":" + TC_Common::tostr(v[i].port), ' ', 25)
<< TC_Common::outfill(TC_Common::tm2str(v[i].iLastRefreshTime, "%Y-%m-%d %H:%M:%S"), ' ', 25)
<< TC_Common::outfill(TC_Common::tostr(v[i].timeout), ' ', 10) << endl;
<< TC_Common::outfill(TC_Common::tostr(v[i].timeout), ' ', 10)
<< TC_Common::outfill(TC_Common::tostr(v[i].recvBufferSize), ' ', 30)
<< TC_Common::outfill(TC_Common::tostr(v[i].sendBufferSize), ' ', 30)
<< endl;
}
}
os << OUT_LINE_LONG << endl;
@ -574,6 +578,31 @@ bool Application::cmdReloadLocator(const string& command, const string& params,
return bSucc;
}
bool Application::cmdViewResource(const string& command, const string& params, string& result)
{
TLOGDEBUG("Application::cmdViewResource:" << command << " " << params << endl);
ostringstream os;
os << _communicator->getResouresInfo() << endl;
os << OUT_LINE << endl;
vector<TC_EpollServer::BindAdapterPtr> adapters = _epollServer->getBindAdapters();
for(auto adapter : adapters)
{
outAdapter(os, ServantHelperManager::getInstance()->getAdapterServant(adapter->getName()), adapter);
os << TC_Common::outfill("recv-buffer-count") << adapter->getRecvBufferSize() << endl;
os << TC_Common::outfill("send-buffer-count") << adapter->getSendBufferSize() << endl;
}
result += os.str();
TLOGDEBUG("Application::cmdViewResource result:" << result << endl);
return true;
}
void Application::outAllAdapter(ostream &os)
{
auto m = _epollServer->getListenSocketInfo();
@ -726,6 +755,12 @@ void Application::main(const TC_Option &option)
//设置是否标准输出
TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_CLOSE_COUT, Application::cmdCloseCout);
//设置是否标准输出
TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_RELOAD_LOCATOR, Application::cmdReloadLocator);
//设置是否标准输出
TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_RESOURCE, Application::cmdViewResource);
//上报版本
TARS_REPORTVERSION(TARS_VERSION);
@ -921,12 +956,16 @@ void Application::outServer(ostream &os)
os << TC_Common::outfill("NetThread(netthread)") << ServerConfig::NetThread << endl;
os << TC_Common::outfill("ManualListen(manuallisten)") << ServerConfig::ManualListen << endl;
os << TC_Common::outfill("MergeNetImp(mergenetimp)") << ServerConfig::MergeNetImp << endl;
os << TC_Common::outfill("ReportFlow") << ServerConfig::ReportFlow<< endl;
#if TARS_SSL
cout << TC_Common::outfill("Ca") << ServerConfig::CA << endl;
cout << TC_Common::outfill("Cert") << ServerConfig::Cert << endl;
cout << TC_Common::outfill("Key") << ServerConfig::Key << endl;
cout << TC_Common::outfill("VerifyClient") << ServerConfig::VerifyClient << endl;
os << TC_Common::outfill("ReportFlow(reportflow)") << ServerConfig::ReportFlow<< endl;
os << TC_Common::outfill("BackPacketLimit(backpacketlimit)") << ServerConfig::BackPacketLimit<< endl;
os << TC_Common::outfill("BackPacketMin(backpacketmin)") << ServerConfig::BackPacketMin<< endl;
#if TAF_SSL
cout << TC_Common::outfill("Ca(ca)") << ServerConfig::CA << endl;
cout << TC_Common::outfill("Cert(cert)") << ServerConfig::Cert << endl;
cout << TC_Common::outfill("Key(key)") << ServerConfig::Key << endl;
cout << TC_Common::outfill("VerifyClient(verifyclient)") << ServerConfig::VerifyClient << endl;
// cout << TC_Common::outfill("Ciphers(ciphers)") << ServerConfig::Ciphers << endl;
#endif
}
@ -997,6 +1036,8 @@ void Application::initializeServer()
ServerConfig::MergeNetImp = _conf.get("/tars/application/server<mergenetimp>", "0") == "0" ? false : true;
ServerConfig::NetThread = TC_Common::strto<int>(toDefault(_conf.get("/tars/application/server<nethread>"), "1"));
ServerConfig::CloseCout = _conf.get("/tars/application/server<closecout>","1")=="0"?0:1;
ServerConfig::BackPacketLimit = TC_Common::strto<int>(_conf.get("/tars/application/server<backpacketlimit>", "100*1024*1024"));
ServerConfig::BackPacketMin = TC_Common::strto<int>(_conf.get("/tars/application/server<backpacketmin>", "1024"));
#if TARS_SSL
ServerConfig::CA = _conf.get("/tars/application/server<ca>");
@ -1243,7 +1284,10 @@ void Application::bindAdapter(vector<TC_EpollServer::BindAdapterPtr>& adapters)
bindAdapter->setProtocolName(_conf.get(sLastPath + "<protocol>", "tars"));
if (bindAdapter->isTarsProtocol())
bindAdapter->setBackPacketBuffLimit(ServerConfig::BackPacketLimit);
bindAdapter->setBackPacketBuffMin(ServerConfig::BackPacketMin);
if (bindAdapter->isTarsProtocol())
{
bindAdapter->setProtocol(AppProtocol::parse);
}

View File

@ -15,8 +15,8 @@
*/
#include "util/tc_file.h"
#include "servant/Communicator.h"
#include "servant/Application.h"
#include "servant/StatReport.h"
#include "servant/TarsLogger.h"
@ -458,10 +458,21 @@ vector<TC_Endpoint> Communicator::getEndpoint(const string & objName)
vector<TC_Endpoint> Communicator::getEndpoint4All(const string & objName)
{
ServantProxy * pServantProxy = getServantProxy(objName);
ServantProxy *pServantProxy = getServantProxy(objName);
return pServantProxy->getEndpoint4All();
}
string Communicator::getResouresInfo()
{
ostringstream os;
for (size_t i = 0; i < _clientThreadNum; ++i)
{
os << OUT_LINE << endl;
os << _communicatorEpoll[i]->getResouresInfo();
}
return os.str();
}
void Communicator::terminate()
{
{

View File

@ -16,6 +16,7 @@
#include "servant/CommunicatorEpoll.h"
#include "servant/Communicator.h"
#include "servant/Application.h"
#include "servant/TarsLogger.h"
#include "servant/StatReport.h"
@ -380,7 +381,60 @@ void CommunicatorEpoll::doStat()
void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
{
_communicator->pushAsyncThreadQueue(msg);
}
void CommunicatorEpoll::reConnect(int64_t ms, Transceiver*p)
{
_reconnect[ms] = p;
}
string CommunicatorEpoll::getResouresInfo()
{
ostringstream desc;
desc << TC_Common::outfill("index") << _netThreadSeq << endl;
if(_communicator->_statReport) {
desc << TC_Common::outfill("stat size") << _communicator->_statReport->getQueueSize(_netThreadSeq) << endl;
}
desc << TC_Common::outfill("obj num") << _objectProxyFactory->getObjNum() << endl;
const static string TAB = " ";
for(size_t i = 0; i < _objectProxyFactory->getObjNum(); ++i)
{
desc << TAB << OUT_LINE_TAB(1) << endl;
desc << TAB << TC_Common::outfill("obj name") << _objectProxyFactory->getObjectProxy(i)->name() << endl;
const vector<AdapterProxy*> &adapters = _objectProxyFactory->getObjectProxy(i)->getAdapters();
for(auto adapter : adapters)
{
desc << TAB << TAB << OUT_LINE_TAB(2) << endl;
desc << TAB << TAB << TC_Common::outfill("adapter") << adapter->endpoint().getEndpoint().toString() << endl;
desc << TAB << TAB << TC_Common::outfill("recv size") << adapter->trans()->getRecvBuffer()->getBufferLength() << endl;
desc << TAB << TAB << TC_Common::outfill("send size") << adapter->trans()->getSendBuffer()->getBufferLength() << endl;
}
}
return desc.str();
}
void CommunicatorEpoll::reConnect()
{
int64_t iNow = TNOWMS;
while(!_reconnect.empty())
{
auto it = _reconnect.begin();
if(it->first > iNow)
{
return;
}
it->second->reconnect();
_reconnect.erase(it++);
}
}
void CommunicatorEpoll::run()
@ -419,6 +473,7 @@ void CommunicatorEpoll::run()
//数据上报
doStat();
reConnect();
}
catch (exception& e)
{

View File

@ -67,16 +67,11 @@ ObjectProxy::~ObjectProxy()
void ObjectProxy::initialize()
{
}
//
//ServantProxy * ObjectProxy::getServantProxy()
//{
// return _pServantProxy;
//}
//
//void ObjectProxy::setServantProxy(ServantProxy * pServantProxy)
//{
// _pServantProxy = pServantProxy;
//}
const vector<AdapterProxy*> & ObjectProxy::getAdapters()
{
return _endpointManger->getAdapters();
}
int ObjectProxy::loadLocator()
{

View File

@ -163,7 +163,7 @@ ServantProxyCallback::ServantProxyCallback()
int HttpServantProxyCallback::onDispatch(ReqMessagePtr msg)
{
if (msg->response->iRet != tars::TARSSERVERSUCCESS)
if (msg->response->iRet != TARSSERVERSUCCESS)
{
return onDispatchException(msg->request, *msg->response);
}
@ -288,6 +288,13 @@ string ServantProxy::tars_name() const
return "NULL";
}
void ServantProxy::tars_reconnect(int second)
{
if (_objectProxyNum >= 1 && (*_objectProxy != NULL))
{
(*_objectProxy)->reconnect(second);
}
}
TC_Endpoint ServantProxy::tars_invoke_endpoint()
{
@ -406,9 +413,9 @@ void ServantProxy::tars_ping()
map<string, string> s;
tars::TarsOutputStream<tars::BufferWriterVector> os;
TarsOutputStream<BufferWriterVector> os;
tars_invoke(tars::TARSNORMAL, "tars_ping", os, m, s);
tars_invoke(TARSNORMAL, "tars_ping", os, m, s);
}
@ -417,9 +424,9 @@ void ServantProxy::tars_async_ping()
map<string, string> m;
map<string, string> s;
tars::TarsOutputStream<tars::BufferWriterVector> os;
TarsOutputStream<BufferWriterVector> os;
tars_invoke_async(tars::TARSONEWAY, "tars_ping", os, m, s, NULL);
tars_invoke_async(TARSONEWAY, "tars_ping", os, m, s, NULL);
}
ServantProxy* ServantProxy::tars_hash(int64_t key)
@ -591,7 +598,7 @@ void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
//如果是按set规则调用
if (pObjProxy && pObjProxy->isInvokeBySet())
{
SET_MSG_TYPE(msg->request.iMessageType, tars::TARSMESSAGETYPESETNAME);
SET_MSG_TYPE(msg->request.iMessageType, TARSMESSAGETYPESETNAME);
msg->request.status[ServantProxy::STATUS_SETNAME_VALUE] = pObjProxy->getInvokeSetName();
TLOGTARS("[TARS][ServantProxy::invoke, " << msg->request.sServantName << ", invoke with set,"<<pObjProxy->getInvokeSetName()<<"]" << endl);
@ -742,7 +749,7 @@ void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
//////////////////////////////////////////////////////////////////
void ServantProxy::tars_invoke_async(char cPacketType,
const string &sFuncName,
tars::TarsOutputStream<tars::BufferWriterVector> &buf,
TarsOutputStream<BufferWriterVector> &buf,
const map<string, string>& context,
const map<string, string>& status,
const ServantProxyCallbackPtr& callback,
@ -777,7 +784,7 @@ void ServantProxy::tars_invoke_async(char cPacketType,
shared_ptr<ResponsePacket> ServantProxy::tars_invoke(char cPacketType,
const string& sFuncName,
tars::TarsOutputStream<tars::BufferWriterVector>& buf,
TarsOutputStream<BufferWriterVector>& buf,
const map<string, string>& context,
const map<string, string>& status)
// ResponsePacket& rsp)
@ -964,7 +971,7 @@ void ServantProxy::checkDye(RequestPacket& req)
assert(pSptd != NULL);
if(pSptd && pSptd->_dyeing)
{
SET_MSG_TYPE(req.iMessageType, tars::TARSMESSAGETYPEDYED);
SET_MSG_TYPE(req.iMessageType, TARSMESSAGETYPEDYED);
req.status[ServantProxy::STATUS_DYED_KEY] = pSptd->_dyeingKey;
}

View File

@ -318,17 +318,19 @@ void StatReport::report(const string& strModuleName,
}
head.interfaceName = trimAndLimitStr(strInterfaceName, MAX_MASTER_NAME_LEN);
head.slavePort = iPort;
head.returnValue = iReturnValue;
//包体信息.
if(eResult == STAT_SUCC)
if (eResult == STAT_SUCC)
{
body.count = 1;
body.totalRspTime = body.minRspTime = body.maxRspTime = iSptime;
}
else if(eResult == STAT_TIMEOUT)
else if (eResult == STAT_TIMEOUT)
{
body.timeoutCount = 1;
}
@ -381,23 +383,24 @@ void StatReport::report(const string& strMasterName,
submit(head, body, true);
}
string StatReport::sampleUnid()
{
static atomic<int> g_id(rand());
char s[14] = {0};
time_t t = TNOW;
int ip = inet_addr(_ip.c_str());
int thread = ++g_id;
static unsigned short n = 0;
++n;
memcpy( s, &ip, 4 );
memcpy( s + 4, &t, 4);
memcpy( s + 8, &thread, 4);
memcpy( s + 12, &n, 2 );
return TC_Common::bin2str(string(s,14));
}
//
//string StatReport::sampleUnid()
//{
//
// static atomic<int> g_id(rand());
//
// char s[14] = { 0 };
// time_t t = TNOW;
// int ip = inet_addr(_ip.c_str());
// int thread = ++g_id;
// static unsigned short n = 0;
// ++n;
// memcpy(s, &ip, 4);
// memcpy(s + 4, &t, 4);
// memcpy(s + 8, &thread, 4);
// memcpy(s + 12, &n, 2);
// return TC_Common::bin2str(string(s, 14));
//}
void StatReport::submit( StatMicMsgHead& head, StatMicMsgBody& body,bool bFromClient )
{
@ -430,13 +433,23 @@ void StatReport::submit( StatMicMsgHead& head, StatMicMsgBody& body,bool bFromCl
}
}
void StatReport::doSample(const string& strSlaveName,
const string& strInterfaceName,
const string& strSlaveIp,
map<string, string> &status)
size_t StatReport::getQueueSize(size_t epollIndex)
{
if(epollIndex >= _statMsg.size())
{
return 0;
}
return _statMsg[epollIndex]->size();
}
//void StatReport::doSample(const string& strSlaveName,
// const string& strInterfaceName,
// const string& strSlaveIp,
// map<string, string>& status)
//{
//}
int StatReport::reportMicMsg(MapStatMicMsg& msg,bool bFromClient)
{
if (msg.empty()) return 0;

View File

@ -70,8 +70,6 @@ bool Transceiver::isSSL() const
void Transceiver::reconnect()
{
close();
connect();
}
@ -157,6 +155,11 @@ void Transceiver::setConnected()
TLOGTARS("[TARS][tcp setConnected, " << _adapterProxy->getObjProxy()->name() << ",fd:" << _fd << "]" << endl);
onConnect();
if(_adapterProxy->getObjProxy()->getPushCallback())
{
_adapterProxy->getObjProxy()->getPushCallback()->onConnect(_ep.getEndpoint());
}
}
void Transceiver::onConnect()
@ -401,11 +404,9 @@ bool Transceiver::sendAuthData(const BasicAuthInfo& info)
void Transceiver::close()
{
if(!isValid()) return;
// if(_adapterProxy->getObjProxy()->getPushCallback())
// {
// _adapterProxy->getObjProxy()->getPushCallback()->onClose();
// }
#if TARS_SSL
#if TAF_SSL
if (_openssl)
{
_openssl->release();
@ -427,7 +428,21 @@ void Transceiver::close()
_authState = AUTH_INIT;
TLOGTARS("[TARS][trans close:"<< _adapterProxy->getObjProxy()->name()<< "," << _ep.desc() << "]" << endl);
if(_adapterProxy->getObjProxy()->getPushCallback())
{
_adapterProxy->getObjProxy()->getPushCallback()->onClose();
}
int second = _adapterProxy->getObjProxy()->reconnect();
if(second > 0) {
_adapterProxy->getObjProxy()->getCommunicatorEpoll()->reConnect(TNOWMS + second * 1000, this);
TLOGERROR("[TAF][trans close:" << _adapterProxy->getObjProxy()->name() << "," << _ep.desc() << ", reconnect:" << second << "]" << endl);
}
// else
// {
// TLOGERROR("[TAF][trans close:" << _adapterProxy->getObjProxy()->name() << "," << _ep.desc() << "]" << endl);
// }
}
int Transceiver::doRequest()
@ -998,7 +1013,8 @@ int UdpTransceiver::send(const void* buf, uint32_t len, uint32_t flag)
{
if(!isValid()) return -1;
int iRet=::sendto(_fd, (const char*)buf, len, flag, (struct sockaddr*) &(_ep.addr()), sizeof(sockaddr));
socklen_t addrlen = _ep.isIPv6() ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
int iRet=::sendto(_fd, (const char*)buf, len, flag, _ep.addrPtr(), addrlen);
if (iRet<0)
{

View File

@ -40,7 +40,12 @@
namespace tars
{
//////////////////////////////////////////////////////////////////////
//#ifndef GEN_PYTHON_MASK
#define OUT_LINE (TC_Common::outfill("", '-', 80))
#define OUT_LINE_LONG (TC_Common::outfill("", '=', 80))
#define OUT_LINE_TAB(x) (TC_Common::outfill("", '-', 80 - 4*x))
/**
*
*/
@ -56,7 +61,7 @@ namespace tars
#define TARS_CMD_SET_DAYLOG_LEVEL "tars.enabledaylog" //设置按天日志是否输出: tars.enabledaylog [remote|local]|[logname]|[true|false]
#define TARS_CMD_CLOSE_CORE "tars.closecore" //设置服务的core limit: tars.setlimit [yes|no]
#define TARS_CMD_RELOAD_LOCATOR "tars.reloadlocator" //重新加载locator的配置信息
#define TARS_CMD_RESOURCE "tars.resource" //get resource
//////////////////////////////////////////////////////////////////////
/**
* notify服务,
@ -127,6 +132,8 @@ struct ServerConfig
static int NetThread; //servernet thread
static bool ManualListen; //是否启用手工端口监听
static bool MergeNetImp; //网络线程和IMP线程合并(以网络线程个数为准)
static int BackPacketLimit; //回包积压检查
static int BackPacketMin; //回包速度检查
#if TARS_SSL
static std::string CA;
static std::string Cert;
@ -369,6 +376,14 @@ protected:
*/
bool cmdReloadLocator(const string& command, const string& params, string& result);
/*
* view server resource
* @param command
* @param params
* @param result
*/
bool cmdViewResource(const string& command, const string& params, string& result);
protected:
/**

View File

@ -240,6 +240,12 @@ public:
*/
int64_t getMinTimeout() { return _minTimeout; }
/**
* get resource info
* @return
*/
string getResouresInfo();
protected:
/**
*

View File

@ -180,6 +180,18 @@ public:
*/
void pushAsyncThreadQueue(ReqMessage * msg);
/**
* set reconnect
* @param time
*/
void reConnect(int64_t ms, Transceiver*);
/**
* communicator resource desc
* @return
*/
string getResouresInfo();
protected:
/**
*
@ -213,6 +225,11 @@ protected:
*/
void doStat();
/**
* reconnect
*/
void reConnect();
protected:
/*
*
@ -261,6 +278,12 @@ protected:
*
*/
int64_t _timeoutCheckInterval;
/**
* auto reconnect Transceiver
*/
unordered_map<int64_t, Transceiver*> _reconnect;
};
/////////////////////////////////////////////////////////////////////////////////////

View File

@ -54,11 +54,16 @@ public:
EndpointInfo(const string& host, uint16_t port, TC_Endpoint::EType type, int32_t grid, const string & setDivision, int qos, int weight = -1, unsigned int weighttype = 0, int authType = 0);
/**
* ,set信息
*
* @return string
* get endpoint
* @return
*/
const string& descNoSetInfo() const;
const TC_Endpoint &getEndpoint() const { return _ep; }
// /**
// * 地址的字符串描述,不带set信息
// *
// * @return string
// */
// const string& descNoSetInfo() const;
/**
*
@ -70,15 +75,15 @@ public:
return _desc;
}
/**
*
*
* @return string
*/
const string & compareDesc() const
{
return _cmpDesc;
}
// /**
// * 比较的地址的字符串描述
// *
// * @return string
// */
// const string & compareDesc() const
// {
// return _cmpDesc;
// }
/**
*
@ -181,13 +186,13 @@ public:
*/
bool operator == (const EndpointInfo& r) const;
/**
*,set信息不参与比较
*@param r
*
*@return bool
*/
bool equalNoSetInfo(const EndpointInfo& r) const;
// /**
// *等于,set信息不参与比较
// *@param r
// *
// *@return bool
// */
// bool equalNoSetInfo(const EndpointInfo& r) const;
/**
*

View File

@ -132,14 +132,14 @@ public:
*/
void doTimeout();
/**
* Obj的超时队列的长度
*/
size_t timeoutQSize()
{
return _reqTimeoutQueue.size();
}
// /**
// * Obj的超时队列的长度
// */
// size_t timeoutQSize()
// {
// return _reqTimeoutQueue.size();
// }
//
/**
* CommunicatorEpoll*
*/
@ -158,6 +158,24 @@ public:
}
/**
* reconnect
* @param second
*/
inline void reconnect(int second)
{
_reConnectSecond = second;
}
/**
* reconnect
* @param second
*/
inline int reconnect()
{
return _reConnectSecond;
}
/**
* obj是否走按set规则调用流程使set的set规则调用的
*/
bool isInvokeBySet() const
@ -215,6 +233,12 @@ public:
_servantProxy = pServantProxy;
}
/**
* get all adapter proxy
* @return
*/
const vector<AdapterProxy*> & getAdapters();
protected:
/**
@ -259,6 +283,11 @@ private:
*/
int _conTimeout;
/**
* reconnect, 0: not reconnect
*/
int _reConnectSecond = 0;
/*
*
*/

View File

@ -303,6 +303,16 @@ public:
*/
virtual int onDispatch(ReqMessagePtr ptr) = 0;
/**
* (PUSH callback生效)
*/
virtual void onClose(){};
/**
* (PUSH callback生效)
*/
virtual void onConnect(const TC_Endpoint &ep){};
protected:
/**
@ -499,6 +509,12 @@ public:
*/
void tars_connect_timeout(int conTimeout);
/**
* set auto reconnect time
* @return int, second
*/
void tars_reconnect(int second);
/**
* Object名称
* @return string

View File

@ -219,17 +219,17 @@ public:
*/
StatFPrx getStatPrx() {return _statPrx; }
/*
*
*/
void doSample(const string& strSlaveName,
const string& strInterfaceName,
const string& strSlaveIp,
map<string, string> &status);
/*
* id
*/
string sampleUnid();
// /*
// * 采样
// */
// void doSample(const string& strSlaveName,
// const string& strInterfaceName,
// const string& strSlaveIp,
// map<string, string> &status);
// /*
// * 采样id
// */
// string sampleUnid();
/**
* . addStatInterv(5)
@ -306,21 +306,27 @@ private:
*/
int reportPropMsg();
/**
* Prop = property
* @return int
*/
int reportPropPlusMsg();
// /**
// * 上报多维度属性信息 Prop = property
// * @return int
// */
// int reportPropPlusMsg();
/**
* stat
*/
int reportSampleMsg();
//合并两个MicMsg
void addMicMsg(MapStatMicMsg & old,MapStatMicMsg & add);
/**
* get queue info
* @return
*/
size_t getQueueSize(size_t epollIndex);
friend class CommunicatorEpoll;
private:
time_t _time;

View File

@ -9,7 +9,7 @@
#define FLEX_SCANNER
#define YY_FLEX_MAJOR_VERSION 2
#define YY_FLEX_MINOR_VERSION 5
#define YY_FLEX_SUBMINOR_VERSION 37
#define YY_FLEX_SUBMINOR_VERSION 35
#if YY_FLEX_SUBMINOR_VERSION > 0
#define FLEX_BETA
#endif
@ -47,6 +47,7 @@ typedef int16_t flex_int16_t;
typedef uint16_t flex_uint16_t;
typedef int32_t flex_int32_t;
typedef uint32_t flex_uint32_t;
typedef uint64_t flex_uint64_t;
#else
typedef signed char flex_int8_t;
typedef short int flex_int16_t;
@ -54,6 +55,7 @@ typedef int flex_int32_t;
typedef unsigned char flex_uint8_t;
typedef unsigned short int flex_uint16_t;
typedef unsigned int flex_uint32_t;
#endif /* ! C99 */
/* Limits of integral types. */
#ifndef INT8_MIN
@ -84,8 +86,6 @@ typedef unsigned int flex_uint32_t;
#define UINT32_MAX (4294967295U)
#endif
#endif /* ! C99 */
#endif /* ! FLEXINT_H */
#ifdef __cplusplus
@ -176,7 +176,7 @@ extern FILE *yyin, *yyout;
*/
#define YY_LESS_LINENO(n) \
do { \
int yyl;\
yy_size_t yyl;\
for ( yyl = n; yyl < yyleng; ++yyl )\
if ( yytext[yyl] == '\n' )\
--yylineno;\
@ -369,7 +369,7 @@ static void yy_fatal_error (yyconst char msg[] );
*/
#define YY_DO_BEFORE_ACTION \
(yytext_ptr) = yy_bp; \
yyleng = (size_t) (yy_cp - yy_bp); \
yyleng = (yy_size_t) (yy_cp - yy_bp); \
(yy_hold_char) = *yy_cp; \
*yy_cp = '\0'; \
(yy_c_buf_p) = yy_cp;
@ -513,7 +513,7 @@ int yy_flex_debug = 0;
#define YY_MORE_ADJ 0
#define YY_RESTORE_YY_MORE_OFFSET
char *yytext;
#line 1 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 1 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
/**
* Tencent is pleased to support the open source community by making Tars available.
*
@ -529,7 +529,7 @@ char *yytext;
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#line 20 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 20 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
#include <map>
#include <string>
#include <sstream>
@ -649,7 +649,7 @@ static int input (void );
/* This used to be an fputs(), but since the string might contain NUL's,
* we now use fwrite().
*/
#define ECHO do { if (fwrite( yytext, yyleng, 1, yyout )) {} } while (0)
#define ECHO fwrite( yytext, yyleng, 1, yyout )
#endif
/* Gets input and stuffs it into "buf". number of characters read, or YY_NULL,
@ -660,7 +660,7 @@ static int input (void );
if ( YY_CURRENT_BUFFER_LVALUE->yy_is_interactive ) \
{ \
int c = '*'; \
size_t n; \
yy_size_t n; \
for ( n = 0; n < max_size && \
(c = getc( yyin )) != EOF && c != '\n'; ++n ) \
buf[n] = (char) c; \
@ -742,7 +742,7 @@ YY_DECL
register char *yy_cp, *yy_bp;
register int yy_act;
#line 67 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 67 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
#line 749 "tars.lex.cpp"
@ -840,12 +840,12 @@ do_action: /* This label is used only to access EOF actions. */
case 1:
YY_RULE_SETUP
#line 69 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 69 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{ BEGIN(INCL); }
YY_BREAK
case 2:
YY_RULE_SETUP
#line 71 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 71 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
if ( include_file_stack_ptr >= MAX_INCLUDE_DEPTH )
{
@ -878,7 +878,7 @@ YY_RULE_SETUP
YY_BREAK
case YY_STATE_EOF(INITIAL):
case YY_STATE_EOF(INCL):
#line 101 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 101 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
--include_file_stack_ptr;
if ( include_file_stack_ptr < 0 )
@ -897,14 +897,14 @@ case YY_STATE_EOF(INCL):
YY_BREAK
case 3:
YY_RULE_SETUP
#line 117 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 117 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
return TARS_SCOPE_DELIMITER;
}
YY_BREAK
case 4:
YY_RULE_SETUP
#line 121 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 121 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
// C++ comment
bool e = false;
@ -925,7 +925,7 @@ YY_RULE_SETUP
YY_BREAK
case 5:
YY_RULE_SETUP
#line 139 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 139 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
// C comment
bool e = false;
@ -976,7 +976,7 @@ YY_RULE_SETUP
YY_BREAK
case 6:
YY_RULE_SETUP
#line 187 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 187 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
StringGrammarPtr ident = new StringGrammar;
ident->v = yytext;
@ -987,7 +987,7 @@ YY_RULE_SETUP
case 7:
/* rule 7 can match eol */
YY_RULE_SETUP
#line 194 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 194 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
StringGrammarPtr ident = new StringGrammar;
ident->v = yytext;
@ -1000,7 +1000,7 @@ YY_RULE_SETUP
YY_BREAK
case 8:
YY_RULE_SETUP
#line 204 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 204 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
StringGrammarPtr str = new StringGrammar;
bool e = false;
@ -1115,7 +1115,7 @@ YY_RULE_SETUP
YY_BREAK
case 9:
YY_RULE_SETUP
#line 316 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 316 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
errno = 0;
IntergerGrammarPtr ptr = new IntergerGrammar;
@ -1140,7 +1140,7 @@ YY_RULE_SETUP
YY_BREAK
case 10:
YY_RULE_SETUP
#line 338 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 338 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
errno = 0;
FloatGrammarPtr ptr = new FloatGrammar;
@ -1175,7 +1175,7 @@ YY_RULE_SETUP
case 11:
/* rule 11 can match eol */
YY_RULE_SETUP
#line 369 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 369 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
if(yytext[0] == '\n')
{
@ -1185,7 +1185,7 @@ YY_RULE_SETUP
YY_BREAK
case 12:
YY_RULE_SETUP
#line 376 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 376 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
{
if(yytext[0] < 32 || yytext[0] > 126)
{
@ -1204,7 +1204,7 @@ YY_RULE_SETUP
YY_BREAK
case 13:
YY_RULE_SETUP
#line 392 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 392 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"
ECHO;
YY_BREAK
#line 1211 "tars.lex.cpp"
@ -1398,7 +1398,7 @@ static int yy_get_next_buffer (void)
{ /* Not enough room in the buffer - grow it. */
/* just a shorter name for the current buffer */
YY_BUFFER_STATE b = YY_CURRENT_BUFFER_LVALUE;
YY_BUFFER_STATE b = YY_CURRENT_BUFFER;
int yy_c_buf_p_offset =
(int) ((yy_c_buf_p) - b->yy_ch_buf);
@ -1531,7 +1531,7 @@ static int yy_get_next_buffer (void)
yy_current_state = yy_nxt[yy_base[yy_current_state] + (unsigned int) yy_c];
yy_is_jam = (yy_current_state == 51);
return yy_is_jam ? 0 : yy_current_state;
return yy_is_jam ? 0 : yy_current_state;
}
static void yyunput (int c, register char * yy_bp )
@ -1623,7 +1623,7 @@ static int yy_get_next_buffer (void)
case EOB_ACT_END_OF_FILE:
{
if ( yywrap( ) )
return EOF;
return 0;
if ( ! (yy_did_buffer_switch_on_eof) )
YY_NEW_FILE;
@ -1764,6 +1764,10 @@ static void yy_load_buffer_state (void)
yyfree((void *) b );
}
#ifndef __cplusplus
extern int isatty (int );
#endif /* __cplusplus */
/* Initializes or reinitializes a buffer.
* This function is sometimes called more than once on the same buffer,
* such as during a yyrestart() or at EOF.
@ -1968,8 +1972,8 @@ YY_BUFFER_STATE yy_scan_string (yyconst char * yystr )
/** Setup the input buffer state to scan the given bytes. The next call to yylex() will
* scan from a @e copy of @a bytes.
* @param yybytes the byte buffer to scan
* @param _yybytes_len the number of bytes in the buffer pointed to by @a bytes.
* @param bytes the byte buffer to scan
* @param len the number of bytes in the buffer pointed to by @a bytes.
*
* @return the newly allocated buffer state object.
*/
@ -1977,8 +1981,7 @@ YY_BUFFER_STATE yy_scan_bytes (yyconst char * yybytes, yy_size_t _yybytes_len
{
YY_BUFFER_STATE b;
char *buf;
yy_size_t n;
yy_size_t i;
yy_size_t n, i;
/* Get memory for full buffer, including space for trailing EOB's. */
n = _yybytes_len + 2;
@ -2211,7 +2214,7 @@ void yyfree (void * ptr )
#define YYTABLES_NAME "yytables"
#line 392 "/home/tars/TarsFramework/tarscpp/tools/tarsgrammar/tars.l"
#line 392 "/Users/jarod/centos/TarsCpp/tools/tarsgrammar/tars.l"

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,8 @@
/* A Bison parser, made by GNU Bison 3.0.4. */
/* A Bison parser, made by GNU Bison 3.2.2. */
/* Bison interface for Yacc-like parsers in C
Copyright (C) 1984, 1989-1990, 2000-2015 Free Software Foundation, Inc.
Copyright (C) 1984, 1989-1990, 2000-2015, 2018 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -30,6 +30,9 @@
This special exception was added by the Free Software Foundation in
version 2.2 of Bison. */
/* Undocumented macros, especially those whose name start with YY_,
are private implementation details. Do not rely on them. */
#ifndef YY_YY_TARS_TAB_HPP_INCLUDED
# define YY_YY_TARS_TAB_HPP_INCLUDED
/* Debug traces. */

View File

@ -172,18 +172,20 @@ public:
typedef TC_CasQueue<shared_ptr<SendContext>> send_queue;
typedef recv_queue::queue_type recv_queue_type;
////////////////////////////////////////////////////////////////////////////
/**
*
*/
struct ConnStatus
{
string ip;
int32_t uid;
uint16_t port;
int timeout;
int iLastRefreshTime;
};
////////////////////////////////////////////////////////////////////////////
/**
*
*/
struct ConnStatus
{
string ip;
int32_t uid;
uint16_t port;
int timeout;
int iLastRefreshTime;
size_t recvBufferSize;
size_t sendBufferSize;
};
////////////////////////////////////////////////////////////////////////////
/**
@ -642,28 +644,28 @@ public:
*/
void decreaseNowConnection();
/**
*
*/
void increaseNowConnection();
/**
*
*/
void increaseNowConnection();
/**
*
* @return ConnStatus
*/
vector<ConnStatus> getConnStatus();
/**
*
* @return ConnStatus
*/
vector<ConnStatus> getConnStatus();
/**
*
* @return int
*/
int getNowConnection() const;
/**
*
* @return int
*/
int getNowConnection() const;
/**
* EpollServer
* @return TC_EpollServer*
*/
TC_EpollServer* getEpollServer() const { return _pEpollServer; }
/**
*
* @return TC_EpollServer*
*/
TC_EpollServer* getEpollServer() const { return _pEpollServer; };
/**
* 线
@ -678,17 +680,17 @@ public:
*/
void setProtocol(const TC_NetWorkBuffer::protocol_functor& pf, int iHeaderLen = 0, const header_filter_functor& hf = echo_header_filter);
/**
*
* @return protocol_functor&
*/
TC_NetWorkBuffer::protocol_functor &getProtocol();
/**
*
* @return protocol_functor&
*/
TC_NetWorkBuffer::protocol_functor &getProtocol();
/**
*
* @return protocol_functor&
*/
header_filter_functor &getHeaderFilterFunctor();
/**
*
* @return protocol_functor&
*/
header_filter_functor &getHeaderFilterFunctor();
/**
*
@ -704,11 +706,27 @@ public:
*/
bool waitForRecvQueue(uint32_t handleIndex, shared_ptr<RecvContext> &recv);
/**
*
* @return size_t
*/
size_t getRecvBufferSize() const;
/**
*
* @return size_t
*/
size_t getRecvBufferSize() const;
/**
*
* @return size_t
*/
size_t getSendBufferSize() const;
/**
* add send buffer size
*/
inline void increaseSendBufferSize() { ++_iSendBufferSize; }
/**
* increase send buffer size
*/
inline void decreaseSendBufferSize(size_t s = 1) { _iSendBufferSize.fetch_sub(s); }
/**
* , echo
@ -783,16 +801,25 @@ public:
return _handles[index];
}
// /**
// * 设置服务端回包缓存的大小限制
// */
// void setBackPacketBuffLimit(size_t iLimitSize);
/*
* ()
*/
void setBackPacketBuffLimit(size_t iLimitSize) { _iBackPacketBuffLimit = iLimitSize; }
// /**
// * 获取服务端回包缓存的大小限制
// */
// size_t getBackPacketBuffLimit();
/**
* ()
*/
size_t getBackPacketBuffLimit() const { return _iBackPacketBuffLimit; }
/*
* 5/s最低发送字节
*/
void setBackPacketBuffMin(size_t iMinLimit) { _iBackPacketBuffMin = iMinLimit; }
/**
* 5/s最低发送字节
*/
size_t getBackPacketBuffMin() const { return _iBackPacketBuffMin; }
/**
* (_rnbuffer有多个, 线id来hash获取)
@ -943,12 +970,17 @@ public:
/**
*
*/
atomic<size_t> _iBufferSize{0};
atomic<size_t> _iRecvBufferSize{0};
/**
*
*/
int _iQueueCapacity;
/**
*
*/
atomic<size_t> _iSendBufferSize{0};
/**
*
*/
int _iQueueCapacity;
/**
* )(
@ -970,8 +1002,15 @@ public:
*/
string _protocolName;
// 回包缓存限制大小
// size_t _iBackPacketBuffLimit;
/**
*
*/
size_t _iBackPacketBuffLimit = 0;
/**
* (5/s), 1K
*/
size_t _iBackPacketBuffMin = 1024;
//队列模式
bool _queueMode = false;
@ -1121,6 +1160,16 @@ public:
*/
void tryInitAuthState(int initState);
/**
* buffer
*/
TC_NetWorkBuffer &getRecvBuffer() { return _recvBuffer; }
/**
* buffer
*/
TC_NetWorkBuffer &getSendBuffer() { return _sendBuffer; }
friend class NetThread;
protected:
@ -1131,15 +1180,15 @@ public:
*/
void close();
/**
* TCP
*/
int sendTcp(const shared_ptr<SendContext> &data);
/**
* Udp
*/
int sendUdp(const shared_ptr<SendContext> &data);
// /**
// * 发送TCP
// */
// int sendTcp(const shared_ptr<SendContext> &data);
//
// /**
// * 发送Udp
// */
// int sendUdp(const shared_ptr<SendContext> &data);
/**
* buffer
@ -1248,10 +1297,25 @@ public:
*/
TC_NetWorkBuffer _sendBuffer;
/**
*
*/
int _iHeaderLen;
/**
*
*/
size_t _sendBufferSize = 0;
/**
*
*/
time_t _lastCheckTime = 0;
/**
* <, buffer大小>
*/
vector<pair<size_t, size_t>> _checkSend;
/**
*
*/
int _iHeaderLen;
/**
*
@ -1373,12 +1437,12 @@ public:
/**
*
*/
TC_SpinLock _mutex;
TC_ThreadMutex _mutex;
/**
*
*/
NetThread *_pEpollServer;
/**
*
*/
NetThread *_pEpollServer;
/**
*
@ -1545,11 +1609,11 @@ public:
*/
void setUdpRecvBufferSize(size_t nSize=DEFAULT_RECV_BUFFERSIZE);
/**
*
* @return size_t
*/
size_t getSendRspSize();
// /**
// * 发送队列的大小
// * @return size_t
// */
// size_t getSendRspSize();
protected:

View File

@ -25,6 +25,7 @@
#include "util/tc_socket.h"
#include "util/tc_epoller.h"
#include "util/tc_timeout_queue.h"
#include "util/tc_network_buffer.h"
#include <map>
#include <sstream>
#include <cassert>
@ -60,6 +61,7 @@ namespace tars
*/
/////////////////////////////////////////////////
class TC_NetWorkBuffer;
/**
* @brief http协议解析异常类
@ -577,13 +579,13 @@ public:
*/
void reset();
/**
* @brief .
*
* @param ppChar
* @return string
*/
static string getLine(const char** ppChar);
// /**
// * @brief 读取一行.
// *
// * @param ppChar 读取位置指针
// * @return string 读取的内容
// */
// static string getLine(const char** ppChar);
/**
* @brief .
@ -592,7 +594,7 @@ public:
* @param iBufLen
* @return string
*/
static string getLine(const char** ppChar, int iBufLen);
// static string getLine(const char** ppChar, int iBufLen);
/**
* @brief ().
@ -614,8 +616,45 @@ public:
* @param szBuffer
* @return const char*,
*/
static const char* parseHeader(const char* szBuffer, http_header_type &sHeader);
// static const char* parseHeader(const char* szBuffer, http_header_type &sHeader);
template<typename ForwardIterator1, typename ForwardIterator2>
static void parseHeader(const ForwardIterator1 &beginIt, const ForwardIterator2 &headerIt, http_header_type &sHeader)
{
sHeader.clear();
string sep = "\r\n";
string colon = ":";
bool first = true;
auto lineStartIt= beginIt;
while (true)
{
auto it = std::search(lineStartIt, headerIt, sep.c_str(), sep.c_str() + sep.size());
if(it == headerIt)
{
break;
}
if(!first)
{
auto itF = std::search(lineStartIt, it, colon.c_str(), colon.c_str() + colon.size());
if (itF != it)
{
string name;
name.resize(itF - lineStartIt);
std::copy(lineStartIt, itF, name.begin());
string value;
value.resize(it - (itF + 1));
std::copy(itF + 1, it, value.begin());
sHeader.insert(multimap<string, string>::value_type(TC_Common::trim(name, " "),
TC_Common::trim(value, " ")));
}
}
else
{
first = false;
}
lineStartIt = it + sep.size();
}
}
protected:
/**
@ -815,7 +854,7 @@ public:
* false:
* ,
*/
bool incrementDecode(string &sBuffer);
bool incrementDecode(TC_NetWorkBuffer &buff);
/**
* @brief http应答(string方式)
@ -954,14 +993,42 @@ public:
* @param szBuffer
* @return
*/
void parseResponseHeader(const char* szBuffer);
// void parseResponseHeader(const char* szBuffer, const char* header);
template<typename ForwardIterator1, typename ForwardIterator2>
void parseResponseHeader(const ForwardIterator1 &beginIt, const ForwardIterator2 &headerIt)
{
string line = "\r\n";
auto it = std::search(beginIt, headerIt, line.c_str(), line.c_str() + line.size());
assert(it != headerIt);
string sep = " ";
auto f1 = std::search(beginIt, headerIt, sep.c_str(), sep.c_str() + sep.size());
if(f1 == headerIt)
{
throw TC_HttpResponse_Exception("[TC_HttpResponse_Exception::parseResponeHeader] http response parse version format error : " + string(beginIt, it));
}
auto f2 = std::search(f1 + 1, headerIt, sep.c_str(), sep.c_str() + sep.size());
if(f1 == headerIt)
{
throw TC_HttpResponse_Exception("[TC_HttpResponse_Exception::parseResponeHeader] http response parse status format error : " + string(beginIt, it));
}
_headerLine = string(beginIt, it);
if(TC_Port::strncasecmp(_headerLine.c_str(), "HTTP/", 5) != 0)
{
throw TC_HttpResponse_Exception("[TC_HttpResponse_Exception::parseResponeHeader] http response version is not start with 'HTTP/' : " + _headerLine);
}
_version = string(beginIt, f1);
_status = TC_Common::strto<int>(string(f1 + 1, f2));
_about = TC_Common::trim(string(f2 + 1, it));
parseHeader(beginIt, headerIt, _headers);
}
protected:
/**
* ,
* @param sBuffer
*/
void addContent(const string &sBuffer);
void addContent(const char *buffer, size_t length);
protected:
@ -1034,6 +1101,13 @@ public:
static bool checkRequest(const char* sBuffer, size_t len);
/**
* http包是否收全
* @param buff
* @return
*/
static bool checkRequest(TC_NetWorkBuffer &buff);
/**
* @brief
*/
void reset();
@ -1095,6 +1169,12 @@ public:
void encode(vector<char> &buffer);
/**
* encode buffer to TC_NetWorkBuffer
* @param buff
*/
void encode(TC_NetWorkBuffer &buff);
/**
* @brief .
*
* @param sUrl :http://www.qq.com/query?a=b&c=d
@ -1265,7 +1345,7 @@ public:
/**
* @brief http请求.
*
*
* @return http请求串
*/
string getOriginRequest() const { return _httpURL.getURL(); }
@ -1281,7 +1361,7 @@ public:
* @brief http请求的url部分, ?Host,
* http://www.qq.com/abc?a=b#def, 则为:/abc
* @return http请求的url部分
* */
* */
string getRequestUrl() const { return _httpURL.getPath(); }
/**
@ -1293,15 +1373,15 @@ public:
/**
* @brief .
*
*
* @param szBuffer
* @return size_t
* @return
*/
size_t parseRequestHeader(const char* szBuffer);
void parseRequestHeader(const char* szBuffer, const char *header);
/**
* @brief .
*
*
* @param iRequestType
* @return
*/
@ -1311,7 +1391,7 @@ protected:
/**
* @brief http请求编码.
*
*
* @param sUrl http请求
* @param iRequestType
* @return void

View File

@ -20,6 +20,7 @@
#include <functional>
#include "util/tc_platform.h"
#include "util/tc_thread_pool.h"
#include "util/tc_network_buffer.h"
#include "util/tc_http.h"
#include "util/tc_autoptr.h"
#include "util/tc_socket.h"
@ -42,17 +43,6 @@ namespace tars
*/
/////////////////////////////////////////////////
///**
//* @brief 线程异常
//*/
//struct TC_HttpAsync_Exception : public TC_Exception
//{
// TC_HttpAsync_Exception(const string &buffer) : TC_Exception(buffer) {};
// TC_HttpAsync_Exception(const string &buffer, int err) : TC_Exception(buffer, err) {};
// ~TC_HttpAsync_Exception() throw() {};
//};
/**
* @brief 线.
*/
@ -128,6 +118,15 @@ protected:
*/
AsyncRequest(TC_HttpRequest &stHttpRequest, RequestCallbackPtr &callbackPtr, bool bUseProxy);
/**
* @brief .
*
* @param stHttpRequest
* @param callbackPtr
* @param addr
*/
AsyncRequest(TC_HttpRequest &stHttpRequest, RequestCallbackPtr &callbackPtr, const string &addr);
/**
* @brief
*/
@ -140,11 +139,6 @@ protected:
*/
int getfd() const { return _fd.getfd(); }
/**
* fd
*/
// int getNotifyfd() const { return _notify.getfd(); }
/**
* @brief .
*
@ -155,7 +149,7 @@ protected:
* @brief
* @return
*/
string getError(const char* sDefault) const;
string getError(const string &sDefault) const;
/**
* @brief
@ -266,8 +260,8 @@ protected:
string _sHost;
uint32_t _iPort;
uint32_t _iUniqId;
string _sReq;
string _sRsp;
TC_NetWorkBuffer _sendBuffer;
TC_NetWorkBuffer _recvBuffer;
RequestCallbackPtr _callbackPtr;
bool _bindAddrSet;
struct sockaddr _bindAddr;
@ -300,6 +294,15 @@ public:
*/
void doAsyncRequest(TC_HttpRequest &stHttpRequest, RequestCallbackPtr &callbackPtr, bool bUseProxy = false);
/**
* @brief .
*
* @param stHttpRequest
* @param httpCallbackPtr
* @param addr, , ip:port
*/
void doAsyncRequest(TC_HttpRequest &stHttpRequest, RequestCallbackPtr &callbackPtr, const string &addr);
/**
* @brief proxy地址
*
@ -373,11 +376,6 @@ protected:
*/
static void timeout(AsyncRequestPtr& ptr);
/**
* @brief
*/
// static void process(AsyncRequestPtr &p, int events);
/**
* @brief 线
* @param _threadId [description]

View File

@ -29,6 +29,15 @@
namespace tars
{
/**
* @brief
*/
struct TC_NetWorkBuffer_Exception : public TC_Exception
{
TC_NetWorkBuffer_Exception(const string &sBuffer) : TC_Exception(sBuffer){};
~TC_NetWorkBuffer_Exception() {};
};
class TC_NetWorkBuffer
{
public:
@ -109,7 +118,21 @@ public:
size_t pos() const { return _pos; }
void add(uint32_t ret)
char &at(size_t offset)
{
if(_pos + offset >= _buffer.size() )
throw TC_NetWorkBuffer_Exception("[TC_NetWorkBuffer::Buffer] at '" + TC_Common::tostr(offset) + "' offset overflow");
return _buffer[_pos + offset];
}
char at(size_t offset) const
{
if(_pos + offset >= _buffer.size() )
throw TC_NetWorkBuffer_Exception("[TC_NetWorkBuffer::Buffer] at '" + TC_Common::tostr(offset) + "' offset overflow");
return _buffer[_pos + offset];
}
void add(uint32_t ret)
{
_pos += ret;
assert(_pos <= _buffer.size());
@ -121,6 +144,212 @@ public:
};
typedef std::list<std::shared_ptr<Buffer>>::const_iterator buffer_list_iterator;
class buffer_iterator : public std::iterator<std::random_access_iterator_tag, char>
{
public:
buffer_iterator(const TC_NetWorkBuffer *buffer, size_t offset) : _buffer(buffer)
{
parseOffset(offset);
}
/**
* @brief copy
* @param it
*/
buffer_iterator(const buffer_iterator &it)
{
if(this != &it)
{
_buffer = it._buffer;
_pos = it._pos;
_offset = it._offset;
_it = it._it;
}
}
/**
*
* @param mcmi
*
* @return bool
*/
bool operator==(const buffer_iterator& it) const
{
if (_buffer == it._buffer && _it == it._it && _offset == it._offset)
{
return true;
}
return false;
}
/**
*
* @param mv
*
* @return bool
*/
bool operator!=(const buffer_iterator& it) const
{
if (_buffer == it._buffer && _it == it._it && _offset == it._offset)
{
return false;
}
return true;
}
char & operator *()
{
return (*_it)->buffer()[_pos];
}
char * operator ->()
{
return &(*_it)->buffer()[_pos];
}
char& operator *() const
{
return (*_it)->buffer()[_pos];
}
const char* operator ->() const
{
return &(*_it)->buffer()[_pos];
}
buffer_iterator operator +(size_t n) const
{
return buffer_iterator(_buffer, _offset + n);
}
buffer_iterator operator += (size_t n)
{
parseOffset(_offset + n);
return *this;
}
difference_type operator -(const buffer_iterator &n) const
{
return _offset - n._offset;
}
buffer_iterator operator -(size_t n) const
{
return buffer_iterator(_buffer, _offset - n);
}
bool operator < (const buffer_iterator n) const
{
return _offset<n._offset;
}
bool operator <= (const buffer_iterator n) const
{
return _offset<=n._offset;
}
bool operator > (const buffer_iterator n) const
{
return _offset>n._offset;
}
bool operator >= (const buffer_iterator n) const
{
return _offset>=n._offset;
}
//前置++
buffer_iterator& operator ++()
{
if(_offset >= _buffer->getBufferLength())
return *this;
assert(_it != _buffer->_bufferList.end());
if(_pos < (*_it)->length() - 1)
{
++_pos;
++_offset;
}
else
{
++_it;
++_offset;
_pos = 0;
}
return *this;
}
//后置++
buffer_iterator operator ++(int)
{
buffer_iterator it(*this);
if(_it != _buffer->_bufferList.end())
{
if (_pos < (*_it)->length() - 1)
{
++_pos;
++_offset;
}
else
{
++_it;
++_offset;
_pos = 0;
}
}
else
{
_offset = _buffer->getBufferLength();
_pos = 0;
}
return it;
}
protected:
void parseOffset(size_t offset)
{
_offset = offset;
bool flag = false;
for(auto it = _buffer->_bufferList.begin(); it != _buffer->_bufferList.end(); ++it)
{
if(offset >= (*it)->length())
{
offset -= (*it)->length();
}
else
{
_it = it;
_pos = offset;
flag = true;
break;
}
}
if(!flag)
{
_offset = _buffer->getBufferLength();
_pos = 0;
_it = _buffer->_bufferList.end();
}
}
friend class TC_NetWorkBuffer;
protected:
const TC_NetWorkBuffer* _buffer;
buffer_list_iterator _it;
size_t _offset = 0;
size_t _pos = 0;
};
/**
* connection来构造(, )
* @param buff
@ -169,6 +398,12 @@ public:
*/
void addBuffer(const std::vector<char>& buff);
/**
* buffer
* @param buff
*/
void addBuffer(const std::string& buff);
/**
* buffer
* @param buff
@ -177,6 +412,75 @@ public:
void addBuffer(const char* buff, size_t length);
/**
* begin
* @return
*/
buffer_iterator begin() const;
/**
* end
* @return
*/
buffer_iterator end() const;
/**
*
* @param str
* @param length
* @return
*/
buffer_iterator find(const char *str, size_t length);
/**
* (, )buffer
* T: string or vector<char>
* @param it
* @return
*/
template<typename T>
T iteratorToIterator(const buffer_iterator &sit, const buffer_iterator &eit)
{
T buff;
if(sit == end() || sit == eit)
{
return buff;
}
if(sit > eit)
{
throw TC_NetWorkBuffer_Exception("[TC_NetWorkBuffer::iteratorToIterator] sit > eit error");
}
buff.resize(eit - sit);
std::copy(sit, eit, buff.begin());
return buff;
}
/**
* buff, buffer
* T: string or vector<char>
* @param sep
* @param length
* @return
*/
template<typename T>
T getPrefixBuffer(const char *sep, size_t length)
{
T buff;
auto sit = std::search(begin(), end(), sep, sep + length);
if(sit == end())
{
return buff;
}
return iteratorToIterator<T>(begin(), sit);
}
/**
* buffer
*/
void clearBuffers();
@ -192,6 +496,12 @@ public:
*/
size_t getBufferLength() const;
/**
* buffer list length
* @return
*/
size_t size() const { return _bufferList.size(); }
/**
* buffer的指针,
* @return
@ -230,6 +540,34 @@ public:
*/
bool getHeader(size_t len, std::vector<char> &buffer) const;
/**
* len字节的buffer(len个字节被分割到多个buffer的情况)(: )
* getHeader<string>(10), getHeader<vector<char>>(10);
* @param len
* @return TC_NetWorkBuffer_Exception
*/
template<typename T>
T getHeader(size_t len) const
{
if(getBufferLength() < len)
{
throw TC_NetWorkBuffer_Exception("[TC_NetWorkBuffer::getHeader] no enough buff(" + TC_Common::tostr(getBufferLength()) + ") to get(" + TC_Common::tostr(len) + ")");
}
T buffer;
if(len == 0)
{
return buffer;
}
buffer.resize(len);
getBuffers(&buffer[0], len);
return buffer;
}
/**
* len个字节
* @param len

View File

@ -426,7 +426,7 @@ void TC_EpollServer::BindAdapter::notifyHandle(uint32_t handleIndex)
void TC_EpollServer::BindAdapter::insertRecvQueue(const shared_ptr<RecvContext> &recv)
{
_iBufferSize++;
_iRecvBufferSize++;
size_t idx = 0;
@ -451,14 +451,19 @@ bool TC_EpollServer::BindAdapter::waitForRecvQueue(uint32_t handleIndex, shared_
return bRet;
}
--_iBufferSize;
--_iRecvBufferSize;
return bRet;
}
size_t TC_EpollServer::BindAdapter::getRecvBufferSize() const
{
return _iBufferSize;
return _iRecvBufferSize;
}
size_t TC_EpollServer::BindAdapter::getSendBufferSize() const
{
return _iSendBufferSize;
}
TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_protocol(TC_NetWorkBuffer &r, vector<char> &o)
@ -504,14 +509,14 @@ void TC_EpollServer::BindAdapter::setQueueCapacity(int n)
int TC_EpollServer::BindAdapter::isOverloadorDiscard()
{
int iRecvBufferSize = _iBufferSize;
int iRecvBufferSize = _iRecvBufferSize;
if(iRecvBufferSize > (int)(_iQueueCapacity / 5.*4) && (iRecvBufferSize < _iQueueCapacity) && (_iQueueCapacity > 0)) //overload
{
//超过队列4/5开始认为过载
return -1;
}
else if(iRecvBufferSize > (int)(_iQueueCapacity) && _iQueueCapacity > 0)//队列满需要丢弃接受的数据包
else if(iRecvBufferSize > (int)(_iQueueCapacity) && _iQueueCapacity > 0 ) //队列满需要丢弃接受的数据包
{
return -2;
}
@ -748,6 +753,8 @@ void TC_EpollServer::Connection::close()
if (isTcp() && _sock.isValid())
{
_pBindAdapter->decreaseSendBufferSize(_sendBuffer.size());
_sock.close();
}
}
@ -823,7 +830,7 @@ int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf)
recv->buffer().swap(ro);
if (_pBindAdapter->_authWrapper && _pBindAdapter->_authWrapper(this, recv))
if (_pBindAdapter->getEndpoint().isTcp() && _pBindAdapter->_authWrapper && _pBindAdapter->_authWrapper(this, recv))
continue;
//收到完整的包才算
@ -1003,13 +1010,23 @@ int TC_EpollServer::Connection::recv()
int TC_EpollServer::Connection::sendBuffer()
{
size_t nowSendBufferSize = 0;
size_t nowLeftBufferSize = _sendBuffer.getBufferLength();
while(!_sendBuffer.empty())
{
pair<const char*, size_t> data = _sendBuffer.getBufferPointer();
assert(data.first != NULL);
int iBytesSent = 0;
int iBytesSent = _sock.send((const void *)data.first, data.second);
if(this->isTcp())
{
iBytesSent = _sock.send((const void *) data.first, data.second);
}
else
{
iBytesSent = _sock.sendto((const void *) data.first, data.second, _ip, _port, 0);
}
if (iBytesSent < 0)
{
@ -1026,7 +1043,24 @@ int TC_EpollServer::Connection::sendBuffer()
if(iBytesSent > 0)
{
_sendBuffer.moveHeader(iBytesSent);
nowSendBufferSize += iBytesSent;
if(isTcp())
{
_sendBuffer.moveHeader(iBytesSent);
if (iBytesSent == data.second)
{
_pBindAdapter->decreaseSendBufferSize();
}
}
else
{
_sendBuffer.moveHeader(data.second);
_pBindAdapter->decreaseSendBufferSize();
}
}
//发送的数据小于需要发送的,break, 内核会再通知你的
@ -1043,52 +1077,171 @@ int TC_EpollServer::Connection::sendBuffer()
return -2;
}
// 当出现队列积压的前提下, 且积压超过一定大小
// 每5秒检查一下积压情况, 连续12次(一分钟), 都是积压
// 且每个检查点, 积压长度都增加或者连续3次发送buffer字节小于1k, 就关闭连接, 主要避免极端情况
size_t iBackPacketBuffLimit = _pBindAdapter->getBackPacketBuffLimit();
if(_sendBuffer.getBufferLength() > iBackPacketBuffLimit)
{
if(_sendBufferSize == 0)
{
//开始积压
_lastCheckTime = TNOW;
}
_sendBufferSize += nowSendBufferSize;
if (TNOW - _lastCheckTime >= 5)
{
//如果持续有积压, 则每5秒检查一次
_lastCheckTime = TNOW;
_checkSend.push_back(make_pair(_sendBufferSize, nowLeftBufferSize));
_sendBufferSize = 0;
size_t iBackPacketBuffMin = _pBindAdapter->getBackPacketBuffMin();
//连续3个5秒, 发送速度都极慢, 每5秒发送 < iBackPacketBuffMin, 认为连接有问题, 关闭之
int left = 3;
if (_checkSend.size() >= left)
{
bool slow = true;
for (int i = (int)_checkSend.size() - 1; i >= (int)(_checkSend.size() - left); i--)
{
//发送速度
if (_checkSend[i].first > iBackPacketBuffMin)
{
slow = false;
continue;
}
}
if (slow)
{
ostringstream os;
os << "send [" << _ip << ":" << _port << "] buffer queue send to slow, send size:";
for (int i = (int)_checkSend.size() - 1; i >= (int)(_checkSend.size() - left); i--)
{
os << ", " << _checkSend[i].first;
}
_pBindAdapter->getEpollServer()->error(os.str());
_sendBuffer.clearBuffers();
return -5;
}
}
//连续12个5秒, 都有积压现象, 检查
if (_checkSend.size() >= 12)
{
bool accumulate = true;
for (size_t i = _checkSend.size() - 1; i >= 1; i--) {
//发送buffer 持续增加
if (_checkSend[i].second < _checkSend[i - 1].second) {
accumulate = false;
break;
}
}
//持续积压
if (accumulate)
{
ostringstream os;
os << "send [" << _ip << ":" << _port << "] buffer queue continues to accumulate data, queue size:";
for (size_t i = 0; i < _checkSend.size(); i++)
{
os << ", " << _checkSend[i].second;
}
_pBindAdapter->getEpollServer()->error(os.str());
_sendBuffer.clearBuffers();
return -4;
}
_checkSend.erase(_checkSend.begin());
}
}
}
else
{
//无积压
_sendBufferSize = 0;
_lastCheckTime = TNOW;
_checkSend.clear();
}
return 0;
}
//
//int TC_EpollServer::Connection::sendTcp(const shared_ptr<SendContext> &sc)
//{
//#if TAF_SSL
// if (getBindAdapter()->getEndpoint().isSSL())
// {
// assert(_openssl->isHandshaked());
//
// int ret = _openssl->write(sc->buffer()->buffer(), sc->buffer()->length(), _sendBuffer);
// if (ret != 0) {
// _pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] sendTcp [" + _ip + ":" + TC_Common::tostr(_port) + "] error:" + _openssl->getErrMsg());
//
// return -1; // should not happen
// }
// }
// else
//#endif
// {
// _sendBuffer.addBuffer(sc->buffer());
// }
//
// return sendBuffer();
//}
//
//int TC_EpollServer::Connection::sendUdp(const shared_ptr<SendContext> &sc)
//{
// _sendBuffer.addBuffer(sc->buffer());
//
// return sendBuffer();
////
//// //udp的直接发送即可
//// int iRet = _sock.sendto((const void *) sc->buffer()->buffer(), sc->buffer()->length(), sc->ip(), sc->port(), 0);
//// if (iRet < 0)
//// {
//// _pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send [" + _ip + ":" + TC_Common::tostr(_port) + "] error");
//// return -1;
//// }
////
//// return 0;
//}
int TC_EpollServer::Connection::sendTcp(const shared_ptr<SendContext> &sc)
int TC_EpollServer::Connection::send(const shared_ptr<SendContext> &sc)
{
if(!sc->buffer()->empty())
{
#if TARS_SSL
if (getBindAdapter()->getEndpoint().isSSL())
{
assert(_openssl->isHandshaked());
assert(sc);
int ret = _openssl->write(sc->buffer()->buffer(), sc->buffer()->length(), _sendBuffer);
if (ret != 0) {
_pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] sendTcp [" + _ip + ":" + TC_Common::tostr(_port) + "] error:" + _openssl->getErrMsg());
_pBindAdapter->increaseSendBufferSize();
#if TAF_SSL
if (getBindAdapter()->getEndpoint().isSSL())
{
assert(_openssl->isHandshaked());
int ret = _openssl->write(sc->buffer()->buffer(), sc->buffer()->length(), _sendBuffer);
if (ret != 0) {
_pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send [" + _ip + ":" + TC_Common::tostr(_port) + "] error:" + _openssl->getErrMsg());
return -1; // should not happen
}
}
else
#endif
{
_sendBuffer.addBuffer(sc->buffer());
}
{
_sendBuffer.addBuffer(sc->buffer());
}
return sendBuffer();
}
int TC_EpollServer::Connection::sendUdp(const shared_ptr<SendContext> &sc)
{
//udp的直接发送即可
int iRet = _sock.sendto((const void *) sc->buffer()->buffer(), sc->buffer()->length(), sc->ip(), sc->port(), 0);
if (iRet < 0)
{
_pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send [" + _ip + ":" + TC_Common::tostr(_port) + "] error");
return -1;
}
return 0;
}
int TC_EpollServer::Connection::send(const shared_ptr<SendContext> &sc)
{
return isTcp() ? sendTcp(sc) : sendUdp(sc);
return sendBuffer();
}
bool TC_EpollServer::Connection::setRecvBuffer(size_t nSize)
@ -1157,7 +1310,7 @@ void TC_EpollServer::ConnectionList::init(uint32_t size, uint32_t iIndex)
uint32_t TC_EpollServer::ConnectionList::getUniqId()
{
TC_LockT<TC_SpinLock> lock(_mutex);
TC_LockT<TC_ThreadMutex> lock(_mutex);
uint32_t uid = _free.front();
@ -1182,7 +1335,7 @@ TC_EpollServer::Connection* TC_EpollServer::ConnectionList::get(uint32_t uid)
void TC_EpollServer::ConnectionList::add(Connection *cPtr, time_t iTimeOutStamp)
{
TC_LockT<TC_SpinLock> lock(_mutex);
TC_LockT<TC_ThreadMutex> lock(_mutex);
uint32_t muid = cPtr->getId();
uint32_t magi = muid & (0xFFFFFFFF << 22);
@ -1195,7 +1348,7 @@ void TC_EpollServer::ConnectionList::add(Connection *cPtr, time_t iTimeOutStamp)
void TC_EpollServer::ConnectionList::refresh(uint32_t uid, time_t iTimeOutStamp)
{
TC_LockT<TC_SpinLock> lock(_mutex);
TC_LockT<TC_ThreadMutex> lock(_mutex);
uint32_t magi = uid & (0xFFFFFFFF << 22);
uid = uid & (0x7FFFFFFF >> 9);
@ -1224,7 +1377,7 @@ void TC_EpollServer::ConnectionList::checkTimeout(time_t iCurTime)
_lastTimeoutTime = iCurTime;
TC_LockT<TC_SpinLock> lock(_mutex);
TC_LockT<TC_ThreadMutex> lock(_mutex);
multimap<time_t, uint32_t>::iterator it = _tl.begin();
@ -1295,7 +1448,7 @@ vector<TC_EpollServer::ConnStatus> TC_EpollServer::ConnectionList::getConnStatus
{
vector<TC_EpollServer::ConnStatus> v;
TC_LockT<TC_SpinLock> lock(_mutex);
TC_LockT<TC_ThreadMutex> lock(_mutex);
for(size_t i = 1; i <= _total; i++)
{
@ -1309,6 +1462,8 @@ vector<TC_EpollServer::ConnStatus> TC_EpollServer::ConnectionList::getConnStatus
cs.port = _vConn[i].first->getPort();
cs.timeout = _vConn[i].first->getTimeout();
cs.uid = _vConn[i].first->getId();
cs.recvBufferSize = _vConn[i].first->getRecvBuffer().getBufferLength();
cs.sendBufferSize = _vConn[i].first->getSendBuffer().getBufferLength();
v.push_back(cs);
}
@ -1319,7 +1474,7 @@ vector<TC_EpollServer::ConnStatus> TC_EpollServer::ConnectionList::getConnStatus
void TC_EpollServer::ConnectionList::del(uint32_t uid)
{
TC_LockT<TC_SpinLock> lock(_mutex);
TC_LockT<TC_ThreadMutex> lock(_mutex);
uint32_t magi = uid & (0xFFFFFFFF << 22);
uid = uid & (0x7FFFFFFF >> 9);
@ -1346,7 +1501,7 @@ void TC_EpollServer::ConnectionList::_del(uint32_t uid)
size_t TC_EpollServer::ConnectionList::size()
{
TC_LockT<TC_SpinLock> lock(_mutex);
TC_LockT<TC_ThreadMutex> lock(_mutex);
return _total - _free_size;
}
@ -1751,10 +1906,10 @@ void TC_EpollServer::NetThread::run()
}
}
size_t TC_EpollServer::NetThread::getSendRspSize()
{
return _sbuffer.size();
}
//size_t TC_EpollServer::NetThread::getSendRspSize()
//{
// return _sbuffer.size();
//}
//////////////////////////////////////////////////////////////
TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum)
: _netThreadNum(iNetThreadNum)

File diff suppressed because it is too large Load Diff

View File

@ -22,26 +22,56 @@ namespace tars
{
TC_HttpAsync::AsyncRequest::AsyncRequest(TC_HttpRequest &stHttpRequest, TC_HttpAsync::RequestCallbackPtr &callbackPtr, bool bUseProxy)
: _pHttpAsync(NULL), _iUniqId(0), _callbackPtr(callbackPtr), _bUseProxy(bUseProxy), _isConnected(false)
: _pHttpAsync(NULL)
, _iUniqId(0)
, _sendBuffer(this)
, _recvBuffer(this)
, _callbackPtr(callbackPtr)
, _bUseProxy(bUseProxy)
, _isConnected(false)
{
memset(&_bindAddr, 0, sizeof(struct sockaddr));
_bindAddrSet = false;
_sReq = stHttpRequest.encode();
vector<char> buff;
stHttpRequest.encode(buff);
_sendBuffer.addBuffer(std::move(buff));
stHttpRequest.getHostPort(_sHost, _iPort);
}
// _notify.createSocket();
TC_HttpAsync::AsyncRequest::AsyncRequest(TC_HttpRequest &stHttpRequest, RequestCallbackPtr &callbackPtr, const string &addr)
: _pHttpAsync(NULL)
, _iUniqId(0)
, _sendBuffer(this)
, _recvBuffer(this)
, _callbackPtr(callbackPtr)
, _bUseProxy(false)
, _isConnected(false)
{
memset(&_bindAddr, 0, sizeof(struct sockaddr));
_bindAddrSet = false;
stHttpRequest.encode(_sendBuffer);
vector<string> v = TC_Common::sepstr<string>(addr, ":");
if (v.size() < 2)
{
stHttpRequest.getHostPort(_sHost, _iPort);
}
else
{
_sHost = v[0];
_iPort = TC_Common::strto<uint32_t>(v[1]);
}
}
TC_HttpAsync::AsyncRequest::~AsyncRequest()
{
doClose();
// if (_pHttpAsync)
// _pHttpAsync->delConnection(_notify.getfd());
// _notify.close();
}
void TC_HttpAsync::AsyncRequest::doClose()
@ -154,41 +184,16 @@ void TC_HttpAsync::AsyncRequest::timeout()
}
string TC_HttpAsync::AsyncRequest::getError(const char* sDefault) const
string TC_HttpAsync::AsyncRequest::getError(const string &sDefault) const
{
int ret = TC_Exception::getSystemCode();
if(ret!= 0)
{
return TC_Exception::parseError(ret);
return sDefault + ", ret:" + TC_Common::tostr(ret) + ", msg:" + TC_Exception::parseError(ret);
}
return sDefault;
return sDefault + ", ret:" + TC_Common::tostr(ret);
}
// string TC_HttpAsync::AsyncRequest::getError(const string &sDefault) const
// {
// string err;
// if (_fd.isValid())
// {
// int error;
// SOCKET_LEN_TYPE len = sizeof(error);
// _fd.getSockOpt(SO_ERROR, (void*)&error, len, SOL_SOCKET);
// if (error != 0)
// {
// err = strerror(error);
// }
// }
// if (err.empty() && errno != 0)
// {
// err = strerror(errno);
// }
// if (err.empty())
// err = sDefault;
// return err;
// }
void TC_HttpAsync::AsyncRequest::doException(RequestCallback::FAILED_CODE ret, const string &e)
{
@ -210,14 +215,15 @@ void TC_HttpAsync::AsyncRequest::doRequest()
{
ret = -1;
if (!_sReq.empty())
if (!_sendBuffer.empty())
{
if ((ret = this->send(_sReq.c_str(), _sReq.length(), 0)) > 0)
auto data = _sendBuffer.getBufferPointer();
if ((ret = this->send(data.first, data.second, 0)) > 0)
{
_sReq = _sReq.substr(ret);
_sendBuffer.moveHeader(ret);
}
}
} while (ret > 0);
} while (ret > 0 && !_sendBuffer.empty());
//网络异常
if (ret == -2)
@ -239,19 +245,19 @@ void TC_HttpAsync::AsyncRequest::doReceive()
{
if ((recv = this->recv(buff, sizeof(buff), 0)) > 0)
{
_sRsp.append(buff, recv);
_recvBuffer.addBuffer(buff, recv);
}
}
while (recv > 0);
if (recv == -2)
{
doException(RequestCallback::Failed_Net, getError("recv error."));
doException(RequestCallback::Failed_Net, getError("recv error"));
}
else
{
//增量decode
bool ret = _stHttpResp.incrementDecode(_sRsp);
bool ret = _stHttpResp.incrementDecode(_recvBuffer);
//有头部数据了
if (_callbackPtr && !_stHttpResp.getHeaders().empty())
@ -259,7 +265,7 @@ void TC_HttpAsync::AsyncRequest::doReceive()
bool bContinue = _callbackPtr->onContinue(_stHttpResp);
if (!bContinue)
{
doException(RequestCallback::Failed_Interrupt, getError("receive interrupt."));
doException(RequestCallback::Failed_Interrupt, getError("receive interrupt"));
return;
}
}
@ -282,8 +288,6 @@ void TC_HttpAsync::AsyncRequest::doReceive()
doClose();
try { if (_callbackPtr) _callbackPtr->onSucc(_stHttpResp); } catch (...) { }
// doException(RequestCallback::Failed_Close, getError("close by server."));
}
}
}
@ -291,15 +295,19 @@ void TC_HttpAsync::AsyncRequest::doReceive()
void TC_HttpAsync::AsyncRequest::processNet(const epoll_event &ev)
{
if (TC_Epoller::readEvent(ev))//events & EPOLLIN)
if (TC_Epoller::errorEvent(ev))
{
doException(RequestCallback::Failed_Net, getError("net error"));
return;
}
if (TC_Epoller::readEvent(ev))
{
// cout << "readEvent" << endl;
doReceive();
}
if (TC_Epoller::writeEvent(ev))
{
// cout << "writeEvent" << endl;
doRequest();
}
}
@ -331,7 +339,7 @@ TC_HttpAsync::TC_HttpAsync() : _terminate(false)
_data = new http_queue_type(10000);
_epoller.create(1024);
_epoller.create(20480);
// _notify.createSocket();
_notify.init(&_epoller);
@ -348,7 +356,6 @@ TC_HttpAsync::~TC_HttpAsync()
delete _data;
// _notify.close();
_notify.release();
_epoller.close();
@ -359,9 +366,6 @@ void TC_HttpAsync::start()
_tpool.init(1);
_tpool.start();
// TC_Functor<void> cmd(this, &TC_HttpAsync::run);
// TC_Functor<void>::wrapper_type wt(cmd);
_tpool.exec(std::bind(&TC_HttpAsync::run, this));
}
@ -435,7 +439,30 @@ void TC_HttpAsync::doAsyncRequest(TC_HttpRequest &stHttpRequest, RequestCallback
_events.push_back(H64(_notify.notifyFd()) | uniqId);
}
_notify.notify();
// addConnection(req->getNotifyfd(), uniqId, EPOLLIN);
}
void TC_HttpAsync::doAsyncRequest(TC_HttpRequest &stHttpRequest, RequestCallbackPtr &callbackPtr, const string &addr)
{
AsyncRequest * req = new AsyncRequest(stHttpRequest, callbackPtr, addr);
if (_bindAddrSet)
{
req->setBindAddr(&_bindAddr);
}
uint32_t uniqId = _data->generateId();
req->setUniqId(uniqId);
req->setHttpAsync(this);
_data->push(req, uniqId);
{
std::lock_guard<std::mutex> lock(_mutex);
_events.push_back(H64(_notify.notifyFd()) | uniqId);
}
_notify.notify();
}
void TC_HttpAsync::addConnection(int fd, uint32_t uniqId, uint32_t events)
@ -541,7 +568,6 @@ void TC_HttpAsync::run()
if ((int)fd == _notify.notifyFd())
{
// cout << "notify" << endl;
deque<uint64_t> events;
{
@ -554,7 +580,6 @@ void TC_HttpAsync::run()
{
uint32_t uniqId = (uint32_t)data;
// cout << "socket uniqId:" << uniqId << endl;
AsyncRequestPtr ptr = _data->getAndRefresh(uniqId);
if (!ptr)
continue;
@ -568,13 +593,10 @@ void TC_HttpAsync::run()
uint32_t uniqId = TC_Epoller::getU32(ev, false);
// cout << "http socket uniqId:" << uniqId << endl;
AsyncRequestPtr ptr = _data->getAndRefresh(uniqId);
if (!ptr)
continue;
// assert(fd == ptr->getfd());
ptr->processNet(ev);
}
}

View File

@ -12,20 +12,50 @@ namespace tars
void TC_NetWorkBuffer::addBuffer(const shared_ptr<TC_NetWorkBuffer::Buffer> & buff)
{
if(buff->empty()) return;
_bufferList.push_back(buff);
_length += buff->length();
}
void TC_NetWorkBuffer::addBuffer(const vector<char>& buff)
{
if(buff.empty()) return;
_bufferList.push_back(std::make_shared<Buffer>(buff));
_length += buff.size();
}
void TC_NetWorkBuffer::addBuffer(const std::string& buff)
{
if(buff.empty()) return;
_bufferList.push_back(std::make_shared<Buffer>(buff.c_str(), buff.size()));
_length += buff.size();
}
void TC_NetWorkBuffer::addBuffer(const char* buff, size_t length)
{
addBuffer(vector<char>(buff, buff + length));
if(buff == NULL || length == 0) return;
addBuffer(vector<char>(buff, buff + length));
}
TC_NetWorkBuffer::buffer_iterator TC_NetWorkBuffer::begin() const
{
return buffer_iterator(this, 0);
}
TC_NetWorkBuffer::buffer_iterator TC_NetWorkBuffer::end() const
{
return buffer_iterator(this, this->getBufferLength());
}
TC_NetWorkBuffer::buffer_iterator TC_NetWorkBuffer::find(const char *str, size_t length)
{
return std::search(begin(), end(), str, str + length);
}
void TC_NetWorkBuffer::clearBuffers()
@ -230,16 +260,7 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::checkHttp()
{
try
{
mergeBuffers();
pair<const char*, size_t> buffer = getBufferPointer();
if(buffer.first == NULL || buffer.second == 0)
{
return PACKET_LESS;
}
bool b = TC_HttpRequest::checkRequest(buffer.first, buffer.second);
bool b = TC_HttpRequest::checkRequest(*this);
return b ? PACKET_FULL : PACKET_LESS;
}