diff --git a/core/async/Processor.hpp b/core/async/Processor.hpp index 9e62819b..e66ea408 100644 --- a/core/async/Processor.hpp +++ b/core/async/Processor.hpp @@ -47,6 +47,9 @@ public: void addCoroutine(AbstractCoroutine* coroutine); void addWaitingCoroutine(AbstractCoroutine* coroutine); bool iterate(v_int32 numIterations); + bool isEmpty() { + return m_activeQueue.first == nullptr && m_waitingQueue.first == nullptr; + } }; diff --git a/web/server/AsyncHttpConnectionHandler.cpp b/web/server/AsyncHttpConnectionHandler.cpp index 9ee4d98d..c79ab9e0 100644 --- a/web/server/AsyncHttpConnectionHandler.cpp +++ b/web/server/AsyncHttpConnectionHandler.cpp @@ -65,12 +65,27 @@ void AsyncHttpConnectionHandler::Task::run(){ oatpp::async::Processor processor; while(true) { + + /* Load all waiting connections into processor */ consumeConnections(processor); + + /* Process all, and check for incoming connections once in 100 iterations */ while (processor.iterate(100)) { consumeConnections(processor); } - //OATPP_LOGD("task", "sleeping"); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + std::unique_lock lock(m_taskMutex); + if(processor.isEmpty()) { + /* No tasks in the processor. Wait for incoming connections */ + while (m_connections.getFirstNode() == nullptr) { + m_taskCondition.wait(lock); + } + } else { + /* There is still something in slow queue. Wait and get back to processing */ + /* Waiting for IO is not Applicable here as slow queue may contain NON-IO tasks */ + m_taskCondition.wait_for(lock, std::chrono::milliseconds(10)); + } + } } diff --git a/web/server/AsyncHttpConnectionHandler.hpp b/web/server/AsyncHttpConnectionHandler.hpp index 0b4f9ec6..ac36dfbb 100644 --- a/web/server/AsyncHttpConnectionHandler.hpp +++ b/web/server/AsyncHttpConnectionHandler.hpp @@ -44,6 +44,9 @@ #include "oatpp/core/data/buffer/IOBuffer.hpp" #include "oatpp/core/async/Processor.hpp" +#include +#include + namespace oatpp { namespace web { namespace server { class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler { @@ -61,6 +64,8 @@ private: HttpRouter* m_router; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors* m_requestInterceptors; + std::mutex m_taskMutex; + std::condition_variable m_taskCondition; public: Task(HttpRouter* router, const std::shared_ptr& errorHandler, @@ -83,6 +88,7 @@ private: void addConnection(const std::shared_ptr& connectionState){ oatpp::concurrency::SpinLock lock(m_atom); m_connections.pushBack(connectionState); + m_taskCondition.notify_one(); } }; @@ -91,7 +97,7 @@ private: std::shared_ptr m_router; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors m_requestInterceptors; - v_int32 m_taskBalancer; + std::atomic m_taskBalancer; v_int32 m_threadCount; std::shared_ptr* m_tasks; public: