diff --git a/src/oatpp/core/async/CoroutineWaitList.cpp b/src/oatpp/core/async/CoroutineWaitList.cpp index bb1fdc57..f73684e2 100644 --- a/src/oatpp/core/async/CoroutineWaitList.cpp +++ b/src/oatpp/core/async/CoroutineWaitList.cpp @@ -34,12 +34,19 @@ namespace oatpp { namespace async { CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) { { - std::lock_guard lock{other.m_data->m_lock}; - m_data->m_list = std::move(other.m_data->m_list); + std::lock_guard lock{other.m_lock}; + m_list = std::move(other.m_list); } { - std::lock_guard lock{other.m_data->m_timeoutsLock}; - m_data->m_coroutinesWithTimeout = std::move(other.m_data->m_coroutinesWithTimeout); + std::lock_guard lock{other.m_timeoutsLock}; + m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout); + + m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors); + for (const std::pair& entry : m_timeoutCheckingProcessors) { + Processor* processor = entry.first; + processor->removeCoroutineWaitListWithTimeouts(std::addressof(other)); + processor->addCoroutineWaitListWithTimeouts(this); + } } } @@ -47,27 +54,27 @@ CoroutineWaitList::~CoroutineWaitList() { notifyAll(); } -void CoroutineWaitList::checkCoroutinesForTimeouts(const std::shared_ptr& data) { +void CoroutineWaitList::checkCoroutinesForTimeouts() { std::set timedoutCoroutines; { - std::lock_guard lock{data->m_timeoutsLock}; + std::lock_guard lock{m_timeoutsLock}; const auto currentTimeSinceEpochMS = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - const auto newEndIt = std::remove_if(std::begin(data->m_coroutinesWithTimeout), std::end(data->m_coroutinesWithTimeout), [&](const std::pair& entry) { + const auto newEndIt = std::remove_if(std::begin(m_coroutinesWithTimeout), std::end(m_coroutinesWithTimeout), [&](const std::pair& entry) { if (currentTimeSinceEpochMS > entry.second) { timedoutCoroutines.insert(entry.first); return true; } return false; }); - data->m_coroutinesWithTimeout.erase(newEndIt, std::end(data->m_coroutinesWithTimeout)); + m_coroutinesWithTimeout.erase(newEndIt, std::end(m_coroutinesWithTimeout)); } if (!timedoutCoroutines.empty()) { - std::lock_guard lock{ data->m_lock}; + std::lock_guard lock{ m_lock}; CoroutineHandle* prev = nullptr; - CoroutineHandle* curr = data->m_list.first; + CoroutineHandle* curr = m_list.first; while (curr) { if (timedoutCoroutines.count(curr)) { - data->m_list.cutEntry(curr, prev); + m_list.cutEntry(curr, prev); curr->_PP->pushOneTask(curr); } prev = curr; @@ -77,60 +84,99 @@ void CoroutineWaitList::checkCoroutinesForTimeouts(const std::shared_ptr& } void CoroutineWaitList::setListener(Listener* listener) { - m_data->m_listener = listener; + m_listener = listener; } void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) { { - std::lock_guard lock(m_data->m_lock); - m_data->m_list.pushFront(coroutine); + std::lock_guard lock(m_lock); + m_list.pushFront(coroutine); } - if(m_data->m_listener != nullptr) { - m_data->m_listener->onNewItem(*this); + if(m_listener != nullptr) { + m_listener->onNewItem(*this); } } void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) { { - std::lock_guard lock{m_data->m_timeoutsLock}; - m_data->m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS); + std::lock_guard lock{m_timeoutsLock}; + m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS); + if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) { + coroutine->_PP->addCoroutineWaitListWithTimeouts(this); + } } pushFront(coroutine); } void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) { { - std::lock_guard lock(m_data->m_lock); - m_data->m_list.pushBack(coroutine); + std::lock_guard lock(m_lock); + m_list.pushBack(coroutine); } - if(m_data->m_listener != nullptr) { - m_data->m_listener->onNewItem(*this); + if(m_listener != nullptr) { + m_listener->onNewItem(*this); } } void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) { { - std::lock_guard lock{m_data->m_timeoutsLock}; - m_data->m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS); + std::lock_guard lock{m_timeoutsLock}; + m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS); + if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) { + coroutine->_PP->addCoroutineWaitListWithTimeouts(this); + } } pushBack(coroutine); } void CoroutineWaitList::notifyFirst() { - std::lock_guard lock(m_data->m_lock); - if(m_data->m_list.first) { - auto coroutine = m_data->m_list.popFront(); - coroutine->_PP->pushOneTask(coroutine); + std::lock_guard lock{m_lock}; + if(m_list.first) { + removeFirstCoroutine(); } } void CoroutineWaitList::notifyAll() { - std::lock_guard lock(m_data->m_lock); - while (!m_data->m_list.empty()) { - auto curr = m_data->m_list.popFront(); - curr->_PP->pushOneTask(curr); + std::lock_guard lock(m_lock); + while (!m_list.empty()) { + removeFirstCoroutine(); } } +void CoroutineWaitList::removeFirstCoroutine() { + auto coroutine = m_list.popFront(); + coroutine->_PP->pushOneTask(coroutine); + + std::lock_guard lock{m_timeoutsLock}; + if (--m_timeoutCheckingProcessors[coroutine->_PP] <= 0) { + coroutine->_PP->removeCoroutineWaitListWithTimeouts(this); + m_timeoutCheckingProcessors.erase(coroutine->_PP); + } +} + +CoroutineWaitList& CoroutineWaitList::operator=(CoroutineWaitList&& other) { + if (this == std::addressof(other)) return *this; + + notifyAll(); + + { + std::lock_guard otherLock{other.m_lock}; + std::lock_guard myLock{m_lock}; + m_list = std::move(other.m_list); + } + { + std::lock_guard otherLock{other.m_timeoutsLock}; + std::lock_guard myLock{m_timeoutsLock}; + m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout); + + m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors); + for (const std::pair& entry : m_timeoutCheckingProcessors) { + Processor* processor = entry.first; + processor->removeCoroutineWaitListWithTimeouts(std::addressof(other)); + processor->addCoroutineWaitListWithTimeouts(this); + } + } + return *this; + } }} \ No newline at end of file diff --git a/src/oatpp/core/async/CoroutineWaitList.hpp b/src/oatpp/core/async/CoroutineWaitList.hpp index e681bc55..f8f90c45 100644 --- a/src/oatpp/core/async/CoroutineWaitList.hpp +++ b/src/oatpp/core/async/CoroutineWaitList.hpp @@ -30,6 +30,7 @@ #include "oatpp/core/collection/FastQueue.hpp" #include "oatpp/core/concurrency/SpinLock.hpp" +#include #include #include #include @@ -59,18 +60,18 @@ public: virtual void onNewItem(CoroutineWaitList& list) = 0; }; private: - struct Data { - oatpp::collection::FastQueue m_list; - oatpp::concurrency::SpinLock m_lock; - Listener* m_listener = nullptr; - - std::vector> m_coroutinesWithTimeout; - oatpp::concurrency::SpinLock m_timeoutsLock; - }; + oatpp::collection::FastQueue m_list; + oatpp::concurrency::SpinLock m_lock; + Listener* m_listener = nullptr; + + std::map m_timeoutCheckingProcessors; + std::vector> m_coroutinesWithTimeout; + oatpp::concurrency::SpinLock m_timeoutsLock; - std::shared_ptr m_data{std::make_shared()}; private: - static void checkCoroutinesForTimeouts(const std::shared_ptr& data); + void checkCoroutinesForTimeouts(); + + void removeFirstCoroutine(); protected: /* @@ -147,23 +148,7 @@ public: */ void notifyAll(); - CoroutineWaitList& operator=(CoroutineWaitList&& other) { - if (this == std::addressof(other)) return *this; - - notifyAll(); - - { - std::lock_guard otherLock{other.m_data->m_lock}; - std::lock_guard myLock{m_data->m_lock}; - m_data->m_list = std::move(other.m_data->m_list); - } - { - std::lock_guard otherLock{other.m_data->m_timeoutsLock}; - std::lock_guard myLock{m_data->m_timeoutsLock}; - m_data->m_coroutinesWithTimeout = std::move(other.m_data->m_coroutinesWithTimeout); - } - return *this; - } + CoroutineWaitList& operator=(CoroutineWaitList&& other); }; diff --git a/src/oatpp/core/async/Processor.cpp b/src/oatpp/core/async/Processor.cpp index 5be2ec97..cbdb1b61 100644 --- a/src/oatpp/core/async/Processor.cpp +++ b/src/oatpp/core/async/Processor.cpp @@ -38,16 +38,8 @@ void Processor::checkCoroutinesForTimeouts() { if (!m_running) return; } - auto curr = m_coroutineWaitListsWithTimeouts.rbegin(); - const auto end = m_coroutineWaitListsWithTimeouts.rend(); - for (; curr != end; ++curr) { - std::shared_ptr data = curr->lock(); - if (!data) { - m_coroutineWaitListsWithTimeouts.erase(std::next(curr).base()); - continue; - } - - CoroutineWaitList::checkCoroutinesForTimeouts(data); + for (CoroutineWaitList* waitList : m_coroutineWaitListsWithTimeouts) { + waitList->checkCoroutinesForTimeouts(); } } @@ -55,6 +47,19 @@ void Processor::checkCoroutinesForTimeouts() { } } +void Processor::addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) { + { + std::lock_guard lock{m_coroutineWaitListsWithTimeoutsMutex}; + m_coroutineWaitListsWithTimeouts.insert(waitList); + } + m_coroutineWaitListsWithTimeoutsCV.notify_one(); +} + +void Processor::removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) { + std::lock_guard lock{m_coroutineWaitListsWithTimeoutsMutex}; + m_coroutineWaitListsWithTimeouts.erase(waitList); +} + void Processor::addWorker(const std::shared_ptr& worker) { switch(worker->getType()) { @@ -127,12 +132,6 @@ void Processor::addCoroutine(CoroutineHandle* coroutine) { case Action::TYPE_WAIT_LIST_WITH_TIMEOUT: coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE); action.m_data.waitListWithTimeout.waitList->pushBack(coroutine, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS); - - { - std::lock_guard lock{m_coroutineWaitListsWithTimeoutsMutex}; - m_coroutineWaitListsWithTimeouts.emplace_back(action.m_data.waitListWithTimeout.waitList->m_data); - } - m_coroutineWaitListsWithTimeoutsCV.notify_one(); break; default: @@ -251,12 +250,6 @@ bool Processor::iterate(v_int32 numIterations) { CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE); m_queue.popFront(); action.m_data.waitListWithTimeout.waitList->pushBack(CP, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS); - - { - std::lock_guard lock{m_coroutineWaitListsWithTimeoutsMutex}; - m_coroutineWaitListsWithTimeouts.emplace_back(action.m_data.waitListWithTimeout.waitList->m_data); - } - m_coroutineWaitListsWithTimeoutsCV.notify_one(); break; default: diff --git a/src/oatpp/core/async/Processor.hpp b/src/oatpp/core/async/Processor.hpp index 9a0beb95..fcca13e4 100644 --- a/src/oatpp/core/async/Processor.hpp +++ b/src/oatpp/core/async/Processor.hpp @@ -30,10 +30,11 @@ #include "./CoroutineWaitList.hpp" #include "oatpp/core/collection/FastQueue.hpp" -#include -#include -#include #include +#include +#include +#include +#include namespace oatpp { namespace async { @@ -43,6 +44,7 @@ namespace oatpp { namespace async { * Do not use bare processor to run coroutines. Use &id:oatpp::async::Executor; instead;. */ class Processor { + friend class CoroutineWaitList; private: class TaskSubmission { @@ -117,10 +119,12 @@ private: std::mutex m_coroutineWaitListsWithTimeoutsMutex; std::condition_variable m_coroutineWaitListsWithTimeoutsCV; - std::vector> m_coroutineWaitListsWithTimeouts; + std::set m_coroutineWaitListsWithTimeouts; std::thread m_coroutineWaitListTimeoutChecker{&Processor::checkCoroutinesForTimeouts, this}; void checkCoroutinesForTimeouts(); + void addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList); + void removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList); private: