diff --git a/examples/HttpDemo/HttpClient/main.cpp b/examples/HttpDemo/HttpClient/main.cpp index cb29274..6d408bd 100644 --- a/examples/HttpDemo/HttpClient/main.cpp +++ b/examples/HttpDemo/HttpClient/main.cpp @@ -58,9 +58,6 @@ void httpCall(int excut_num) stHttpReq.setCacheControl("no-cache"); stHttpReq.setGetRequest(sServer1); - // TC_TCPClient tcpClient1; - // tcpClient1.init("127.0.0.1", 8081, 3000); - int iRet = 0; for (int i = 0; igetNowMs() - _iTime <<"(ms)"<getNowMs() - _iTime <<"(ms)"<& requestHeaders, int expCode) { + cout << "onHttpResponseException expCode:" << expCode << endl; + callback_count++; return 0; @@ -121,8 +118,6 @@ void syncRpc(int c) int64_t t = TC_Common::now2us(); std::map header; - // header[":authority"] = "domain.com"; - // header[":scheme"] = "http"; std::map rheader; //发起远程调用 @@ -133,7 +128,7 @@ void syncRpc(int c) try { - param.servantPrx->http_call("GET", "http://127.0.0.1:8081", header, "helloworld", rheader, rbody); + param.servantPrx->http_call("GET", "/", header, "helloworld", rheader, rbody); } catch(exception& e) { @@ -199,6 +194,13 @@ void asyncRpc2(int c) { cout << "exception:" << e.what() << endl; } + + TC_Common::msleep(10); + + // while(i-callback_count > 0 ) + // { + // TC_Common::msleep(100); + // } } int64_t cost = TC_Common::now2us() - t; @@ -239,6 +241,9 @@ int main(int argc, char *argv[]) param.servantPrx->tars_connect_timeout(5000); param.servantPrx->tars_async_timeout(60*1000); + param.servant2Prx->tars_connect_timeout(5000); + param.servant2Prx->tars_async_timeout(60*1000); + ProxyProtocol proto; proto.requestFunc = ProxyProtocol::http1Request; proto.responseFunc = ProxyProtocol::http1Response; diff --git a/examples/HttpDemo/HttpServer/Http2Imp.cpp b/examples/HttpDemo/HttpServer/Http2Imp.cpp index e7ec792..e282f78 100644 --- a/examples/HttpDemo/HttpServer/Http2Imp.cpp +++ b/examples/HttpDemo/HttpServer/Http2Imp.cpp @@ -46,16 +46,18 @@ void doRequestFunc(const TC_Http2Server::Req_Type reqtype, const string &requri, int Http2Imp::doRequest(TarsCurrentPtr current, vector &buffer) { + TC_Http2Server* session = getHttp2(current->getUId()); -cout << "doRequest" << endl; static bool flag = true; if(flag) { //method 1: vector vtReqid; TC_Http2Server::doRequest(current->getRequestBuffer(), vtReqid); - + + // cout << "doRequest size:" << vtReqid.size() << endl; + TC_Http2Server::Http2Response rsp; rsp.status = 200; rsp.about = "OK"; @@ -72,7 +74,7 @@ cout << "doRequest" << endl; session->doRequest(current->getRequestBuffer(), doRequestFunc, buffer); } - flag = !flag; + // flag = !flag; return 0; } diff --git a/examples/HttpDemo/HttpServer/HttpImp.cpp b/examples/HttpDemo/HttpServer/HttpImp.cpp index 3bcf632..f4f3922 100644 --- a/examples/HttpDemo/HttpServer/HttpImp.cpp +++ b/examples/HttpDemo/HttpServer/HttpImp.cpp @@ -39,6 +39,7 @@ int HttpImp::doRequest(TarsCurrentPtr current, vector &buffer) vector v = current->getRequestBuffer(); string sBuf; sBuf.assign(&v[0],v.size()); + request.decode(sBuf); TC_HttpResponse rsp; string s="hello"; diff --git a/examples/HttpDemo/HttpServer/HttpServer.cpp b/examples/HttpDemo/HttpServer/HttpServer.cpp index 59eb19e..8dd0ee2 100644 --- a/examples/HttpDemo/HttpServer/HttpServer.cpp +++ b/examples/HttpDemo/HttpServer/HttpServer.cpp @@ -25,7 +25,6 @@ HttpServer g_app; TC_NetWorkBuffer::PACKET_TYPE parseHttp2(TC_NetWorkBuffer&in, vector &out) { - cout << "parseHttp2" << endl; TC_Http2Server*session = (TC_Http2Server*)(in.getContextData()); if(session == NULL) diff --git a/examples/scripts/run-http.bat b/examples/scripts/run-http.bat index 2f2b75c..36a74b7 100644 --- a/examples/scripts/run-http.bat +++ b/examples/scripts/run-http.bat @@ -12,7 +12,9 @@ sleep 3 echo "client: .\\bin\\Release\\HttpClient.exe" -.\\bin\\Release\\HttpClient.exe 2 10000 +.\\bin\\Release\\HttpClient.exe --count=10000 --thread=2 --call=basehttp +.\\bin\\Release\\HttpClient.exe --count=10000 --thread=2 --call=synchttp2 +.\\bin\\Release\\HttpClient.exe --count=10000 --thread=2 --call=asynchttp2 sleep 1 diff --git a/examples/scripts/run-http.sh b/examples/scripts/run-http.sh index 46b0efe..cae3bf0 100644 --- a/examples/scripts/run-http.sh +++ b/examples/scripts/run-http.sh @@ -14,8 +14,8 @@ sleep 1 echo "client: ./bin/HttpClient" ./bin/HttpClient --count=10000 --thread=2 --call=basehttp -./bin/HttpClient --count=10000 --thread=2 --call=synchttp -./bin/HttpClient --count=10000 --thread=2 --call=asynchttp +./bin/HttpClient --count=10000 --thread=2 --call=synchttp2 +./bin/HttpClient --count=10000 --thread=2 --call=asynchttp2 sleep 1 diff --git a/servant/libservant/AdapterProxy.cpp b/servant/libservant/AdapterProxy.cpp index 34b7251..ceed7b7 100755 --- a/servant/libservant/AdapterProxy.cpp +++ b/servant/libservant/AdapterProxy.cpp @@ -162,7 +162,7 @@ int AdapterProxy::invoke(ReqMessage * msg) startTrack(msg); #endif - msg->sReqData->addBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get())); + msg->sReqData->setBuffer(_objectProxy->getProxyProtocol().requestFunc(msg->request, _trans.get())); //交给连接发送数据,连接连上,buffer不为空,直接发送数据成功 if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData) != Transceiver::eRetError) diff --git a/servant/libservant/AppProtocol.cpp b/servant/libservant/AppProtocol.cpp index faa5056..1179ecf 100644 --- a/servant/libservant/AppProtocol.cpp +++ b/servant/libservant/AppProtocol.cpp @@ -86,6 +86,8 @@ static ssize_t reqbody_read_callback(nghttp2_session *session, int32_t stream_id void *user_data) { std::vector* body = (std::vector* )source->ptr; + // cout << "reqbody_read_callback:" << body->size() << endl; + if (body->empty()) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; @@ -165,7 +167,6 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http1Response(TC_NetWorkBuffer &in, // ENCODE function, called by network thread vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *trans) { - TC_Http2Client* session = trans->getHttp2Client(); std::vector nva; @@ -195,9 +196,10 @@ vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr data.read_callback = reqbody_read_callback; } +// cout << "pData:" << pData << ", " << data.read_callback << endl; int32_t sid = nghttp2_submit_request(session->session(), NULL, - &nva[0], + nva.data(), nva.size(), pData, NULL); @@ -208,28 +210,49 @@ vector ProxyProtocol::http2Request(RequestPacket& request, Transceiver *tr } request.iRequestId = sid; - nghttp2_session_send(session->session()); + int rv = nghttp2_session_send(session->session()); + if (rv != 0) { + TLOGERROR("[TARS]http2Request::Fatal error: nghttp2_session_send return: " << nghttp2_strerror(rv) << endl); + return vector(); + } + // cout << "nghttp2_session_send, id:" << request.iRequestId << ", buff size:" << session->sendBuffer().size() << endl; + + // if(session->sendBuffer().empty()) + // { + // exit(0); + // } // get data to send vector out; out.swap(session->sendBuffer()); - - cout << "http2Request out buffer size:" << out.size() << endl; - + return out; } TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in, ResponsePacket& rsp) { + // cout << "http2Response" << endl; + TC_Http2Client* session = ((Transceiver*)(in.getConnection()))->getHttp2Client(); auto it = session->doneResponses().begin(); if(it == session->doneResponses().end() && !in.empty()) { - vector buffer = in.getBuffers(); + if(in.empty()) + { + return TC_NetWorkBuffer::PACKET_LESS; + } + //merge to one buffer + in.mergeBuffers(); - int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.size()); + pair buffer = in.getBufferPointer(); + + int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.first, buffer.second); + + // vector buffer = in.getBuffers(); + + // int readlen = nghttp2_session_mem_recv(session->session(), (const uint8_t*)buffer.data(), buffer.size()); if (readlen < 0) { diff --git a/util/include/util/tc_epoll_server.h b/util/include/util/tc_epoll_server.h index 72d4788..3a08659 100644 --- a/util/include/util/tc_epoll_server.h +++ b/util/include/util/tc_epoll_server.h @@ -982,13 +982,6 @@ public: } } - // /** - // * 初始化处理线程,线程将会启动 - // */ - // template void setHandle() - // { - // _pEpollServer->setHandleGroup(_handleGroupName, _iHandleNum, this); - // } /** * 获取第几个句柄 * @param index @@ -1121,12 +1114,6 @@ public: * adapter的名字 */ string _name; - - /** - * handle分组名称 - */ - string _handleGroupName; - /** * 监听fd */ @@ -1517,10 +1504,6 @@ public: */ bool _bClose; - /** - * 临时队列的最大长度 - */ - int _iMaxTemQueueSize; /** * 连接类型 */ @@ -1688,7 +1671,7 @@ public: /** * 构造函数 */ - NetThread(TC_EpollServer *epollServer); + NetThread(TC_EpollServer *epollServer, int index); /** * 析构函数 @@ -1766,48 +1749,6 @@ public: */ vector getConnStatus(int lfd); - // /** - // * 获取连接数 - // * - // * @return size_t - // */ - // size_t getConnectionCount() { return _list.size(); } - - // /** - // * 获取监听socket信息 - // * - // * @return map - // */ - // map getListenSocketInfo(); - - // /** - // * 根据名称获取BindAdapter - // * @param sName - // * @return BindAdapterPtr - // */ - // BindAdapterPtr getBindAdapter(const string &sName); - - // /** - // * 关闭连接 - // * @param uid - // */ - // void close(unsigned int uid); - - // /** - // * 发送数据 - // * @param uid - // * @param s - // */ - // void send(unsigned int uid, const string &s, const string &ip, uint16_t port); - - // /** - // * 获取某一监听端口的连接数 - // * @param lfd - // * - // * @return vector - // */ - // vector getConnStatus(int lfd); - /** * 获取连接数 * @@ -1855,11 +1796,6 @@ public: */ void setUdpRecvBufferSize(size_t nSize=DEFAULT_RECV_BUFFERSIZE); - // /* - // *当网络线程中listeners没有监听socket时,使用adapter中设置的最大连接数 - // */ - // void setListSize(size_t iSize) { _listSize += iSize; } - /** * 发送队列的大小 * @return size_t @@ -1898,28 +1834,6 @@ public: */ void delConnection(Connection *cPtr, bool bEraseList = true,EM_CLOSE_T closeType=EM_CLIENT_CLOSE); - // /** - // * 发送数据 - // * @param cPtr - // * @param buffer - // */ - // int sendBuffer(Connection *cPtr, const string &buffer, const string &ip, uint16_t port); - - // /** - // * 发送数据 - // * @param cPtr - // * @return int - // */ - // int sendBuffer(Connection *cPtr); - - // /** - // * 接收buffer - // * @param cPtr - // * @param buffer - // * @return int - // */ - // int recvBuffer(Connection *cPtr, recv_queue::queue_type &v); - /** * 处理管道消息 */ @@ -1930,24 +1844,6 @@ public: */ void processNet(const epoll_event &ev); - // /** - // * 停止线程 - // */ - // void stopThread(); - - // /** - // * 新连接建立 - // * @param fd - // */ - // bool accept(int fd, int domain = AF_INET); - - // /** - // * 绑定端口 - // * @param ep - // * @param s - // */ - // void bind(const TC_Endpoint &ep, TC_Socket &s); - /** * 空连接超时时间 */ @@ -1978,16 +1874,6 @@ public: */ int _threadIndex; - /** - * 监听socket - */ - // map _listeners; - - /** - * 没有监听socket的网络线程时,使用此变量保存adapter信息 - */ - // size_t _listSize; - /** * epoll */ @@ -1998,11 +1884,6 @@ public: */ bool _bTerminate; - /** - * epoll是否已经创建 - */ - // bool _createEpoll; - /** * handle是否已经启动 */ @@ -2013,14 +1894,6 @@ public: */ TC_Epoller::NotifyInfo _notify; - // /** - // * 管道(用于关闭服务) - // */ - // TC_Socket _shutdown; - - // //管道(用于通知有数据需要发送就) - // TC_Socket _notify; - /** * 管理的连接链表 */ @@ -2031,11 +1904,6 @@ public: */ send_queue _sbuffer; - /** - * BindAdapter是否有udp监听 - */ - // bool _hasUdp; - /** *空连接检测机制开关 */ @@ -2056,21 +1924,6 @@ public: * 通知信号 */ bool _notifySignal = false; - /** - * 属于该网络线程的内存池,目前主要用于发送使用 - */ - // TC_BufferPool* _bufferPool; - - // /** - // * 该网络线程的内存池所负责分配的最小字节和最大字节(2的幂向上取整) - // */ - // size_t _poolMinBlockSize; - // size_t _poolMaxBlockSize; - - /** - * 该网络线程的内存池hold的最大字节 - */ - // size_t _poolMaxBytes; }; //////////////////////////////////////////////////////////////////////////// public: diff --git a/util/include/util/tc_network_buffer.h b/util/include/util/tc_network_buffer.h index eab7525..62f03cd 100755 --- a/util/include/util/tc_network_buffer.h +++ b/util/include/util/tc_network_buffer.h @@ -199,6 +199,12 @@ public: */ pair getBufferPointer() const; + /** + * 将链表上的所有buffer拼接起来 + * @return string + */ + void mergeBuffers(); + /** * 返回所有buffer(将所有buffer拼接起来, 注意性能) * @return string diff --git a/util/src/tc_clientsocket.cpp b/util/src/tc_clientsocket.cpp index 5f26639..8c8e51b 100644 --- a/util/src/tc_clientsocket.cpp +++ b/util/src/tc_clientsocket.cpp @@ -301,6 +301,7 @@ int TC_TCPClient::checkSocket() //设置非阻塞模式 _socket.setblock(false); + _socket.setNoCloseWait(); int iRet; #if TARGET_PLATFORM_LINUX diff --git a/util/src/tc_epoll_server.cpp b/util/src/tc_epoll_server.cpp index 719ce77..1a738fd 100644 --- a/util/src/tc_epoll_server.cpp +++ b/util/src/tc_epoll_server.cpp @@ -38,18 +38,6 @@ #endif -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include #if TARS_SSL #include "util/tc_openssl.h" #endif @@ -60,12 +48,10 @@ namespace tars static const int BUFFER_SIZE = 8 * 1024; -// #define H64(x) (((uint64_t)x) << 32) ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // handle的实现 TC_EpollServer::Handle::Handle() : _pEpollServer(NULL) -// , _handleGroup(NULL) , _iWaitTime(100) { } @@ -126,8 +112,6 @@ bool TC_EpollServer::Handle::allFilterIsEmpty() void TC_EpollServer::Handle::setEpollServer(TC_EpollServer *pEpollServer) { - TC_ThreadLock::Lock lock(*this); - _pEpollServer = pEpollServer; } @@ -136,18 +120,6 @@ TC_EpollServer* TC_EpollServer::Handle::getEpollServer() return _pEpollServer; } -// void TC_EpollServer::Handle::setHandleGroup(TC_EpollServer::HandleGroupPtr& pHandleGroup) -// { -// TC_ThreadLock::Lock lock(*this); - -// _handleGroup = pHandleGroup; -// } - -// TC_EpollServer::HandleGroupPtr& TC_EpollServer::Handle::getHandleGroup() -// { -// return _handleGroup; -// } - void TC_EpollServer::Handle::setBindAdapter(BindAdapter* bindAdapter) { _bindAdapter = bindAdapter; @@ -313,126 +285,15 @@ void TC_EpollServer::Handle::handleImp() stopHandle(); } -// void TC_EpollServer::Handle::setWaitTime(uint32_t iWaitTime) -// { -// TC_ThreadLock::Lock lock(*this); - -// _iWaitTime = iWaitTime; -// } - -// void TC_EpollServer::Handle::handleImp() -// { -// startHandle(); - -// while (!getEpollServer()->isTerminate()) -// { -// { -// TC_ThreadLock::Lock lock(_handleGroup->monitor); - -// if (allAdapterIsEmpty() && allFilterIsEmpty()) -// { -// _handleGroup->monitor.timedWait(_iWaitTime); -// } -// } - -// //上报心跳 -// heartbeat(); - -// //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 -// handleAsyncResponse(); -// handleCustomMessage(true); - - -// tagRecvData* recv = NULL; - -// map& adapters = _handleGroup->adapters; - -// for (auto& kv : adapters) -// { -// BindAdapterPtr& adapter = kv.second; - -// try -// { - -// while (adapter->waitForRecvQueue(recv, 0)) -// { -// //上报心跳 -// heartbeat(); - -// //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 -// handleAsyncResponse(); - -// tagRecvData& stRecvData = *recv; - -// int64_t now = TNOWMS; - - -// stRecvData.adapter = adapter; - -// //数据已超载 overload -// if (stRecvData.isOverload) -// { -// handleOverload(stRecvData); -// } -// //关闭连接的通知消息 -// else if (stRecvData.isClosed) -// { -// handleClose(stRecvData); -// } -// //数据在队列中已经超时了 -// else if ( (now - stRecvData.recvTimeStamp) > (int64_t)adapter->getQueueTimeout()) -// { -// handleTimeout(stRecvData); -// } -// else -// { -// handle(stRecvData); -// } -// handleCustomMessage(false); - -// delete recv; -// recv = NULL; -// } -// } -// catch (exception &ex) -// { -// if(recv) -// { -// close(recv->uid, recv->fd); -// delete recv; -// recv = NULL; -// } - -// getEpollServer()->error("[Handle::handleImp] error:" + string(ex.what())); -// } -// catch (...) -// { -// if(recv) -// { -// close(recv->uid, recv->fd); -// delete recv; -// recv = NULL; -// } - -// getEpollServer()->error("[Handle::handleImp] unknown error"); -// } -// } -// } - -// stopHandle(); -// } - ////////////////////////////BindAdapter/////////////////////////////////// TC_EpollServer::BindAdapter::BindAdapter(TC_EpollServer *pEpollServer) : _pReportQueue(NULL) , _pReportConRate(NULL) , _pReportTimeoutNum(NULL) , _pEpollServer(pEpollServer) -// , _handleGroup(NULL) , _pf(echo_protocol) , _hf(echo_header_filter) , _name("") -, _handleGroupName("") , _iMaxConns(DEFAULT_MAX_CONN) , _iCurConns(0) , _iHandleNum(0) @@ -566,44 +427,6 @@ void TC_EpollServer::BindAdapter::notifyHandle(uint32_t handleIndex) } } -// void TC_EpollServer::BindAdapter::insertRecvQueue(const recv_queue::queue_type &vtRecvData, bool bPushBack) -// { -// { -// if (bPushBack) -// { -// _rbuffer.push_back(vtRecvData); -// } -// else -// { -// _rbuffer.push_front(vtRecvData); -// } -// } - -// TC_ThreadLock::Lock lock(_handleGroup->monitor); - -// _handleGroup->monitor.notify(); -// } - -// bool TC_EpollServer::BindAdapter::waitForRecvQueue(tagRecvData* &recv, uint32_t iWaitTime) -// { -// bool bRet = false; - -// bRet = _rbuffer.pop_front(recv, iWaitTime); - -// if(!bRet) -// { -// return bRet; -// } - -// return bRet; -// } - -// size_t TC_EpollServer::BindAdapter::getRecvBufferSize() const -// { -// return _rbuffer.size(); -// } - - void TC_EpollServer::BindAdapter::insertRecvQueue(const shared_ptr &recv) { _iBufferSize++; @@ -661,30 +484,6 @@ TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_header_filter(TC return TC_NetWorkBuffer::PACKET_FULL; } -// int TC_EpollServer::BindAdapter::echo_protocol(string &r, string &o) -// { -// o = r; - -// r = ""; - -// return 1; -// } - -// int TC_EpollServer::BindAdapter::echo_header_filter(int i, string &o) -// { -// return 1; -// } - -// void TC_EpollServer::BindAdapter::setHandleGroupName(const string& handleGroupName) -// { -// _handleGroupName = handleGroupName; -// } - -// string TC_EpollServer::BindAdapter::getHandleGroupName() const -// { -// return _handleGroupName; -// } - void TC_EpollServer::BindAdapter::setName(const string &name) { TC_ThreadLock::Lock lock(*this); @@ -704,8 +503,6 @@ int TC_EpollServer::BindAdapter::getHandleNum() void TC_EpollServer::BindAdapter::setHandleNum(int n) { - TC_ThreadLock::Lock lock(*this); - _iHandleNum = n; } @@ -767,6 +564,7 @@ void TC_EpollServer::BindAdapter::setEndpoint(const string &str) TC_Endpoint TC_EpollServer::BindAdapter::getEndpoint() const { + std::lock_guard lock (_mutex); return _ep; } @@ -787,9 +585,6 @@ size_t TC_EpollServer::BindAdapter::getMaxConns() const void TC_EpollServer::BindAdapter::setHeartBeatTime(time_t t) { - std::lock_guard lock (_mutex); - // TC_ThreadLock::Lock lock(*this); - _iHeartBeatTime = t; } @@ -806,7 +601,6 @@ void TC_EpollServer::BindAdapter::setOrder(EOrder eOrder) void TC_EpollServer::BindAdapter::setAllow(const vector &vtAllow) { std::lock_guard lock (_mutex); - // TC_ThreadLock::Lock lock(*this); _vtAllow = vtAllow; } @@ -814,7 +608,6 @@ void TC_EpollServer::BindAdapter::setAllow(const vector &vtAllow) void TC_EpollServer::BindAdapter::setDeny(const vector &vtDeny) { std::lock_guard lock (_mutex); - // TC_ThreadLock::Lock lock(*this); _vtDeny = vtDeny; } @@ -826,7 +619,6 @@ TC_EpollServer::BindAdapter::EOrder TC_EpollServer::BindAdapter::getOrder() cons vector TC_EpollServer::BindAdapter::getAllow() const { - // TC_ThreadLock::Lock lock(*this); std::lock_guard lock (_mutex); return _vtAllow; @@ -835,7 +627,6 @@ vector TC_EpollServer::BindAdapter::getAllow() const vector TC_EpollServer::BindAdapter::getDeny() const { std::lock_guard lock (_mutex); - // TC_ThreadLock::Lock lock(*this); return _vtDeny; } @@ -885,34 +676,6 @@ TC_EpollServer::header_filter_functor &TC_EpollServer::BindAdapter::getHeaderFil return _hf; } -// void TC_EpollServer::BindAdapter::setProtocol(const TC_EpollServer::protocol_functor &pf, int iHeaderLen, const TC_EpollServer::header_filter_functor &hf) -// { -// _pf = pf; - -// _hf = hf; - -// _iHeaderLen = iHeaderLen; -// } - -// void TC_EpollServer::BindAdapter::setConnProtocol(const TC_NetWorkBuffer::protocol_functor& cpf, int iHeaderLen, const TC_EpollServer::header_filter_functor &hf) -// { -// _cpf = cpf; - -// _hf = hf; - -// _iHeaderLen = iHeaderLen; -// } - -// TC_EpollServer::protocol_functor& TC_EpollServer::BindAdapter::getProtocol() -// { -// return _pf; -// } - -// TC_EpollServer::header_filter_functor& TC_EpollServer::BindAdapter::getHeaderFilterFunctor() -// { -// return _hf; -// } - int TC_EpollServer::BindAdapter::getHeaderFilterLen() { return _iHeaderLen; @@ -942,7 +705,6 @@ TC_EpollServer::Connection::Connection(TC_EpollServer::BindAdapter *pBindAdapter , _sendbuffer(this) , _iHeaderLen(0) , _bClose(false) -, _iMaxTemQueueSize(100) , _enType(EM_TCP) , _bEmptyConn(true) @@ -962,7 +724,6 @@ TC_EpollServer::Connection::Connection(BindAdapter *pBindAdapter, int fd) , _sendbuffer(this) , _iHeaderLen(0) , _bClose(false) -, _iMaxTemQueueSize(100) , _enType(EM_UDP) , _bEmptyConn(false) /*udp is always false*/ @@ -971,23 +732,6 @@ TC_EpollServer::Connection::Connection(BindAdapter *pBindAdapter, int fd) _sock.init(fd, false, pBindAdapter->_ep.isIPv6() ? AF_INET6 : AF_INET); } -// TC_EpollServer::Connection::Connection(BindAdapter *pBindAdapter) -// : _pBindAdapter(pBindAdapter) -// , _uid(0) -// , _lfd(-1) -// , _timeout(0) -// , _port(0) -// , _recvbuffer(this) -// , _sendbuffer(this) -// , _iHeaderLen(0) -// , _bClose(false) -// , _iMaxTemQueueSize(100) -// , _enType(EM_TCP) -// , _bEmptyConn(false) /*udp is always false*/ -// { -// _iLastRefreshTime = TNOW; -// } - TC_EpollServer::Connection::~Connection() { if(_pRecvBuffer) @@ -1059,45 +803,6 @@ void TC_EpollServer::Connection::insertRecvQueue(const shared_ptrisOverloadorDiscard(); - -// if(iRet == 0)//未过载 -// { -// _pBindAdapter->insertRecvQueue(vRecvData); -// } -// else if(iRet == -1)//超过接受队列长度的一半,需要进行overload处理 -// { -// recv_queue::queue_type::iterator it = vRecvData.begin(); - -// recv_queue::queue_type::iterator itEnd = vRecvData.end(); - -// while(it != itEnd) -// { -// (*it)->isOverload = true; - -// ++it; -// } - -// _pBindAdapter->insertRecvQueue(vRecvData,false); -// } -// else//接受队列满,需要丢弃 -// { -// recv_queue::queue_type::iterator it = vRecvData.begin(); - -// recv_queue::queue_type::iterator itEnd = vRecvData.end(); - -// while(it != itEnd) -// { -// delete (*it); -// ++it; -// } -// } -// } -// } int TC_EpollServer::Connection::parseProtocol() { @@ -1155,16 +860,6 @@ int TC_EpollServer::Connection::parseProtocol() // string ro; vector ro; - // int b = TC_EpollServer::PACKET_LESS; - // if (_pBindAdapter->getConnProtocol()) - // { - // b = _pBindAdapter->getConnProtocol()(_recvbuffer, ro, this); - // } - // else - // { - // b = _pBindAdapter->getProtocol()(_recvbuffer, ro); - // } - TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(_recvbuffer, ro); if(b == TC_NetWorkBuffer::PACKET_LESS) { @@ -1185,33 +880,6 @@ int TC_EpollServer::Connection::parseProtocol() //收到完整包 insertRecvQueue(recv); - - // tagRecvData* recv = new tagRecvData(); - // recv->buffer = std::move(ro); - // recv->ip = _ip; - // recv->port = _port; - // recv->recvTimeStamp = TNOWMS; - // recv->uid = getId(); - // recv->isOverload = false; - // recv->isClosed = false; - // recv->fd = getfd(); - - // //收到完整的包才算 - // this->_bEmptyConn = false; - - // //收到完整包 - // o.push_back(recv); - - // if((int) o.size() > _iMaxTemQueueSize) - // { - // insertRecvQueue(o); - // o.clear(); - // } - - // if(rbuf->empty()) - // { - // break; - // } } else { @@ -1232,114 +900,13 @@ int TC_EpollServer::Connection::parseProtocol() } return 0; - // return o.size(); } -// int TC_EpollServer::Connection::recv(recv_queue::queue_type &o) -// { -// o.clear(); - -// while(true) -// { -// char buffer[32 * 1024]; -// int iBytesReceived = 0; - -// if(_lfd == -1) -// { -// if(_pRecvBuffer) -// { -// iBytesReceived = _sock.recvfrom((void*)_pRecvBuffer,_nRecvBufferSize, _ip, _port, 0); -// } -// else -// { -// iBytesReceived = _sock.recvfrom((void*)buffer,sizeof(buffer), _ip, _port, 0); -// } -// } -// else -// { -// iBytesReceived = ::read(_sock.getfd(), (void*)buffer, sizeof(buffer)); -// } - -// if (iBytesReceived < 0) -// { -// if(errno == EAGAIN) -// { -// //没有数据了 -// break; -// } -// else -// { -// //客户端主动关闭 -// _pBindAdapter->getEpollServer()->debug("recv [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection"); -// return -1; -// } -// } -// else if( iBytesReceived == 0) -// { -// //客户端主动关闭 -// _pBindAdapter->getEpollServer()->debug("recv [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection"); -// return -1; -// } - -// //保存接收到数据 -// if(_lfd == -1) -// { -// if(_pRecvBuffer) -// { -// _recvbuffer.append(_pRecvBuffer, iBytesReceived); -// } -// else -// { -// _recvbuffer.append(buffer, iBytesReceived); -// } -// } -// else -// { -// _recvbuffer.append(buffer, iBytesReceived); -// } - -// //UDP协议 -// if(_lfd == -1) -// { -// if(_pBindAdapter->isIpAllow(_ip) == true) -// { -// parseProtocol(o); -// } -// else -// { -// //udp ip无权限 -// _pBindAdapter->getEpollServer()->debug("accept [" + _ip + ":" + TC_Common::tostr(_port) + "] [" + TC_Common::tostr(_lfd) + "] not allowed"); -// } -// _recvbuffer = ""; -// } -// else -// { -// //接收到数据不超过buffer,没有数据了(如果有数据,内核会再通知你) -// if((size_t)iBytesReceived < sizeof(buffer)) -// { -// break; -// } -// //字符串太长时substr性能会急剧下降 -// if(_recvbuffer.length() > 8192) -// { -// parseProtocol(o); -// } -// } -// } - -// if(_lfd != -1) -// { -// return parseProtocol(o); -// } - -// return o.size(); -// } - - int TC_EpollServer::Connection::recvTcp() { int recvCount = 0; + static int totalRecv = 0; while (true) { // vector buffer(BUFFER_SIZE); @@ -1369,9 +936,8 @@ int TC_EpollServer::Connection::recvTcp() } else { + totalRecv += iBytesReceived; _recvbuffer.addBuffer(buffer, iBytesReceived); - // buffer.resize(iBytesReceived); - // _recvbuffer.addSwapBuffer(buffer); //字符串太长时, 强制解析协议 if (_recvbuffer.getBufferLength() > 8192) { @@ -1458,266 +1024,6 @@ int TC_EpollServer::Connection::recv() return isTcp() ? recvTcp() : recvUdp(); } -// int TC_EpollServer::Connection::send(const string& buffer, const string &ip, uint16_t port, bool byEpollOut) -// { -// const bool isUdp = (_lfd == -1); -// if(isUdp) -// { -// int iRet = _sock.sendto((const void*) buffer.c_str(), buffer.length(), ip, port, 0); -// if(iRet < 0) -// { -// _pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send [" + _ip + ":" + TC_Common::tostr(_port) + "] error"); -// return -1; -// } -// return 0; -// } - -// if (byEpollOut) -// { -// int bytes = this->send(_sendbuffer); -// if (bytes == -1) -// { -// _pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by peer."); -// return -1; -// } - -// this->adjustSlices(_sendbuffer, bytes); -// _pBindAdapter->getEpollServer()->info("byEpollOut [" + _ip + ":" + TC_Common::tostr(_port) + "] send bytes " + TC_Common::tostr(bytes)); -// } -// else -// { -// const size_t kChunkSize = 8 * 1024 * 1024; -// if (!_sendbuffer.empty()) -// { -// TC_BufferPool* pool = _pBindAdapter->getEpollServer()->getNetThreadOfFd(_sock.getfd())->_bufferPool; -// // avoid too big chunk -// for (size_t chunk = 0; chunk * kChunkSize < buffer.size(); chunk ++) -// { -// size_t needs = std::min(kChunkSize, buffer.size() - chunk * kChunkSize); - -// TC_Slice slice = pool->Allocate(needs); -// ::memcpy(slice.data, buffer.data() + chunk * kChunkSize, needs); -// slice.dataLen = needs; - -// _sendbuffer.push_back(slice); -// } -// } -// else -// { -// int bytes = this->tcpSend(buffer.data(), buffer.size()); -// if (bytes == -1) -// { -// _pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by peer."); -// return -1; -// } -// else if (bytes < static_cast(buffer.size())) -// { -// const char* remainData = &buffer[bytes]; -// const size_t remainLen = buffer.size() - static_cast(bytes); - -// TC_BufferPool* pool = _pBindAdapter->getEpollServer()->getNetThreadOfFd(_sock.getfd())->_bufferPool; -// // avoid too big chunk -// for (size_t chunk = 0; chunk * kChunkSize < remainLen; chunk ++) -// { -// size_t needs = std::min(kChunkSize, remainLen - chunk * kChunkSize); - -// TC_Slice slice = pool->Allocate(needs); -// ::memcpy(slice.data, remainData + chunk * kChunkSize, needs); -// slice.dataLen = needs; - -// _sendbuffer.push_back(slice); -// } -// // end -// _pBindAdapter->getEpollServer()->info("EAGAIN[" + _ip + ":" + TC_Common::tostr(_port) + -// ", to sent bytes " + TC_Common::tostr(remainLen) + -// ", total sent " + TC_Common::tostr(buffer.size())); -// } -// } -// } - -// size_t toSendBytes = 0; -// for (const auto& slice : _sendbuffer) -// { -// toSendBytes += slice.dataLen; -// } - -// if (toSendBytes >= 8 * 1024) -// { -// _pBindAdapter->getEpollServer()->info("big _sendbuffer > 8K"); -// size_t iBackPacketBuffLimit = _pBindAdapter->getBackPacketBuffLimit(); - -// if(iBackPacketBuffLimit != 0 && toSendBytes >= iBackPacketBuffLimit) -// { -// _pBindAdapter->getEpollServer()->error("send [" + _ip + ":" + TC_Common::tostr(_port) + "] buffer too long close."); -// clearSlices(_sendbuffer); -// return -2; -// } -// } - - -// //需要关闭链接 -// if(_bClose && _sendbuffer.empty()) -// { -// _pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by user."); -// return -2; -// } - -// return 0; -// } - -// int TC_EpollServer::Connection::send() -// { -// if(_sendbuffer.empty()) return 0; - -// return send("", _ip, _port, true); -// } - -// int TC_EpollServer::Connection::send(const std::vector& slices) -// { -// const int kIOVecCount = std::max(sysconf(_SC_IOV_MAX), 16); // be care of IOV_MAX - -// size_t alreadySentVecs = 0; -// size_t alreadySentBytes = 0; -// while (alreadySentVecs < slices.size()) -// { -// const size_t vc = std::min(slices.size() - alreadySentVecs, kIOVecCount); - -// // convert to iovec array -// std::vector vecs; -// size_t expectSent = 0; -// for (size_t i = alreadySentVecs; i < alreadySentVecs + vc; ++ i) -// { -// assert (slices[i].dataLen > 0); - -// iovec ivc; -// ivc.iov_base = slices[i].data; -// ivc.iov_len = slices[i].dataLen; -// expectSent += slices[i].dataLen; - -// vecs.push_back(ivc); -// } - -// int bytes = tcpWriteV(vecs); -// if (bytes == -1) -// return -1; // should close -// else if (bytes == 0) -// return alreadySentBytes; // EAGAIN -// else if (bytes == static_cast(expectSent)) -// { -// alreadySentBytes += bytes; -// alreadySentVecs += vc; // continue sent -// } -// else -// { -// assert (bytes > 0); // partial send -// alreadySentBytes += bytes; -// return alreadySentBytes; -// } -// } - -// return alreadySentBytes; -// } - - -// int TC_EpollServer::Connection::tcpSend(const void* data, size_t len) -// { -// if (len == 0) -// return 0; - -// int bytes = ::send(_sock.getfd(), data, len, 0); -// if (bytes == -1) -// { -// if (EAGAIN == errno) -// bytes = 0; - -// if (EINTR == errno) -// bytes = 0; // try ::send later -// } - -// return bytes; -// } - -// int TC_EpollServer::Connection::tcpWriteV(const std::vector& buffers) -// { -// const int kIOVecCount = std::max(sysconf(_SC_IOV_MAX), 16); // be care of IOV_MAX -// const int cnt = static_cast(buffers.size()); - -// assert (cnt <= kIOVecCount); - -// const int sock = _sock.getfd(); - -// int bytes = static_cast(::writev(sock, &buffers[0], cnt)); -// if (bytes == -1) -// { -// assert (errno != EINVAL); -// if (errno == EAGAIN) -// return 0; - -// return -1; // can not send any more -// } -// else -// { -// return bytes; -// } -// } - -// void TC_EpollServer::Connection::clearSlices(std::vector& slices) -// { -// adjustSlices(slices, std::numeric_limits::max()); -// } - -// void TC_EpollServer::Connection::adjustSlices(std::vector& slices, size_t toSkippedBytes) -// { -// size_t skippedVecs = 0; -// for (size_t i = 0; i < slices.size(); ++ i) -// { -// assert (slices[i].dataLen > 0); -// if (toSkippedBytes >= slices[i].dataLen) -// { -// toSkippedBytes -= slices[i].dataLen; -// ++ skippedVecs; -// } -// else -// { -// if (toSkippedBytes != 0) -// { -// const char* src = (const char*)slices[i].data + toSkippedBytes; -// memmove(slices[i].data, src, slices[i].dataLen - toSkippedBytes); -// slices[i].dataLen -= toSkippedBytes; -// } - -// break; -// } -// } - -// // free to pool -// TC_BufferPool* pool = _pBindAdapter->getEpollServer()->getNetThreadOfFd(_sock.getfd())->_bufferPool; -// assert (pool); -// for (size_t i = 0; i < skippedVecs; ++ i) -// { -// pool->Deallocate(slices[i]); -// } - -// slices.erase(slices.begin(), slices.begin() + skippedVecs); -// } - -// bool TC_EpollServer::Connection::setRecvBuffer(size_t nSize) -// { -// //only udp type needs to malloc -// if(_lfd == -1 && !_pRecvBuffer) -// { -// _nRecvBufferSize = nSize; - -// _pRecvBuffer = new char[_nRecvBufferSize]; -// if(!_pRecvBuffer) -// { -// throw TC_Exception("adapter '" + _pBindAdapter->getName() + "' malloc udp receive buffer fail"); -// } -// } -// return true; -// } - - int TC_EpollServer::Connection::sendBuffer() { while(!_sendbuffer.empty()) @@ -1791,13 +1097,6 @@ int TC_EpollServer::Connection::send(const shared_ptr &sc) return isTcp() ? sendTcp(sc) : sendUdp(sc); } -// bool TC_EpollServer::Connection::setClose() -// { -// _bClose = true; -// return _sendbuffer.empty(); -// } - - bool TC_EpollServer::Connection::setRecvBuffer(size_t nSize) { //only udp type needs to malloc @@ -1890,7 +1189,6 @@ TC_EpollServer::Connection* TC_EpollServer::ConnectionList::get(uint32_t uid) void TC_EpollServer::ConnectionList::add(Connection *cPtr, time_t iTimeOutStamp) { TC_LockT lock(_mutex); - // TC_ThreadLock::Lock lock(*this); uint32_t muid = cPtr->getId(); uint32_t magi = muid & (0xFFFFFFFF << 22); @@ -1904,7 +1202,6 @@ void TC_EpollServer::ConnectionList::add(Connection *cPtr, time_t iTimeOutStamp) void TC_EpollServer::ConnectionList::refresh(uint32_t uid, time_t iTimeOutStamp) { TC_LockT lock(_mutex); - // TC_ThreadLock::Lock lock(*this); uint32_t magi = uid & (0xFFFFFFFF << 22); uid = uid & (0x7FFFFFFF >> 9); @@ -2033,8 +1330,6 @@ void TC_EpollServer::ConnectionList::del(uint32_t uid) { TC_LockT lock(_mutex); - // TC_ThreadLock::Lock lock(*this); - uint32_t magi = uid & (0xFFFFFFFF << 22); uid = uid & (0x7FFFFFFF >> 9); @@ -2061,50 +1356,31 @@ void TC_EpollServer::ConnectionList::_del(uint32_t uid) size_t TC_EpollServer::ConnectionList::size() { TC_LockT lock(_mutex); - // TC_ThreadLock::Lock lock(*this); return _total - _free_size; } //////////////////////////////NetThread////////////////////////////////// -TC_EpollServer::NetThread::NetThread(TC_EpollServer *epollServer) +TC_EpollServer::NetThread::NetThread(TC_EpollServer *epollServer, int index) : _epollServer(epollServer) -// , _listSize(0) +, _threadIndex(index) , _bTerminate(false) -// , _createEpoll(false) , _handleStarted(false) , _list(this) -// , _hasUdp(false) , _bEmptyConnAttackCheck(false) , _iEmptyCheckTimeout(MIN_EMPTY_CONN_TIMEOUT) , _nUdpRecvBufferSize(DEFAULT_RECV_BUFFERSIZE) -// , _bufferPool(NULL) { _epoller.create(10240); _notify.init(&_epoller); _notify.add(_notify.notifyFd()); - - // _shutdown.createSocket(); - // _notify.createSocket(); } TC_EpollServer::NetThread::~NetThread() { - // for (auto& kv : _listeners) - // { - // ::close(kv.first); - // } - // _listeners.clear(); - - // delete _bufferPool; } -// map TC_EpollServer::NetThread::getListenSocketInfo() -// { -// return _listeners; -// } - void TC_EpollServer::NetThread::debug(const string &s) { _epollServer->debug(s); @@ -2145,147 +1421,11 @@ int TC_EpollServer::NetThread::getEmptyConnTimeout() const return _iEmptyCheckTimeout; } -// int TC_EpollServer::NetThread::bind(BindAdapterPtr &lsPtr) -// { -// for (const auto& kv : _listeners) -// { -// if(kv.second->getName() == lsPtr->getName()) -// { -// throw TC_Exception("bind name '" + lsPtr->getName() + "' conflicts."); -// } -// } - -// const TC_Endpoint &ep = lsPtr->getEndpoint(); - -// TC_Socket& s = lsPtr->getSocket(); - -// bind(ep, s); - -// _listeners[s.getfd()] = lsPtr; - -// return s.getfd(); -// } - -// TC_EpollServer::BindAdapterPtr TC_EpollServer::NetThread::getBindAdapter(const string &sName) -// { -// for (const auto& kv : _listeners) -// { -// if(kv.second->getName() == sName) -// return kv.second; -// } - -// return NULL; -// } - -// void TC_EpollServer::NetThread::bind(const TC_Endpoint &ep, TC_Socket &s) -// { -// int type = ep.isUnixLocal() ? AF_LOCAL : ep.isIPv6() ? AF_INET6 : AF_INET; - -// if(ep.isTcp()) -// { -// s.createSocket(SOCK_STREAM, type); -// } -// else -// { -// s.createSocket(SOCK_DGRAM, type); -// } - -// if(ep.isUnixLocal()) -// { -// s.bind(ep.getHost().c_str()); -// } -// else -// { -// s.bind(ep.getHost(), ep.getPort()); -// } - -// if(ep.isTcp() && !ep.isUnixLocal()) -// { -// s.listen(1024); -// s.setKeepAlive(); -// s.setTcpNoDelay(); -// //不要设置close wait否则http服务回包主动关闭连接会有问题 -// s.setNoCloseWait(); -// } - -// s.setblock(false); -// } - void TC_EpollServer::NetThread::createEpoll(uint32_t maxAllConn) { - // if(!_createEpoll) - // { - // _createEpoll = true; - - // // 创建本网络线程的内存池 - // assert (!_bufferPool); - // _bufferPool = new TC_BufferPool(_poolMinBlockSize, _poolMaxBlockSize); - // _bufferPool->SetMaxBytes(_poolMaxBytes); - - // //创建epoll - // _epoller.create(10240); - - // _epoller.add(_shutdown.getfd(), H64(ET_CLOSE), EPOLLIN); - // _epoller.add(_notify.getfd(), H64(ET_NOTIFY), EPOLLIN); - - // size_t maxAllConn = 0; - - // //监听socket - // for (const auto& kv : _listeners) - // { - // if(kv.second->getEndpoint().isTcp()) - // { - // //获取最大连接数 - // maxAllConn += kv.second->getMaxConns(); - - // _epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN); - // } - // else - // { - // maxAllConn++; - // _hasUdp = true; - // } - // } - - // if(maxAllConn == 0) - // { - // //当网络线程中listeners没有监听socket时,使用adapter中设置的最大连接数 - // maxAllConn = _listSize; - // } - - // if(maxAllConn >= (1 << 22)) - // { - // error("createEpoll connection num: " + TC_Common::tostr(maxAllConn) + " >= " + TC_Common::tostr(1 << 22)); - // maxAllConn = (1 << 22) - 1; - // } - - // //初始化连接管理链表 - // _list.init(maxAllConn, iIndex); - // } _list.init((uint32_t)maxAllConn, _threadIndex + 1); } - -// void TC_EpollServer::NetThread::initUdp() -// { -// if(_hasUdp) -// { -// //监听socket -// for (const auto& kv : _listeners) -// { -// if (!kv.second->getEndpoint().isTcp()) -// { -// Connection *cPtr = new Connection(kv.second.get(), kv.first); -// //udp分配接收buffer -// cPtr->setRecvBuffer(_nUdpRecvBufferSize); - -// _epollServer->addConnection(cPtr, kv.first, UDP_CONNECTION); -// } -// } -// } -// } - - void TC_EpollServer::NetThread::initUdp(const unordered_map &listeners) { //监听socket @@ -2312,95 +1452,8 @@ void TC_EpollServer::NetThread::terminate() _bTerminate = true; _notify.notify(); - // //通知队列醒过来 - // _sbuffer.notifyT(); - - // //通知epoll响应, 关闭连接 - // _epoller.mod(_shutdown.getfd(), H64(ET_CLOSE), EPOLLOUT); } -// bool TC_EpollServer::NetThread::accept(int fd, int domain) -// { -// struct sockaddr_in stSockAddr4; -// struct sockaddr_in6 stSockAddr6; - -// socklen_t iSockAddrSize = (AF_INET6 == domain) ? sizeof(sockaddr_in6) : sizeof(sockaddr_in); -// struct sockaddr *stSockAddr = (AF_INET6 == domain) ? (struct sockaddr *)&stSockAddr6 : (struct sockaddr *)&stSockAddr4; - -// TC_Socket cs; - -// cs.setOwner(false); - -// //接收连接 -// TC_Socket s; - -// s.init(fd, false, domain); - -// int iRetCode = s.accept(cs, stSockAddr, iSockAddrSize); - -// if (iRetCode > 0) -// { -// string ip; - -// uint16_t port; - -// char sAddr[INET6_ADDRSTRLEN] = "\0"; - -// inet_ntop(domain, (AF_INET6 == domain) ? (const void *)&stSockAddr6.sin6_addr : (const void *)&stSockAddr4.sin_addr, sAddr, sizeof(sAddr)); - -// ip = sAddr; -// port = (AF_INET6 == domain) ? ntohs(stSockAddr6.sin6_port) : ntohs(stSockAddr4.sin_port); - -// debug("accept [" + ip + ":" + TC_Common::tostr(port) + "] [" + TC_Common::tostr(cs.getfd()) + "] incomming"); - -// if(!_listeners[fd]->isIpAllow(ip)) -// { -// debug("accept [" + ip + ":" + TC_Common::tostr(port) + "] [" + TC_Common::tostr(cs.getfd()) + "] not allowed"); - -// cs.close(); - -// return true; -// } - -// if(_listeners[fd]->isLimitMaxConnection()) -// { -// cs.close(); - -// error("accept [" + ip + ":" + TC_Common::tostr(port) + "][" + TC_Common::tostr(cs.getfd()) + "] beyond max connection:" + TC_Common::tostr(_listeners[fd]->getMaxConns())); - -// return true; -// } - -// cs.setblock(false); -// cs.setKeepAlive(); -// cs.setTcpNoDelay(); -// cs.setCloseWaitDefault(); - -// int timeout = _listeners[fd]->getEndpoint().getTimeout()/1000; - -// Connection *cPtr = new Connection(_listeners[fd].get(), fd, (timeout < 2 ? 2 : timeout), cs.getfd(), ip, port); - -// //过滤连接首个数据包包头 -// cPtr->setHeaderFilterLen(_listeners[fd]->getHeaderFilterLen()); - -// //addTcpConnection(cPtr); -// _epollServer->addConnection(cPtr, cs.getfd(), TCP_CONNECTION); - -// return true; -// } -// else -// { -// //直到发生EAGAIN才不继续accept -// if(errno == EAGAIN) -// { -// return false; -// } - -// return true; -// } -// return true; -// } - void TC_EpollServer::NetThread::addTcpConnection(TC_EpollServer::Connection *cPtr) { uint32_t uid = _list.getUniqId(); @@ -2476,25 +1529,6 @@ void TC_EpollServer::NetThread::delConnection(TC_EpollServer::Connection *cPtr, uint32_t uid = cPtr->getId(); //构造一个tagRecvData,通知业务该连接的关闭事件 - - // tagRecvData* recv = new tagRecvData(); - // recv->adapter = cPtr->getBindAdapter(); - // recv->uid = uid; - // recv->ip = cPtr->getIp(); - // recv->port = cPtr->getPort(); - // recv->isClosed = true; - // recv->isOverload = false; - // recv->recvTimeStamp = TNOWMS; - // recv->fd = cPtr->getfd(); - // recv->closeType = (int)closeType; - - // recv_queue::queue_type vRecvData; - - // vRecvData.push_back(recv); - - // cPtr->getBindAdapter()->insertRecvQueue(vRecvData); - - shared_ptr recv = std::make_shared(uid, cPtr->getIp(), cPtr->getPort(), cPtr->getfd(), cPtr->getBindAdapter(), true, (int)closeType); //如果是merge模式,则close直接交给网络线程处理 @@ -2522,22 +1556,6 @@ void TC_EpollServer::NetThread::delConnection(TC_EpollServer::Connection *cPtr, } } -// int TC_EpollServer::NetThread::sendBuffer(TC_EpollServer::Connection *cPtr, const string &buffer, const string &ip, uint16_t port) -// { -// return cPtr->send(buffer, ip, port); -// } - -// int TC_EpollServer::NetThread::sendBuffer(TC_EpollServer::Connection *cPtr) -// { -// return cPtr->send(); -// } - -// int TC_EpollServer::NetThread::recvBuffer(TC_EpollServer::Connection *cPtr, recv_queue::queue_type &v) -// { -// return cPtr->recv(v); -// } - - void TC_EpollServer::NetThread::notify() { _notify.notify(); @@ -2577,45 +1595,6 @@ void TC_EpollServer::NetThread::send(const shared_ptr &data) } } -// void TC_EpollServer::NetThread::close(uint32_t uid) -// { -// tagSendData* send = new tagSendData(); - -// send->uid = uid; - -// send->cmd = 'c'; - -// _sbuffer.push_back(send); - -// //通知epoll响应, 关闭连接 -// _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); -// } - -// void TC_EpollServer::NetThread::send(uint32_t uid, const string &s, const string &ip, uint16_t port) -// { -// if(_bTerminate) -// { -// return; -// } - -// tagSendData* send = new tagSendData(); - -// send->uid = uid; - -// send->cmd = 's'; - -// send->buffer = s; - -// send->ip = ip; - -// send->port = port; - -// _sbuffer.push_back(send); - -// //通知epoll响应, 有数据要发送 -// _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); -// } - void TC_EpollServer::NetThread::processPipe() { _notifySignal = false; @@ -2653,7 +1632,6 @@ void TC_EpollServer::NetThread::processPipe() } #endif int ret = cPtr->send(sc); - // int ret = sendBuffer(cPtr, (*it)->buffer, (*it)->ip, (*it)->port); if(ret < 0) { @@ -2692,6 +1670,7 @@ void TC_EpollServer::NetThread::processNet(const epoll_event &ev) if (TC_Epoller::readEvent(ev)) { int ret = cPtr->recv(); + if (ret < 0) { delConnection(cPtr, true, EM_CLIENT_CLOSE); @@ -2703,7 +1682,6 @@ void TC_EpollServer::NetThread::processNet(const epoll_event &ev) if (TC_Epoller::writeEvent(ev)) { int ret = cPtr->sendBuffer(); - if (ret < 0) { delConnection(cPtr, true, (ret == -1) ? EM_CLIENT_CLOSE : EM_SERVER_CLOSE); @@ -2713,46 +1691,6 @@ void TC_EpollServer::NetThread::processNet(const epoll_event &ev) } _list.refresh(uid, cPtr->getTimeout() + TNOW); - - // if (ev.events & EPOLLERR || ev.events & EPOLLHUP) - // { - // delConnection(cPtr,true,EM_SERVER_CLOSE); - - // return; - // } - - // if(ev.events & EPOLLIN) //有数据需要读取 - // { - // recv_queue::queue_type vRecvData; - - // int ret = recvBuffer(cPtr, vRecvData); - - // if(ret < 0) - // { - // delConnection(cPtr,true,EM_CLIENT_CLOSE); - - // return; - // } - - // if(!vRecvData.empty()) - // { - // cPtr->insertRecvQueue(vRecvData); - // } - // } - - // if (ev.events & EPOLLOUT) //有数据需要发送 - // { - // int ret = sendBuffer(cPtr); - - // if (ret < 0) - // { - // delConnection(cPtr,true,(ret==-1)?EM_CLIENT_CLOSE:EM_SERVER_CLOSE); - - // return; - // } - // } - - // _list.refresh(uid, cPtr->getTimeout() + TNOW); } void TC_EpollServer::NetThread::run() @@ -2816,55 +1754,6 @@ void TC_EpollServer::NetThread::run() error("run exception:" + string(ex.what())); } } - - // for(int i = 0; i < iEvNum; ++i) - // { - // try - // { - // const epoll_event &ev = _epoller.get(i); - - - // uint32_t h = ev.data.u64 >> 32; - - // switch(h) - // { - // case ET_LISTEN: - // { - // //监听端口有请求 - // auto it = _listeners.find(ev.data.u32); - // if( it != _listeners.end()) - // { - // if(ev.events & EPOLLIN) - // { - // bool ret; - // do - // { - // ret = accept(ev.data.u32, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET); - // }while(ret); - // } - // } - // } - // break; - // case ET_CLOSE: - // //关闭请求 - // break; - // case ET_NOTIFY: - // //发送通知 - // processPipe(); - // break; - // case ET_NET: - // //网络请求 - // processNet(ev); - // break; - // default: - // assert(true); - // } - // } - // catch(exception &ex) - // { - // error("run exception:" + string(ex.what())); - // } - // } } } size_t TC_EpollServer::NetThread::getSendRspSize() @@ -2902,7 +1791,7 @@ TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum) for (size_t i = 0; i < _netThreadNum; ++i) { - TC_EpollServer::NetThread* netThreads = new TC_EpollServer::NetThread(this); + TC_EpollServer::NetThread* netThreads = new TC_EpollServer::NetThread(this, i); _netThreads.push_back(netThreads); } } @@ -2947,9 +1836,6 @@ bool TC_EpollServer::accept(int fd, int domain) socklen_t iSockAddrSize = (AF_INET6 == domain) ? sizeof(::sockaddr_in6) : sizeof(sockaddr_in); struct sockaddr* stSockAddr = (AF_INET6 == domain) ? (struct sockaddr*) & stSockAddr6 : (struct sockaddr*) & stSockAddr4; - //struct sockaddr_in stSockAddr; - //SOCKET_LEN_TYPE iSockAddrSize = sizeof(sockaddr_in); - TC_Socket cs; cs.setOwner(false); @@ -2964,13 +1850,8 @@ bool TC_EpollServer::accept(int fd, int domain) string ip; uint16_t port; -// char sAddr[128] = "\0"; char sAddr[INET6_ADDRSTRLEN] = "\0"; - // struct sockaddr_in *p = (struct sockaddr_in *)&stSockAddr; - // inet_ntop(AF_INET, &p->sin_addr, sAddr, sizeof(sAddr)); - // port = ntohs(p->sin_port); - inet_ntop(domain, (AF_INET6 == domain) ? ( void *)&stSockAddr6.sin6_addr : ( void *)&stSockAddr4.sin_addr, sAddr, sizeof(sAddr)); port = (AF_INET6 == domain) ? ntohs(stSockAddr6.sin6_port) : ntohs(stSockAddr4.sin_port); ip = sAddr; @@ -3021,46 +1902,6 @@ bool TC_EpollServer::accept(int fd, int domain) return true; } -// void TC_EpollServer::waitForShutdown() -// { -// for (size_t i = 0; i < _netThreadNum; ++i) -// { -// _netThreads[i]->start(); -// } - -// debug("server netthread num : " + TC_Common::tostr(_netThreadNum)); - -// while(!_bTerminate) -// { -// { -// TC_ThreadLock::Lock sync(*this); -// timedWait(5000); -// } -// } - -// if(_bTerminate) -// { -// for(size_t i = 0; i < _netThreads.size(); ++i) -// { -// _netThreads[i]->terminate(); -// _netThreads[i]->getThreadControl().join(); -// } - -// stopThread(); -// } -// } - -// void TC_EpollServer::terminate() -// { -// if(!_bTerminate) -// { -// tars::TC_ThreadLock::Lock sync(*this); -// _bTerminate = true; -// notifyAll(); -// } -// } - - void TC_EpollServer::waitForShutdown() { @@ -3075,84 +1916,70 @@ void TC_EpollServer::waitForShutdown() } int64_t iLastCheckTime = TNOWMS; - // while (!_bTerminate) - // { - // try - // { - //循环监听网路连接请求 - while (!_bTerminate) - { - int iEvNum = _epoller.wait(1000); - if (_bTerminate) - break; + while (!_bTerminate) + { + int iEvNum = _epoller.wait(1000); - if(TNOWMS - iLastCheckTime > 1000) { - try { _hf(this); } catch(...) {} - iLastCheckTime = TNOWMS; - } + if (_bTerminate) + break; - for (int i = 0; i < iEvNum; ++i) - { - try - { - const epoll_event &ev = _epoller.get(i); + if(TNOWMS - iLastCheckTime > 1000) { + try { _hf(this); } catch(...) {} + iLastCheckTime = TNOWMS; + } - uint32_t fd = TC_Epoller::getU32(ev, false); + for (int i = 0; i < iEvNum; ++i) + { + try + { + const epoll_event &ev = _epoller.get(i); - auto it = _listeners.find(fd); + uint32_t fd = TC_Epoller::getU32(ev, false); - if (it != _listeners.end()) - { - //manualListen 会进入这个场景 - if (TC_Epoller::writeEvent(ev)) - { - TC_Socket s; - s.init(fd, false); - s.listen(1024); + auto it = _listeners.find(fd); - debug("run listen fd: " + TC_Common::tostr(fd)); - } + if (it != _listeners.end()) + { + //manualListen 会进入这个场景 + if (TC_Epoller::writeEvent(ev)) + { + TC_Socket s; + s.init(fd, false); + s.listen(1024); - //监听端口有请求 - if (TC_Epoller::readEvent(ev)) - { + debug("run listen fd: " + TC_Common::tostr(fd)); + } + + //监听端口有请求 + if (TC_Epoller::readEvent(ev)) + { #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS - bool ret; - do - { + bool ret; + do + { - ret = accept(fd, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET); - } while (ret); + ret = accept(fd, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET); + } while (ret); #else - accept(fd, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET); + accept(fd, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET); #endif - } - } - } - catch (exception &ex) - { - error("run exception:" + string(ex.what())); - } - catch (...) - { - error("run exception"); - } - } - } - // } - // catch (exception &ex) - // { - // error(string("TC_EpollServer::waitForShutdown error : ") + ex.what()); - // } - // catch (...) - // { - // error("TC_EpollServer::waitForShutdown unknown error"); - // } - // } - + } + } + } + catch (exception &ex) + { + error("run exception:" + string(ex.what())); + } + catch (...) + { + error("run exception"); + } + } + } + for (size_t i = 0; i < _netThreads.size(); ++i) { if (_netThreads[i]->isAlive()) @@ -3232,7 +2059,7 @@ void TC_EpollServer::bind(const TC_Endpoint &ep, TC_Socket &s, bool manualListen if(!manualListen) { //手工监听 - s.listen(1024); + s.listen(10240); } s.setKeepAlive(); s.setTcpNoDelay(); @@ -3270,41 +2097,11 @@ int TC_EpollServer::bind(BindAdapterPtr &lsPtr) return s.getfd(); } -// void TC_EpollServer::setNetThreadBufferPoolInfo(size_t minBlock, size_t maxBlock, size_t maxBytes) -// { -// for(size_t i = 0; i < _netThreads.size(); ++i) -// { -// _netThreads[i]->_poolMinBlockSize = minBlock; -// _netThreads[i]->_poolMaxBlockSize = maxBlock; -// _netThreads[i]->_poolMaxBytes = maxBytes; -// } -// } - -// int TC_EpollServer::bind(TC_EpollServer::BindAdapterPtr &lsPtr) -// { -// int iRet = 0; - -// for(size_t i = 0; i < _netThreads.size(); ++i) -// { -// if(i == 0) -// { -// iRet = _netThreads[i]->bind(lsPtr); -// } -// else -// { -// //当网络线程中listeners没有监听socket时,list使用adapter中设置的最大连接数作为初始化 -// _netThreads[i]->setListSize(lsPtr->getMaxConns()); -// } -// } - -// return iRet; -// } - void TC_EpollServer::addConnection(TC_EpollServer::Connection * cPtr, int fd, TC_EpollServer::CONN_TYPE iType) { TC_EpollServer::NetThread* netThread = getNetThreadOfFd(fd); - if(iType == 0) + if(iType == TCP_CONNECTION) { netThread->addTcpConnection(cPtr); } @@ -3314,26 +2111,6 @@ void TC_EpollServer::addConnection(TC_EpollServer::Connection * cPtr, int fd, TC } } -// void TC_EpollServer::startHandle() -// { -// if (!_handleStarted) -// { -// _handleStarted = true; - -// for (auto& kv : _handleGroups) -// { -// auto& hds = kv.second->handles; - -// for (auto& handle : hds) -// { -// if (!handle->isAlive()) -// handle->start(); -// } -// } -// } -// } - - void TC_EpollServer::startHandle() { if(!this->isMergeHandleNetThread()) @@ -3356,27 +2133,6 @@ void TC_EpollServer::startHandle() } } -// void TC_EpollServer::stopThread() -// { -// for (auto& kv : _handleGroups) -// { -// { -// TC_ThreadLock::Lock lock(kv.second->monitor); - -// kv.second->monitor.notifyAll(); -// } -// auto& hds = kv.second->handles; -// for (auto& handle : hds) -// { -// if (handle->isAlive()) -// { -// handle->getThreadControl().join(); -// } -// } -// } -// } - - void TC_EpollServer::stopThread() { if(!this->isMergeHandleNetThread()) @@ -3404,20 +2160,6 @@ void TC_EpollServer::stopThread() } } -// void TC_EpollServer::createEpoll() -// { -// for(size_t i = 0; i < _netThreads.size(); ++i) -// { -// _netThreads[i]->createEpoll(i+1); -// } -// //必须先等所有网络线程调用createEpoll(),初始化list后,才能调用initUdp() -// for(size_t i = 0; i < _netThreads.size(); ++i) -// { -// _netThreads[i]->initUdp(); -// } -// } - - void TC_EpollServer::createEpoll() { @@ -3438,18 +2180,11 @@ void TC_EpollServer::createEpoll() else { maxAllConn++; -// _hasUdp = true; } ++it; } -// if (maxAllConn == 0) -// { -// //当网络线程中listeners没有监听socket时,使用adapter中设置的最大连接数 -// maxAllConn = _listSize; -// } - if (maxAllConn >= (1 << 22)) { error("createEpoll connection num: " + TC_Common::tostr(maxAllConn) + " >= " + TC_Common::tostr(1 << 22)); @@ -3489,35 +2224,6 @@ vector TC_EpollServer::getBindAdapters() return this->_bindAdapters; } -// TC_EpollServer::BindAdapterPtr TC_EpollServer::getBindAdapter(const string &sName) -// { -// for(size_t i = 0; i < _netThreads.size(); ++i) -// { -// TC_EpollServer::BindAdapterPtr ptr = _netThreads[i]->getBindAdapter(sName); -// if(ptr) -// { -// return ptr; -// } - -// } - -// return NULL; -// } -// void TC_EpollServer::close(unsigned int uid, int fd) -// { -// TC_EpollServer::NetThread* netThread = getNetThreadOfFd(fd); - -// netThread->close(uid); -// } - -// void TC_EpollServer::send(unsigned int uid, const string &s, const string &ip, uint16_t port, int fd) -// { -// TC_EpollServer::NetThread* netThread = getNetThreadOfFd(fd); - -// netThread->send(uid, s, ip, port); -// } - - void TC_EpollServer::close(const shared_ptr &data) { TC_EpollServer::NetThread *netThread = getNetThreadOfFd(data->fd()); @@ -3576,20 +2282,6 @@ unordered_map TC_EpollServer::getListenSock return _listeners; } -// map TC_EpollServer::getListenSocketInfo() -// { -// map mListen; -// for(size_t i = 0; i < _netThreads.size(); ++i) -// { -// auto tmp = _netThreads[i]->getListenSocketInfo(); -// for (const auto& kv : tmp) -// { -// mListen.insert(kv); -// } -// } -// return mListen; -// } - size_t TC_EpollServer::getConnectionCount() { size_t iConnTotal = 0; @@ -3600,18 +2292,6 @@ size_t TC_EpollServer::getConnectionCount() return iConnTotal; } -// unsigned int TC_EpollServer::getLogicThreadNum() -// { -// unsigned int iNum = 0; -// for (const auto& kv : _handleGroups) -// { -// iNum += kv.second->handles.size(); -// } - -// return iNum; -// } - - size_t TC_EpollServer::getLogicThreadNum() { if(this->isMergeHandleNetThread()) diff --git a/util/src/tc_network_buffer.cpp b/util/src/tc_network_buffer.cpp index 1e3e97b..f1ff085 100755 --- a/util/src/tc_network_buffer.cpp +++ b/util/src/tc_network_buffer.cpp @@ -58,6 +58,21 @@ pair TC_NetWorkBuffer::getBufferPointer() const return make_pair(it->data() + _pos, it->size() - _pos); } +void TC_NetWorkBuffer::mergeBuffers() +{ + //merge to one buffer + if(_bufferList.size() > 1) + { + vector buffer = getBuffers(); + + _pos = 0; + _bufferList.clear(); + _bufferList.push_back(buffer); + } + + assert(_bufferList.size() <= 1); +} + string TC_NetWorkBuffer::getBuffersString() const { string buffer; @@ -86,8 +101,8 @@ string TC_NetWorkBuffer::getBuffersString() const vector TC_NetWorkBuffer::getBuffers() const { - vector buffer; - buffer.resize(_length); + vector buffer; + buffer.resize(_length); auto it = _bufferList.begin(); @@ -96,17 +111,17 @@ vector TC_NetWorkBuffer::getBuffers() const { if(it == _bufferList.begin()) { - memcpy(&buffer[pos], it->data() + _pos, it->size() - _pos); - pos += it->size() - _pos; + memcpy(&buffer[pos], it->data() + _pos, it->size() - _pos); + pos += it->size() - _pos; } else { - memcpy(&buffer[pos], it->data(), it->size()); - pos += it->size(); + memcpy(&buffer[pos], it->data(), it->size()); + pos += it->size(); } ++it; } - + return buffer; } @@ -261,65 +276,25 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseBufferOf4(vector &buf return parseBuffer(buffer, minLength, maxLength); } -//bool TC_NetWorkBuffer::checkHttp(string &buffer) const -//{ -// //这样处理性能是有问题的, 有提升的空间 -// buffer = getBuffers(); -// -// return TC_HttpRequest::checkRequest(buffer.c_str(), buffer.length()); -//} -// -//TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, string &out) -//{ -// try -// { -// string buffer; -// bool b = in.checkHttp(buffer); -// if (b) -// { -// out.swap(buffer); -// in.clearBuffers();; -// return PACKET_FULL; -// } -// else -// { -// return PACKET_LESS; -// } -// } -// catch (exception &ex) -// { -// return PACKET_ERR; -// } -// -// return PACKET_LESS; -//} - TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::checkHttp() { try { - const static int headerLen = 10; - if(_bufferList.empty() || getBufferLength() <= headerLen) + mergeBuffers(); + + pair buffer = getBufferPointer(); + + if(buffer.first == NULL || buffer.second == 0) { return PACKET_LESS; } - vector buffer; - getHeader(headerLen, buffer); + bool b = TC_HttpRequest::checkRequest(buffer.first, buffer.second); - bool b = TC_HttpRequest::checkRequest(buffer.data(), buffer.size()); - if (b) - { - return PACKET_FULL; - } - else - { - return PACKET_LESS; - } + return b ? PACKET_FULL : PACKET_LESS; } catch (exception &ex) { - cout << ex.what() << endl; return PACKET_ERR; } @@ -333,7 +308,6 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseHttp(TC_NetWorkBuffer&in, v if (b == PACKET_FULL) { out = in.getBuffers(); - in.clearBuffers(); } return b; @@ -348,9 +322,8 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseEcho(TC_NetWorkBuffer&in, v { return PACKET_LESS; } - + out = in.getBuffers(); - in.clearBuffers(); return TC_NetWorkBuffer::PACKET_FULL; } catch (exception &ex)