client socket in windows succ

This commit is contained in:
ruanshudong 2020-02-19 22:09:05 +08:00
parent 40ca80bfed
commit e05a67ef87
11 changed files with 136 additions and 41 deletions

View File

@ -23,7 +23,7 @@ if(WIN32)
add_custom_target(run-http
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
DEPENDS HttpServer HttpClient
COMMAND servant\\script\\busybox.exe bash examples\\scripts\\run-http.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${CMAKE_SOURCE_DIR}
COMMAND examples\\scripts\\run-http.bat ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} ${CMAKE_SOURCE_DIR}
COMMENT "call run http")
add_custom_target(run-auth

View File

@ -1 +1,2 @@
build_tars_server("HttpClient" "HttpServer")
#build_tars_server("HttpClient" "HttpServer")
build_tars_server("HttpClient" "")

View File

@ -51,11 +51,16 @@ void httpCall(int excut_num)
{
int64_t _iTime = TC_TimeProvider::getInstance()->getNowMs();
string sServer1("http://134.175.105.92:8081/");
// string sServer1("http://134.175.105.92:8081/");
string sServer1("http://127.0.0.1:8081/");
TC_HttpRequest stHttpReq;
stHttpReq.setCacheControl("no-cache");
stHttpReq.setGetRequest(sServer1);
// stHttpReq.setGetRequest(sServer1);
TC_TCPClient client ;
// client.init("127.0.0.1", 8081, 3000);
client.init("127.0.0.1", 8082, 3000);
int iRet = 0;
@ -63,7 +68,9 @@ void httpCall(int excut_num)
{
TC_HttpResponse stHttpRsp;
stHttpReq.setPostRequest(sServer1, TC_Common::tostr(i), true);
iRet = stHttpReq.doRequest(stHttpRsp, 3000);
// iRet = stHttpReq.doRequest(client,stHttpRsp);
if (iRet != 0)
{
@ -158,7 +165,7 @@ int main(int argc, char *argv[])
if(param.call.empty()) param.call = "sync";
param.thread = TC_Common::strto<int>(option.getValue("thread"));
if(param.thread <= 0) param.thread = 1;
/*
_comm = new Communicator();
// TarsRollLogger::getInstance()->logger()->setLogLevel(6);
@ -175,7 +182,7 @@ int main(int argc, char *argv[])
proto.requestFunc = ProxyProtocol::http1Request;
proto.responseFunc = ProxyProtocol::http1Response;
param.servantPrx->tars_set_protocol(proto);
*/
int64_t start = TC_Common::now2us();
std::function<void(int)> func;

View File

@ -41,6 +41,7 @@ int HttpImp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
sBuf.assign(v.data(),v.size());
request.decode(sBuf);
// cout << request.getContent() << endl;
TC_HttpResponse rsp;
string s="hello";
rsp.setResponse(s.c_str(),s.size());

View File

@ -1,27 +1,27 @@
echo "run-http.bat"
EXE_PATH=$1
SRC_PATH=$2
set EXE_PATH=%1
set SRC_PATH=%2
echo ${EXE_PATH} ${SRC_PATH}
echo %EXE_PATH% %SRC_PATH%
killall -9 HttpServer.exe
sleep 1
taskkill /im HttpServer.exe /t /f
echo "start server: ${EXE_PATH}/HttpServer.exe --config=${SRC_PATH}/examples/HttpDemo/HttpServer/config.conf &"
timeout /T 1
${EXE_PATH}/HttpServer.exe --config=${SRC_PATH}/examples/HttpDemo/HttpServer/config.conf &
echo "start server: ${EXE_PATH}/HttpServer.exe --config=%SRC_PATH%/examples/HttpDemo/HttpServer/config.conf"
sleep 3
start /b %EXE_PATH%\\HttpServer.exe --config=%SRC_PATH%\\examples\\HttpDemo\\HttpServer\\config.conf
timeout /T 3
echo "client: ${EXE_PATH}/HttpClient.exe"
${EXE_PATH}/HttpClient.exe --count=10000 --thread=2 --call=basehttp
#${EXE_PATH}/HttpClient.exe --count=10000 --thread=2 --call=synchttp
%EXE_PATH%\\HttpClient.exe --count=10000 --thread=2 --call=basehttp
sleep 1
timeout /T 1
killall -9 HttpServer.exe
taskkill /im HttpServer.exe /t /f

View File

@ -7,9 +7,19 @@ killall -9 AuthServer
killall -9 CoroutineDemoAServer
killall -9 CoroutineDemoBServer
killall -9 CustomServer
killall -9 HttpServer
killall -9 HttpServer.exe
killall -9 Http2Server
killall -9 PushServer
killall -9 QuickStartDemo
killall -9 ProxyServer
killall -9 HttpClient.exe
killall -9 Http2Client
killall -9 AuthClient
killall -9 SSLClient
killall -9 CoroutineDemoClient
killall -9 testCoro
killall -9 testParallelCoro
killall -9 CustomClient
killall -9 PushClient
killall -9 QuickStartDemoClient

View File

@ -88,6 +88,8 @@ struct TC_HttpRequest_Exception : public TC_Http_Exception
~TC_HttpRequest_Exception() throw(){};
};
class TC_TCPClient;
class TC_HttpRequest;
class TC_HttpResponse;
@ -1019,7 +1021,7 @@ public:
TC_HttpRequest()
{
TC_HttpRequest::reset();
setUserAgent("TC_Http");
setUserAgent("Tars-Http");
}
/**
@ -1189,6 +1191,8 @@ public:
*/
int doRequest(TC_HttpResponse &stHttpRep, int iTimeout = 3000);
int doRequest(TC_TCPClient &client, TC_HttpResponse& stHttpRsp);
/**
* @brief get request type
*/

View File

@ -171,8 +171,6 @@ int epoll_sock_data_t::submit()
afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT;
}
// cout << "submit sock_data, registered_events:" << _registered_events << endl;
epoll_op_t* op = new epoll_op_t(this, afd_events);
DWORD result = epoll__afd_poll(_peer_sock, &op->_poll_info, &op->_overlapped);
@ -310,7 +308,6 @@ int epoll_port_data_t::epoll_add(SOCKET sock, struct epoll_event *ev)
sock_data->_registered_events = ev->events | EPOLLERR | EPOLLHUP;
sock_data->_user_data = ev->data.u64;
// cout << "add:" << sock_data->_user_data << endl;
return sock_data->submit();
}
@ -420,14 +417,14 @@ int epoll_port_data_t::epoll_wait(OVERLAPPED_ENTRY *entries, ULONG count, struct
{
epoll_op_t *op = CONTAINING_RECORD(entries[i].lpOverlapped, epoll_op_t, _overlapped);
epoll_sock_data_t *sock_data = op->_sock_data;
if (op->_generation < sock_data->_op_generation)
{
/* This op has been superseded. */
// cout << "op superseded" << endl;
// cout << "op superseded" << endl;
delete op;
continue;
}
/* Dequeued the most recent op. Reset generation and submitted_events. */
sock_data->_op_generation = 0;
@ -476,7 +473,7 @@ int epoll_port_data_t::epoll_wait(OVERLAPPED_ENTRY *entries, ULONG count, struct
continue;
}
int registered_events = sock_data->_registered_events;
// int registered_events = sock_data->_registered_events;
int reported_events = 0;
/* Convert afd events to epoll events. */
if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT))
@ -521,10 +518,10 @@ int epoll_port_data_t::epoll_wait(OVERLAPPED_ENTRY *entries, ULONG count, struct
{
struct epoll_event *ev = events + (num_events++);
ev->data.u64 = sock_data->_user_data;
// cout << "report:" << (ev->data.u64 >> 32) << endl;
ev->events = reported_events;
}
}
return num_events;
}
@ -597,18 +594,17 @@ int epoll_wait(epoll_t port_handle, struct epoll_event *events, int maxevents, i
/* Compute how much overlapped entries can be dequeued at most. */
DWORD max_entries = min(ARRAY_COUNT(entries), maxevents);
ULONG count;
// cout << "GetQueuedCompletionStatusEx begin" << endl;
ULONG count = 0;
DWORD result = GetQueuedCompletionStatusEx(port_data->getHandle(), entries, max_entries, &count, gqcs_timeout, TRUE);
// cout << "GetQueuedCompletionStatusEx:" << result << ", " << count << endl;
if (!result)
{
DWORD error = GetLastError();
if (error == WAIT_TIMEOUT)
{
// printf("%d, GetQueuedCompletionStatusEx:%d\n", std::this_thread::get_id() , count);
return 0;
}
else

View File

@ -315,12 +315,16 @@ int TC_TCPClient::checkSocket()
iRet = _socket.connectNoThrow(_ip, _port);
}
int n = TC_Exception::getSystemCode();
if(iRet < 0 && !TC_Socket::isInProgress())
{
_socket.close();
return EM_CONNECT;
}
// printf("%d, create socket1\n", std::this_thread::get_id());
int iRetCode = _epoller->wait(_timeout);
// printf("%d, create socket2, %d\n", std::this_thread::get_id(), iRetCode);
if (iRetCode < 0)
{
_socket.close();
@ -328,16 +332,17 @@ int TC_TCPClient::checkSocket()
}
else if (iRetCode == 0)
{
cout << "create:" << TC_Exception::parseError(n)<< endl;
_socket.close();
return EM_TIMEOUT;
}
else
{
/*
for(int i = 0; i < iRetCode; ++i)
{
const epoll_event& ev = _epoller->get(i);
if(TC_Epoller::errorEvent(ev))
// if (ev.events & EPOLLERR || ev.events & EPOLLHUP)
{
_socket.close();
return EM_CONNECT;
@ -346,7 +351,6 @@ int TC_TCPClient::checkSocket()
{
int iVal = 0;
socklen_t iLen = static_cast<socklen_t>(sizeof(int));
// if (::getsockopt(_socket.getfd(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen) == -1 || iVal)
if(_socket.getSockOpt(SO_ERROR, reinterpret_cast<char*>(&iVal), iLen) == -1 || iVal)
{
_socket.close();
@ -354,6 +358,7 @@ int TC_TCPClient::checkSocket()
}
}
}
*/
}
//设置为阻塞模式
@ -394,7 +399,12 @@ int TC_TCPClient::recv(char *sRecvBuffer, size_t &iRecvLen)
return iRet;
}
time_t us = TC_Common::now2us();
// printf("%d, create recv1\n", std::this_thread::get_id());
int iRetCode = _epoller->wait(_timeout);
// printf("%d, create recv2, %d\n", std::this_thread::get_id(), iRetCode);
if (iRetCode < 0)
{
_socket.close();
@ -402,6 +412,7 @@ int TC_TCPClient::recv(char *sRecvBuffer, size_t &iRecvLen)
}
else if (iRetCode == 0)
{
cout << "recv:" << TC_Exception::parseError(TC_Exception::getSystemCode()) << ", " << _timeout << ",cost: " << TC_Common::now2us()-us << endl;
_socket.close();
return EM_TIMEOUT;
}

View File

@ -877,8 +877,6 @@ int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf)
int TC_EpollServer::Connection::recvTcp()
{
static std::atomic<int> totalRecv{0};
int recvCount = 0;
TC_NetWorkBuffer *rbuf = &_recvBuffer;
@ -893,6 +891,9 @@ int TC_EpollServer::Connection::recvTcp()
{
if (TC_Socket::isPending())
{
//#if TARGET_PLATFORM_WINDOWS
// _pBindAdapter->getNetThreadOfFd(_sock.getfd())->getEpoller()->mod(_sock.getfd(), getId(), EPOLLIN | EPOLLOUT);
//#endif
//没有数据了
break;
}
@ -976,6 +977,9 @@ int TC_EpollServer::Connection::recvUdp()
{
if (TC_Socket::isPending())//errno == EAGAIN)
{
//#if TARGET_PLATFORM_WINDOWS
// _pBindAdapter->getNetThreadOfFd(_sock.getfd())->getEpoller()->mod(_sock.getfd(), getId(), EPOLLIN | EPOLLOUT);
//#endif
//没有数据了
break;
}
@ -1028,7 +1032,6 @@ int TC_EpollServer::Connection::recv()
int TC_EpollServer::Connection::sendBuffer()
{
static std::atomic<int> totalSend{0};
while(!_sendBuffer.empty())
{
pair<const char*, size_t> data = _sendBuffer.getBufferPointer();
@ -1041,6 +1044,10 @@ int TC_EpollServer::Connection::sendBuffer()
{
if (TC_Socket::isPending())
{
//#if TARGET_PLATFORM_WINDOWS
// _pBindAdapter->getNetThreadOfFd(_sock.getfd())->getEpoller()->mod(_sock.getfd(), getId(), EPOLLIN | EPOLLOUT);
//#endif
break;
}
else
@ -1050,9 +1057,6 @@ int TC_EpollServer::Connection::sendBuffer()
}
}
totalSend += iBytesSent;
// cout << "totalSend:" << totalSend << endl;
if(iBytesSent > 0)
{
_sendBuffer.moveHeader(iBytesSent);
@ -1106,11 +1110,15 @@ int TC_EpollServer::Connection::sendUdp(const shared_ptr<SendContext> &sc)
{
//udp的直接发送即可
int iRet = _sock.sendto((const void *) sc->buffer()->buffer(), sc->buffer()->length(), sc->ip(), sc->port(), 0);
if (iRet < 0)
if (iRet < 0 && !TC_Socket::isPending())
{
_pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send [" + _ip + ":" + TC_Common::tostr(_port) + "] error");
return -1;
}
//#if TARGET_PLATFORM_WINDOWS
// _pBindAdapter->getNetThreadOfFd(_sock.getfd())->getEpoller()->mod(_sock.getfd(), getId(), EPOLLIN | EPOLLOUT);
//#endif
return 0;
}

View File

@ -2153,6 +2153,59 @@ void TC_HttpRequest::setScheme(const char * sScheme)
_httpURL._sScheme = sScheme;
}
int TC_HttpRequest::doRequest(TC_TCPClient& tcpClient, TC_HttpResponse& stHttpRsp)
{
//只支持短连接模式
setConnection("close");
string sSendBuffer = encode();
int iRet = tcpClient.send(sSendBuffer.c_str(), sSendBuffer.length());
if (iRet != TC_ClientSocket::EM_SUCCESS)
{
return iRet;
}
stHttpRsp.reset();
string sBuffer;
char* sTmpBuffer = new char[10240];
size_t iRecvLen = 10240;
while (true)
{
iRecvLen = 10240;
iRet = tcpClient.recv(sTmpBuffer, iRecvLen);
if (iRet == TC_ClientSocket::EM_SUCCESS)
sBuffer.append(sTmpBuffer, iRecvLen);
switch (iRet)
{
case TC_ClientSocket::EM_SUCCESS:
if (stHttpRsp.incrementDecode(sBuffer))
{
delete[]sTmpBuffer;
return TC_ClientSocket::EM_SUCCESS;
}
continue;
case TC_ClientSocket::EM_CLOSE:
delete[]sTmpBuffer;
stHttpRsp.incrementDecode(sBuffer);
return TC_ClientSocket::EM_SUCCESS;
default:
delete[]sTmpBuffer;
return iRet;
}
}
assert(true);
return 0;
}
int TC_HttpRequest::doRequest(TC_HttpResponse &stHttpRsp, int iTimeout)
{
//只支持短连接模式
@ -2166,6 +2219,10 @@ int TC_HttpRequest::doRequest(TC_HttpResponse &stHttpRsp, int iTimeout)
getHostPort(sHost, iPort);
TC_TCPClient tcpClient;
// if (_client == NULL)
// {
// _client = new TC_TCPClient();
// }
tcpClient.init(sHost, iPort, iTimeout);
int iRet = tcpClient.send(sSendBuffer.c_str(), sSendBuffer.length());