From 693f1a1123c9a4db07d5d3768fbb1a3abac3061d Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Mon, 2 Sep 2019 03:48:31 +0300 Subject: [PATCH] Feature async::Executor. Suggest threads count. --- CMakeLists.txt | 3 -- src/oatpp/core/async/Executor.cpp | 39 +++++++++++++++++-- src/oatpp/core/async/Executor.hpp | 15 ++++--- src/oatpp/core/base/Config.hpp | 7 ---- src/oatpp/core/base/Environment.cpp | 1 - src/oatpp/core/base/Environment.hpp | 3 +- .../web/server/AsyncHttpConnectionHandler.cpp | 2 - .../web/server/AsyncHttpConnectionHandler.hpp | 6 +-- test/oatpp/web/FullAsyncClientTest.cpp | 4 +- 9 files changed, 50 insertions(+), 30 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 51153b13..30eda8ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,6 @@ option(OATPP_DISABLE_POOL_ALLOCATIONS "This will make oatpp::base::memory::Memor set(OATPP_THREAD_HARDWARE_CONCURRENCY "AUTO" CACHE STRING "Predefined value for function oatpp::concurrency::Thread::getHardwareConcurrency()") set(OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT "10" CACHE STRING "Number of shards of ThreadDistributedMemoryPool") -set(OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT "2" CACHE STRING "oatpp::async::Executor default number of threads") option(OATPP_COMPAT_BUILD_NO_THREAD_LOCAL "Disable 'thread_local' feature" OFF) @@ -43,7 +42,6 @@ message("OATPP_DISABLE_ENV_OBJECT_COUNTERS=${OATPP_DISABLE_ENV_OBJECT_COUNTERS}" message("OATPP_DISABLE_POOL_ALLOCATIONS=${OATPP_DISABLE_POOL_ALLOCATIONS}") message("OATPP_THREAD_HARDWARE_CONCURRENCY=${OATPP_THREAD_HARDWARE_CONCURRENCY}") message("OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=${OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT}") -message("OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=${OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT}") message("OATPP_COMPAT_BUILD_NO_THREAD_LOCAL=${OATPP_COMPAT_BUILD_NO_THREAD_LOCAL}") @@ -64,7 +62,6 @@ endif() add_definitions ( -DOATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=${OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT} - -DOATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=${OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT} ) if(OATPP_COMPAT_BUILD_NO_THREAD_LOCAL) diff --git a/src/oatpp/core/async/Executor.cpp b/src/oatpp/core/async/Executor.cpp index be0d5f04..4b8b55d8 100644 --- a/src/oatpp/core/async/Executor.cpp +++ b/src/oatpp/core/async/Executor.cpp @@ -78,12 +78,14 @@ void Executor::SubmissionProcessor::detach() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Executor -const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT; - Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int32 timerWorkersCount, bool useIOEventWorker) : m_balancer(0) { + processorWorkersCount = chooseProcessorWorkersCount(processorWorkersCount); + ioWorkersCount = chooseIOWorkersCount(processorWorkersCount, ioWorkersCount); + timerWorkersCount = chooseTimerWorkersCount(timerWorkersCount); + for(v_int32 i = 0; i < processorWorkersCount; i ++) { m_processorWorkers.push_back(std::make_shared()); } @@ -112,7 +114,38 @@ Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int3 } -Executor::~Executor() { +v_int32 Executor::chooseProcessorWorkersCount(v_int32 processorWorkersCount) { + if(processorWorkersCount >= 1) { + return processorWorkersCount; + } + if(processorWorkersCount == VALUE_SUGGESTED) { + return oatpp::concurrency::getHardwareConcurrency(); + } + throw std::runtime_error("[oatpp::async::Executor::chooseProcessorWorkersCount()]: Error. Invalid processor workers count specified."); +} + +v_int32 Executor::chooseIOWorkersCount(v_int32 processorWorkersCount, v_int32 ioWorkersCount) { + if(ioWorkersCount >= 1) { + return ioWorkersCount; + } + if(ioWorkersCount == VALUE_SUGGESTED) { + v_int32 count = processorWorkersCount >> 1; + if(count == 0) { + count = 1; + } + return count; + } + throw std::runtime_error("[oatpp::async::Executor::chooseIOWorkersCount()]: Error. Invalid I/O workers count specified."); +} + +v_int32 Executor::chooseTimerWorkersCount(v_int32 timerWorkersCount) { + if(timerWorkersCount >= 1) { + return timerWorkersCount; + } + if(timerWorkersCount == VALUE_SUGGESTED) { + return 1; + } + throw std::runtime_error("[oatpp::async::Executor::chooseTimerWorkersCount()]: Error. Invalid timer workers count specified."); } void Executor::linkWorkers(const std::vector>& workers) { diff --git a/src/oatpp/core/async/Executor.hpp b/src/oatpp/core/async/Executor.hpp index c12a543f..ffabdae7 100644 --- a/src/oatpp/core/async/Executor.hpp +++ b/src/oatpp/core/async/Executor.hpp @@ -81,15 +81,18 @@ private: public: /** - * Default number of threads to run coroutines. + * Special value to indicate that Executor should choose it's own the value of specified parameter. */ - static const v_int32 THREAD_NUM_DEFAULT; + static constexpr const v_int32 VALUE_SUGGESTED = -1000; private: std::atomic m_balancer; private: std::vector> m_processorWorkers; std::vector> m_allWorkers; private: + static v_int32 chooseProcessorWorkersCount(v_int32 processorWorkersCount); + static v_int32 chooseIOWorkersCount(v_int32 processorWorkersCount, v_int32 ioWorkersCount); + static v_int32 chooseTimerWorkersCount(v_int32 timerWorkersCount); void linkWorkers(const std::vector>& workers); public: @@ -99,9 +102,9 @@ public: * @param ioWorkersCount - number of I/O processing workers. * @param timerWorkersCount - number of timer processing workers. */ - Executor(v_int32 processorWorkersCount = THREAD_NUM_DEFAULT, - v_int32 ioWorkersCount = 1, - v_int32 timerWorkersCount = 1, + Executor(v_int32 processorWorkersCount = VALUE_SUGGESTED, + v_int32 ioWorkersCount = VALUE_SUGGESTED, + v_int32 timerWorkersCount = VALUE_SUGGESTED, #if defined(WIN32) || defined(_WIN32) bool useIOEventWorker = false #else @@ -112,7 +115,7 @@ public: /** * Non-virtual Destructor. */ - ~Executor(); + ~Executor() = default; /** * Join all worker-threads. diff --git a/src/oatpp/core/base/Config.hpp b/src/oatpp/core/base/Config.hpp index 6a16e710..ddaef323 100644 --- a/src/oatpp/core/base/Config.hpp +++ b/src/oatpp/core/base/Config.hpp @@ -56,13 +56,6 @@ #define OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT 10 #endif -/** - * oatpp::async::Executor default number of threads - */ -#ifndef OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT - #define OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT 2 -#endif - /** * Disable `thread_local` feature.
* See https://github.com/oatpp/oatpp/issues/81 diff --git a/src/oatpp/core/base/Environment.cpp b/src/oatpp/core/base/Environment.cpp index d25c7567..cbad2541 100644 --- a/src/oatpp/core/base/Environment.cpp +++ b/src/oatpp/core/base/Environment.cpp @@ -254,7 +254,6 @@ void Environment::printCompilationConfig() { #endif OATPP_LOGD("oatpp/Config", "OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=%d", OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT); - OATPP_LOGD("oatpp/Config", "OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=%d\n", OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT); } diff --git a/src/oatpp/core/base/Environment.hpp b/src/oatpp/core/base/Environment.hpp index 7b67ffa1..d5707764 100644 --- a/src/oatpp/core/base/Environment.hpp +++ b/src/oatpp/core/base/Environment.hpp @@ -311,8 +311,7 @@ public: * - `OATPP_DISABLE_ENV_OBJECT_COUNTERS`
* - `OATPP_DISABLE_POOL_ALLOCATIONS`
* - `OATPP_THREAD_HARDWARE_CONCURRENCY`
- * - `OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT`
- * - `OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT` + * - `OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT` */ static void printCompilationConfig(); diff --git a/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp b/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp index f2a1f5fc..6ac8fecf 100644 --- a/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp +++ b/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp @@ -26,8 +26,6 @@ namespace oatpp { namespace web { namespace server { -const v_int32 AsyncHttpConnectionHandler::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT; - AsyncHttpConnectionHandler::AsyncHttpConnectionHandler(const std::shared_ptr& router, v_int32 threadCount) : m_executor(std::make_shared(threadCount)) diff --git a/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp b/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp index 508574ae..8fe22e78 100644 --- a/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp +++ b/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp @@ -43,8 +43,6 @@ namespace oatpp { namespace web { namespace server { class AsyncHttpConnectionHandler : public base::Countable, public network::server::ConnectionHandler { private: typedef oatpp::web::protocol::http::incoming::BodyDecoder BodyDecoder; -public: - static const v_int32 THREAD_NUM_DEFAULT; private: std::shared_ptr m_executor; private: @@ -53,12 +51,12 @@ private: HttpProcessor::RequestInterceptors m_requestInterceptors; std::shared_ptr m_bodyDecoder; // TODO make bodyDecoder configurable here public: - AsyncHttpConnectionHandler(const std::shared_ptr& router, v_int32 threadCount = THREAD_NUM_DEFAULT); + AsyncHttpConnectionHandler(const std::shared_ptr& router, v_int32 threadCount = oatpp::async::Executor::VALUE_SUGGESTED); AsyncHttpConnectionHandler(const std::shared_ptr& router, const std::shared_ptr& executor); public: static std::shared_ptr createShared(const std::shared_ptr& router, - v_int32 threadCount = THREAD_NUM_DEFAULT); + v_int32 threadCount = oatpp::async::Executor::VALUE_SUGGESTED); static std::shared_ptr createShared(const std::shared_ptr& router, const std::shared_ptr& executor); diff --git a/test/oatpp/web/FullAsyncClientTest.cpp b/test/oatpp/web/FullAsyncClientTest.cpp index c2d37f76..20844c0a 100644 --- a/test/oatpp/web/FullAsyncClientTest.cpp +++ b/test/oatpp/web/FullAsyncClientTest.cpp @@ -150,9 +150,9 @@ public: Action handleError(const std::shared_ptr& error) override { if(error->is()) { auto e = static_cast(error.get()); - OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode()); + OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode()); } else { - OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "Error. %s", error->what()); + OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "Error. %s", error->what()); } return propagateError(); }