normal & ssl all succ

This commit is contained in:
jarodruan 2020-02-14 15:08:17 +08:00
parent a20b3e1e01
commit 50f74d5982
7 changed files with 118 additions and 76 deletions

View File

@ -28,6 +28,11 @@
cert = ../examples/AuthDemo/certs/client.crt cert = ../examples/AuthDemo/certs/client.crt
#can be empty #can be empty
key = ../examples/AuthDemo/certs/client.key key = ../examples/AuthDemo/certs/client.key
<TestApp.AuthServer.AuthObjAdapter>
accesskey=tars-test-user
secretkey=123456
</TestApp.AuthServer.AuthObjAdapter>
</client> </client>
</application> </application>
</tars> </tars>

View File

@ -25,7 +25,7 @@ using namespace TestApp;
Communicator* _comm; Communicator* _comm;
static string helloObj = "TestApp.AuthServer.AuthObj@ssl -h 127.0.0.1 -p 9016"; static string helloObj = "TestApp.AuthServer.AuthObj@ssl -h 127.0.0.1 -p 9016 -e 1";
struct Param struct Param
{ {

View File

@ -51,7 +51,7 @@
#log obj #log obj
# log = tars.tarslog.LogObj # log = tars.tarslog.LogObj
<HelloAdapter> <TestApp.AuthServer.AuthObjAdapter>
#ip:port:timeout #ip:port:timeout
endpoint = tcp -h 127.0.0.1 -p 9016 -t 10000 endpoint = tcp -h 127.0.0.1 -p 9016 -t 10000
#allow ip #allow ip
@ -66,7 +66,9 @@
queuecap = 1000000 queuecap = 1000000
#tars protocol #tars protocol
protocol = tars protocol = tars
</HelloAdapter> accesskey=tars-test-user
secretkey=123456
</TestApp.AuthServer.AuthObjAdapter>
</server> </server>
</application> </application>
</tars> </tars>

View File

@ -73,6 +73,7 @@ if(WIN32)
COMMAND ../servant/script/busybox.exe bash ../examples/scripts/run-http.bat COMMAND ../servant/script/busybox.exe bash ../examples/scripts/run-http.bat
COMMAND ../servant/script/busybox.exe bash ../examples/scripts/run-co.bat COMMAND ../servant/script/busybox.exe bash ../examples/scripts/run-co.bat
COMMAND ../servant/script/busybox.exe bash ../examples/scripts/run-push.bat COMMAND ../servant/script/busybox.exe bash ../examples/scripts/run-push.bat
COMMAND ../servant/script/busybox.exe bash ../examples/scripts/run-auth.bat
COMMENT "call run all ${CMAKE_BINARY_DIR}") COMMENT "call run all ${CMAKE_BINARY_DIR}")
add_custom_target(run-kill add_custom_target(run-kill
@ -136,11 +137,12 @@ else(WIN32)
add_custom_target(run-all add_custom_target(run-all
WORKING_DIRECTORY ${CMAKE_BINARY_DIR} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
DEPENDS run-quick-start run-http run-co run-push DEPENDS run-quick-start run-http run-co run-push run-auth
COMMAND sh ../examples/scripts/run-quick-start.sh COMMAND sh ../examples/scripts/run-quick-start.sh
COMMAND sh ../examples/scripts/run-http.sh COMMAND sh ../examples/scripts/run-http.sh
COMMAND sh ../examples/scripts/run-co.sh COMMAND sh ../examples/scripts/run-co.sh
COMMAND sh ../examples/scripts/run-push.sh COMMAND sh ../examples/scripts/run-push.sh
COMMAND sh ../examples/scripts/run-auth.sh
COMMENT "call run all") COMMENT "call run all")
add_custom_target(run-kill add_custom_target(run-kill

View File

@ -533,6 +533,10 @@ int TcpTransceiver::doResponse()
rbuf = _openssl->recvBuffer(); rbuf = _openssl->recvBuffer();
} }
else
{
rbuf->addBuffer(buff, iRet);
}
#else #else
rbuf->addBuffer(buff, iRet); rbuf->addBuffer(buff, iRet);
#endif #endif
@ -916,11 +920,7 @@ int UdpTransceiver::doResponse()
} }
while (recv > 0); while (recv > 0);
// _adapterProxy->getObjProxy()->getCommunicatorEpoll()->modFd(_fd, &_fdInfo, EPOLLIN | EPOLLOUT);
// cout << "----------------------- recv:" << recv << ", errno:" << strerror(errno)<< endl;
return 0; return 0;
// return done.empty()?0:1;
} }
int UdpTransceiver::send(const void* buf, uint32_t len, uint32_t flag) int UdpTransceiver::send(const void* buf, uint32_t len, uint32_t flag)

View File

@ -34,6 +34,14 @@ public:
typedef D queue_type; typedef D queue_type;
/**
* @brief , .
*
* @param t
* @return bool: true, , false,
*/
T front();
/** /**
* @brief , . * @brief , .
* *
@ -42,6 +50,13 @@ public:
*/ */
bool pop_front(T& t); bool pop_front(T& t);
/**
* @brief , .
*
* @return bool: true, , false,
*/
bool pop_front();
/** /**
* @brief . * @brief .
* *
@ -118,6 +133,13 @@ protected:
TC_SpinLock _mutex; TC_SpinLock _mutex;
}; };
template<typename T, typename D> T TC_CasQueue<T, D>::front()
{
TC_LockT<TC_SpinLock> lock (_mutex);
return _queue.front();
}
template<typename T, typename D> bool TC_CasQueue<T, D>::pop_front(T& t) template<typename T, typename D> bool TC_CasQueue<T, D>::pop_front(T& t)
{ {
TC_LockT<TC_SpinLock> lock (_mutex); TC_LockT<TC_SpinLock> lock (_mutex);
@ -134,6 +156,22 @@ template<typename T, typename D> bool TC_CasQueue<T, D>::pop_front(T& t)
return true; return true;
} }
template<typename T, typename D> bool TC_CasQueue<T, D>::pop_front()
{
TC_LockT<TC_SpinLock> lock (_mutex);
if (_queue.empty())
{
return false;
}
_queue.pop_front();
assert(_size > 0);
--_size;
return true;
}
template<typename T, typename D> void TC_CasQueue<T, D>::push_back(const T& t) template<typename T, typename D> void TC_CasQueue<T, D>::push_back(const T& t)
{ {
TC_LockT<TC_SpinLock> lock (_mutex); TC_LockT<TC_SpinLock> lock (_mutex);

View File

@ -837,7 +837,6 @@ int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf)
TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(rbuf, ro); TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(rbuf, ro);
if(b == TC_NetWorkBuffer::PACKET_LESS) if(b == TC_NetWorkBuffer::PACKET_LESS)
{ {
//包不完全
break; break;
} }
else if(b == TC_NetWorkBuffer::PACKET_FULL) else if(b == TC_NetWorkBuffer::PACKET_FULL)
@ -890,7 +889,7 @@ int TC_EpollServer::Connection::recvTcp()
if (iBytesReceived < 0) if (iBytesReceived < 0)
{ {
if (TC_Socket::isPending())//errno == EAGAIN) if (TC_Socket::isPending())
{ {
//没有数据了 //没有数据了
break; break;
@ -910,14 +909,9 @@ int TC_EpollServer::Connection::recvTcp()
} }
else else
{ {
#if TARS_SSL #if TARS_SSL
if (_pBindAdapter->getEndpoint().isSSL()) if (_pBindAdapter->getEndpoint().isSSL())
{ {
// const char * data = _recvBuffer.mergeBuffers();
// cout << "parseProtocol:" << _recvBuffer.getBufferLength() << endl;
int ret = _openssl->read(buffer, iBytesReceived, _sendBuffer); int ret = _openssl->read(buffer, iBytesReceived, _sendBuffer);
if (ret != 0) if (ret != 0)
{ {
@ -934,9 +928,15 @@ int TC_EpollServer::Connection::recvTcp()
rbuf = _openssl->recvBuffer(); rbuf = _openssl->recvBuffer();
} }
} }
else
{
rbuf->addBuffer(buffer, iBytesReceived);
}
#else #else
rbuf->addBuffer(buffer, iBytesReceived); rbuf->addBuffer(buffer, iBytesReceived);
#endif #endif
//字符串太长时, 强制解析协议 //字符串太长时, 强制解析协议
if (rbuf->getBufferLength() > 8192) { if (rbuf->getBufferLength() > 8192) {
parseProtocol(*rbuf); parseProtocol(*rbuf);
@ -1070,10 +1070,21 @@ int TC_EpollServer::Connection::sendBuffer()
int TC_EpollServer::Connection::sendTcp(const shared_ptr<SendContext> &sc) int TC_EpollServer::Connection::sendTcp(const shared_ptr<SendContext> &sc)
{ {
//tcp的, 将buffer放到队列末尾 if(!sc->buffer()->empty())
if (!sc->buffer()->empty()) { {
_sendBuffer.addBuffer(sc->buffer()); if (getBindAdapter()->getEndpoint().isSSL())
} {
assert(_openssl->isHandshaked());
int ret = _openssl->write(sc->buffer()->buffer(), sc->buffer()->length(), _sendBuffer);
if (ret != 0)
return -1; // should not happen
}
else
{
_sendBuffer.addBuffer(sc->buffer());
}
}
return sendBuffer(); return sendBuffer();
} }
@ -1302,7 +1313,6 @@ vector<TC_EpollServer::ConnStatus> TC_EpollServer::ConnectionList::getConnStatus
vector<TC_EpollServer::ConnStatus> v; vector<TC_EpollServer::ConnStatus> v;
TC_LockT<TC_SpinLock> lock(_mutex); TC_LockT<TC_SpinLock> lock(_mutex);
// TC_ThreadLock::Lock lock(*this);
for(size_t i = 1; i <= _total; i++) for(size_t i = 1; i <= _total; i++)
{ {
@ -1569,11 +1579,9 @@ void TC_EpollServer::NetThread::close(const shared_ptr<RecvContext> &data)
_notify.notify(); _notify.notify();
} }
// int64_t us;
void TC_EpollServer::NetThread::send(const shared_ptr<SendContext> &data) void TC_EpollServer::NetThread::send(const shared_ptr<SendContext> &data)
{ {
if(_threadId == std::this_thread::get_id()) { if(_threadId == std::this_thread::get_id()) {
// assert(false);
//发送包线程和网络线程是同一个线程,直接发送即可 //发送包线程和网络线程是同一个线程,直接发送即可
Connection *cPtr = getConnectionPtr(data->uid()); Connection *cPtr = getConnectionPtr(data->uid());
if(cPtr) if(cPtr)
@ -1597,65 +1605,52 @@ void TC_EpollServer::NetThread::processPipe()
{ {
_notifySignal = false; _notifySignal = false;
shared_ptr<SendContext> sc; while(!_sbuffer.empty())
while(_sbuffer.pop_front(sc))
{ {
shared_ptr<SendContext> sc = _sbuffer.front();
Connection *cPtr = getConnectionPtr(sc->uid()); Connection *cPtr = getConnectionPtr(sc->uid());
if (!cPtr) if (cPtr) {
{ switch (sc->cmd()) {
continue; case 'c': {
if (cPtr->setClose()) {
delConnection(cPtr, true, EM_SERVER_CLOSE);
}
break;
}
case 's': {
int ret = 0;
#if TARS_SSL
if (cPtr->getBindAdapter()->getEndpoint().isSSL()) {
if (!cPtr->_openssl->isHandshaked()) {
return;
}
//
// ret = cPtr->_openssl->write(sc->buffer()->buffer(), sc->buffer()->length(), cPtr->_sendBuffer);
// if (ret != 0)
// break; // should not happen
//
// cPtr->sendBuffer();
}
ret = cPtr->send(sc);
#else
ret = cPtr->send(sc);
#endif
if (ret < 0) {
delConnection(cPtr, true, (ret == -1) ? EM_CLIENT_CLOSE : EM_SERVER_CLOSE);
}
else {
_list.refresh(sc->uid(), cPtr->getTimeout() + TNOW);
}
break;
}
default:
assert(false);
}
} }
switch(sc->cmd()) _sbuffer.pop_front();
{
case 'c':
{
if(cPtr->setClose())
{
delConnection(cPtr,true,EM_SERVER_CLOSE);
}
break;
}
case 's':
{
int ret = 0;
#if TARS_SSL
if (cPtr->getBindAdapter()->getEndpoint().isSSL() && cPtr->_openssl->isHandshaked())
{
// std::string out = cPtr->_openssl->Write((*it)->buffer.data(), (*it)->buffer.size());
// if (cPtr->_openssl->HasError())
// break; // should not happen
//
// (*it)->buffer = out;
ret = cPtr->_openssl->write(sc->buffer()->buffer(), sc->buffer()->length(), cPtr->_sendBuffer);
if (ret != 0)
break; // should not happen
cPtr->sendBuffer();
// (*it)->buffer = out;
}
else
{
ret = cPtr->send(sc);
}
#else
ret = cPtr->send(sc);
#endif
if(ret < 0)
{
delConnection(cPtr,true,(ret==-1)?EM_CLIENT_CLOSE:EM_SERVER_CLOSE);
}
else
{
_list.refresh(sc->uid(), cPtr->getTimeout() + TNOW);
}
break;
}
default:
assert(false);
}
} }
} }