Feature async::Executor. Suggest threads count.

This commit is contained in:
lganzzzo 2019-09-02 03:48:31 +03:00
parent 549ba11381
commit 693f1a1123
9 changed files with 50 additions and 30 deletions

View File

@ -30,7 +30,6 @@ option(OATPP_DISABLE_POOL_ALLOCATIONS "This will make oatpp::base::memory::Memor
set(OATPP_THREAD_HARDWARE_CONCURRENCY "AUTO" CACHE STRING "Predefined value for function oatpp::concurrency::Thread::getHardwareConcurrency()")
set(OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT "10" CACHE STRING "Number of shards of ThreadDistributedMemoryPool")
set(OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT "2" CACHE STRING "oatpp::async::Executor default number of threads")
option(OATPP_COMPAT_BUILD_NO_THREAD_LOCAL "Disable 'thread_local' feature" OFF)
@ -43,7 +42,6 @@ message("OATPP_DISABLE_ENV_OBJECT_COUNTERS=${OATPP_DISABLE_ENV_OBJECT_COUNTERS}"
message("OATPP_DISABLE_POOL_ALLOCATIONS=${OATPP_DISABLE_POOL_ALLOCATIONS}")
message("OATPP_THREAD_HARDWARE_CONCURRENCY=${OATPP_THREAD_HARDWARE_CONCURRENCY}")
message("OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=${OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT}")
message("OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=${OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT}")
message("OATPP_COMPAT_BUILD_NO_THREAD_LOCAL=${OATPP_COMPAT_BUILD_NO_THREAD_LOCAL}")
@ -64,7 +62,6 @@ endif()
add_definitions (
-DOATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=${OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT}
-DOATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=${OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT}
)
if(OATPP_COMPAT_BUILD_NO_THREAD_LOCAL)

View File

@ -78,12 +78,14 @@ void Executor::SubmissionProcessor::detach() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Executor
const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;
Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int32 timerWorkersCount, bool useIOEventWorker)
: m_balancer(0)
{
processorWorkersCount = chooseProcessorWorkersCount(processorWorkersCount);
ioWorkersCount = chooseIOWorkersCount(processorWorkersCount, ioWorkersCount);
timerWorkersCount = chooseTimerWorkersCount(timerWorkersCount);
for(v_int32 i = 0; i < processorWorkersCount; i ++) {
m_processorWorkers.push_back(std::make_shared<SubmissionProcessor>());
}
@ -112,7 +114,38 @@ Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int3
}
Executor::~Executor() {
v_int32 Executor::chooseProcessorWorkersCount(v_int32 processorWorkersCount) {
if(processorWorkersCount >= 1) {
return processorWorkersCount;
}
if(processorWorkersCount == VALUE_SUGGESTED) {
return oatpp::concurrency::getHardwareConcurrency();
}
throw std::runtime_error("[oatpp::async::Executor::chooseProcessorWorkersCount()]: Error. Invalid processor workers count specified.");
}
v_int32 Executor::chooseIOWorkersCount(v_int32 processorWorkersCount, v_int32 ioWorkersCount) {
if(ioWorkersCount >= 1) {
return ioWorkersCount;
}
if(ioWorkersCount == VALUE_SUGGESTED) {
v_int32 count = processorWorkersCount >> 1;
if(count == 0) {
count = 1;
}
return count;
}
throw std::runtime_error("[oatpp::async::Executor::chooseIOWorkersCount()]: Error. Invalid I/O workers count specified.");
}
v_int32 Executor::chooseTimerWorkersCount(v_int32 timerWorkersCount) {
if(timerWorkersCount >= 1) {
return timerWorkersCount;
}
if(timerWorkersCount == VALUE_SUGGESTED) {
return 1;
}
throw std::runtime_error("[oatpp::async::Executor::chooseTimerWorkersCount()]: Error. Invalid timer workers count specified.");
}
void Executor::linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& workers) {

View File

@ -81,15 +81,18 @@ private:
public:
/**
* Default number of threads to run coroutines.
* Special value to indicate that Executor should choose it's own the value of specified parameter.
*/
static const v_int32 THREAD_NUM_DEFAULT;
static constexpr const v_int32 VALUE_SUGGESTED = -1000;
private:
std::atomic<v_word32> m_balancer;
private:
std::vector<std::shared_ptr<SubmissionProcessor>> m_processorWorkers;
std::vector<std::shared_ptr<worker::Worker>> m_allWorkers;
private:
static v_int32 chooseProcessorWorkersCount(v_int32 processorWorkersCount);
static v_int32 chooseIOWorkersCount(v_int32 processorWorkersCount, v_int32 ioWorkersCount);
static v_int32 chooseTimerWorkersCount(v_int32 timerWorkersCount);
void linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& workers);
public:
@ -99,9 +102,9 @@ public:
* @param ioWorkersCount - number of I/O processing workers.
* @param timerWorkersCount - number of timer processing workers.
*/
Executor(v_int32 processorWorkersCount = THREAD_NUM_DEFAULT,
v_int32 ioWorkersCount = 1,
v_int32 timerWorkersCount = 1,
Executor(v_int32 processorWorkersCount = VALUE_SUGGESTED,
v_int32 ioWorkersCount = VALUE_SUGGESTED,
v_int32 timerWorkersCount = VALUE_SUGGESTED,
#if defined(WIN32) || defined(_WIN32)
bool useIOEventWorker = false
#else
@ -112,7 +115,7 @@ public:
/**
* Non-virtual Destructor.
*/
~Executor();
~Executor() = default;
/**
* Join all worker-threads.

View File

@ -56,13 +56,6 @@
#define OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT 10
#endif
/**
* oatpp::async::Executor default number of threads
*/
#ifndef OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT
#define OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT 2
#endif
/**
* Disable `thread_local` feature. <br>
* See https://github.com/oatpp/oatpp/issues/81

View File

@ -254,7 +254,6 @@ void Environment::printCompilationConfig() {
#endif
OATPP_LOGD("oatpp/Config", "OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=%d", OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT);
OATPP_LOGD("oatpp/Config", "OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=%d\n", OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT);
}

View File

@ -311,8 +311,7 @@ public:
* - `OATPP_DISABLE_ENV_OBJECT_COUNTERS`<br>
* - `OATPP_DISABLE_POOL_ALLOCATIONS`<br>
* - `OATPP_THREAD_HARDWARE_CONCURRENCY`<br>
* - `OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT`<br>
* - `OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT`
* - `OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT`
*/
static void printCompilationConfig();

View File

@ -26,8 +26,6 @@
namespace oatpp { namespace web { namespace server {
const v_int32 AsyncHttpConnectionHandler::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;
AsyncHttpConnectionHandler::AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router,
v_int32 threadCount)
: m_executor(std::make_shared<oatpp::async::Executor>(threadCount))

View File

@ -43,8 +43,6 @@ namespace oatpp { namespace web { namespace server {
class AsyncHttpConnectionHandler : public base::Countable, public network::server::ConnectionHandler {
private:
typedef oatpp::web::protocol::http::incoming::BodyDecoder BodyDecoder;
public:
static const v_int32 THREAD_NUM_DEFAULT;
private:
std::shared_ptr<oatpp::async::Executor> m_executor;
private:
@ -53,12 +51,12 @@ private:
HttpProcessor::RequestInterceptors m_requestInterceptors;
std::shared_ptr<const BodyDecoder> m_bodyDecoder; // TODO make bodyDecoder configurable here
public:
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router, v_int32 threadCount = THREAD_NUM_DEFAULT);
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router, v_int32 threadCount = oatpp::async::Executor::VALUE_SUGGESTED);
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router, const std::shared_ptr<oatpp::async::Executor>& executor);
public:
static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router,
v_int32 threadCount = THREAD_NUM_DEFAULT);
v_int32 threadCount = oatpp::async::Executor::VALUE_SUGGESTED);
static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router,
const std::shared_ptr<oatpp::async::Executor>& executor);

View File

@ -150,9 +150,9 @@ public:
Action handleError(const std::shared_ptr<const Error>& error) override {
if(error->is<oatpp::data::AsyncIOError>()) {
auto e = static_cast<const oatpp::data::AsyncIOError*>(error.get());
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode());
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode());
} else {
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "Error. %s", error->what());
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "Error. %s", error->what());
}
return propagateError();
}