better AsyncHttpConnectionHandler processing loop

This commit is contained in:
lganzzzo 2018-09-16 18:11:59 +03:00
parent 2eb807f1d7
commit b5338c72b1
3 changed files with 27 additions and 3 deletions

View File

@ -47,6 +47,9 @@ public:
void addCoroutine(AbstractCoroutine* coroutine); void addCoroutine(AbstractCoroutine* coroutine);
void addWaitingCoroutine(AbstractCoroutine* coroutine); void addWaitingCoroutine(AbstractCoroutine* coroutine);
bool iterate(v_int32 numIterations); bool iterate(v_int32 numIterations);
bool isEmpty() {
return m_activeQueue.first == nullptr && m_waitingQueue.first == nullptr;
}
}; };

View File

@ -65,12 +65,27 @@ void AsyncHttpConnectionHandler::Task::run(){
oatpp::async::Processor processor; oatpp::async::Processor processor;
while(true) { while(true) {
/* Load all waiting connections into processor */
consumeConnections(processor); consumeConnections(processor);
/* Process all, and check for incoming connections once in 100 iterations */
while (processor.iterate(100)) { while (processor.iterate(100)) {
consumeConnections(processor); consumeConnections(processor);
} }
//OATPP_LOGD("task", "sleeping");
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::unique_lock<std::mutex> lock(m_taskMutex);
if(processor.isEmpty()) {
/* No tasks in the processor. Wait for incoming connections */
while (m_connections.getFirstNode() == nullptr) {
m_taskCondition.wait(lock);
}
} else {
/* There is still something in slow queue. Wait and get back to processing */
/* Waiting for IO is not Applicable here as slow queue may contain NON-IO tasks */
m_taskCondition.wait_for(lock, std::chrono::milliseconds(10));
}
} }
} }

View File

@ -44,6 +44,9 @@
#include "oatpp/core/data/buffer/IOBuffer.hpp" #include "oatpp/core/data/buffer/IOBuffer.hpp"
#include "oatpp/core/async/Processor.hpp" #include "oatpp/core/async/Processor.hpp"
#include <mutex>
#include <condition_variable>
namespace oatpp { namespace web { namespace server { namespace oatpp { namespace web { namespace server {
class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler { class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
@ -61,6 +64,8 @@ private:
HttpRouter* m_router; HttpRouter* m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler; std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors; HttpProcessor::RequestInterceptors* m_requestInterceptors;
std::mutex m_taskMutex;
std::condition_variable m_taskCondition;
public: public:
Task(HttpRouter* router, Task(HttpRouter* router,
const std::shared_ptr<handler::ErrorHandler>& errorHandler, const std::shared_ptr<handler::ErrorHandler>& errorHandler,
@ -83,6 +88,7 @@ private:
void addConnection(const std::shared_ptr<HttpProcessor::ConnectionState>& connectionState){ void addConnection(const std::shared_ptr<HttpProcessor::ConnectionState>& connectionState){
oatpp::concurrency::SpinLock lock(m_atom); oatpp::concurrency::SpinLock lock(m_atom);
m_connections.pushBack(connectionState); m_connections.pushBack(connectionState);
m_taskCondition.notify_one();
} }
}; };
@ -91,7 +97,7 @@ private:
std::shared_ptr<HttpRouter> m_router; std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler; std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors m_requestInterceptors; HttpProcessor::RequestInterceptors m_requestInterceptors;
v_int32 m_taskBalancer; std::atomic<v_int32> m_taskBalancer;
v_int32 m_threadCount; v_int32 m_threadCount;
std::shared_ptr<Task>* m_tasks; std::shared_ptr<Task>* m_tasks;
public: public: