Removed the need for holding CoroutineWaitList members in shared_ptr

This commit is contained in:
MHaselmaier 2021-05-15 12:13:46 +02:00
parent 0d1cbbf8f8
commit 47d5f783a3
4 changed files with 113 additions and 85 deletions

View File

@ -34,12 +34,19 @@ namespace oatpp { namespace async {
CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) { CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) {
{ {
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_data->m_lock}; std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_lock};
m_data->m_list = std::move(other.m_data->m_list); m_list = std::move(other.m_list);
} }
{ {
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_data->m_timeoutsLock}; std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_timeoutsLock};
m_data->m_coroutinesWithTimeout = std::move(other.m_data->m_coroutinesWithTimeout); m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);
for (const std::pair<Processor*, v_int64>& entry : m_timeoutCheckingProcessors) {
Processor* processor = entry.first;
processor->removeCoroutineWaitListWithTimeouts(std::addressof(other));
processor->addCoroutineWaitListWithTimeouts(this);
}
} }
} }
@ -47,27 +54,27 @@ CoroutineWaitList::~CoroutineWaitList() {
notifyAll(); notifyAll();
} }
void CoroutineWaitList::checkCoroutinesForTimeouts(const std::shared_ptr<Data>& data) { void CoroutineWaitList::checkCoroutinesForTimeouts() {
std::set<CoroutineHandle*> timedoutCoroutines; std::set<CoroutineHandle*> timedoutCoroutines;
{ {
std::lock_guard<oatpp::concurrency::SpinLock> lock{data->m_timeoutsLock}; std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
const auto currentTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); const auto currentTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
const auto newEndIt = std::remove_if(std::begin(data->m_coroutinesWithTimeout), std::end(data->m_coroutinesWithTimeout), [&](const std::pair<CoroutineHandle*, v_int64>& entry) { const auto newEndIt = std::remove_if(std::begin(m_coroutinesWithTimeout), std::end(m_coroutinesWithTimeout), [&](const std::pair<CoroutineHandle*, v_int64>& entry) {
if (currentTimeSinceEpochMS > entry.second) { if (currentTimeSinceEpochMS > entry.second) {
timedoutCoroutines.insert(entry.first); timedoutCoroutines.insert(entry.first);
return true; return true;
} }
return false; return false;
}); });
data->m_coroutinesWithTimeout.erase(newEndIt, std::end(data->m_coroutinesWithTimeout)); m_coroutinesWithTimeout.erase(newEndIt, std::end(m_coroutinesWithTimeout));
} }
if (!timedoutCoroutines.empty()) { if (!timedoutCoroutines.empty()) {
std::lock_guard<oatpp::concurrency::SpinLock> lock{ data->m_lock}; std::lock_guard<oatpp::concurrency::SpinLock> lock{ m_lock};
CoroutineHandle* prev = nullptr; CoroutineHandle* prev = nullptr;
CoroutineHandle* curr = data->m_list.first; CoroutineHandle* curr = m_list.first;
while (curr) { while (curr) {
if (timedoutCoroutines.count(curr)) { if (timedoutCoroutines.count(curr)) {
data->m_list.cutEntry(curr, prev); m_list.cutEntry(curr, prev);
curr->_PP->pushOneTask(curr); curr->_PP->pushOneTask(curr);
} }
prev = curr; prev = curr;
@ -77,60 +84,99 @@ void CoroutineWaitList::checkCoroutinesForTimeouts(const std::shared_ptr<Data>&
} }
void CoroutineWaitList::setListener(Listener* listener) { void CoroutineWaitList::setListener(Listener* listener) {
m_data->m_listener = listener; m_listener = listener;
} }
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) { void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) {
{ {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock); std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
m_data->m_list.pushFront(coroutine); m_list.pushFront(coroutine);
} }
if(m_data->m_listener != nullptr) { if(m_listener != nullptr) {
m_data->m_listener->onNewItem(*this); m_listener->onNewItem(*this);
} }
} }
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) { void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
{ {
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_data->m_timeoutsLock}; std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
m_data->m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS); m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
}
} }
pushFront(coroutine); pushFront(coroutine);
} }
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) { void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
{ {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock); std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
m_data->m_list.pushBack(coroutine); m_list.pushBack(coroutine);
} }
if(m_data->m_listener != nullptr) { if(m_listener != nullptr) {
m_data->m_listener->onNewItem(*this); m_listener->onNewItem(*this);
} }
} }
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) { void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
{ {
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_data->m_timeoutsLock}; std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
m_data->m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS); m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
}
} }
pushBack(coroutine); pushBack(coroutine);
} }
void CoroutineWaitList::notifyFirst() { void CoroutineWaitList::notifyFirst() {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock); std::lock_guard<oatpp::concurrency::SpinLock> lock{m_lock};
if(m_data->m_list.first) { if(m_list.first) {
auto coroutine = m_data->m_list.popFront(); removeFirstCoroutine();
coroutine->_PP->pushOneTask(coroutine);
} }
} }
void CoroutineWaitList::notifyAll() { void CoroutineWaitList::notifyAll() {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock); std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
while (!m_data->m_list.empty()) { while (!m_list.empty()) {
auto curr = m_data->m_list.popFront(); removeFirstCoroutine();
curr->_PP->pushOneTask(curr);
} }
} }
void CoroutineWaitList::removeFirstCoroutine() {
auto coroutine = m_list.popFront();
coroutine->_PP->pushOneTask(coroutine);
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
if (--m_timeoutCheckingProcessors[coroutine->_PP] <= 0) {
coroutine->_PP->removeCoroutineWaitListWithTimeouts(this);
m_timeoutCheckingProcessors.erase(coroutine->_PP);
}
}
CoroutineWaitList& CoroutineWaitList::operator=(CoroutineWaitList&& other) {
if (this == std::addressof(other)) return *this;
notifyAll();
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_lock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_lock};
m_list = std::move(other.m_list);
}
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_timeoutsLock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_timeoutsLock};
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);
for (const std::pair<Processor*, v_int64>& entry : m_timeoutCheckingProcessors) {
Processor* processor = entry.first;
processor->removeCoroutineWaitListWithTimeouts(std::addressof(other));
processor->addCoroutineWaitListWithTimeouts(this);
}
}
return *this;
}
}} }}

View File

@ -30,6 +30,7 @@
#include "oatpp/core/collection/FastQueue.hpp" #include "oatpp/core/collection/FastQueue.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp" #include "oatpp/core/concurrency/SpinLock.hpp"
#include <map>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <utility> #include <utility>
@ -59,18 +60,18 @@ public:
virtual void onNewItem(CoroutineWaitList& list) = 0; virtual void onNewItem(CoroutineWaitList& list) = 0;
}; };
private: private:
struct Data { oatpp::collection::FastQueue<CoroutineHandle> m_list;
oatpp::collection::FastQueue<CoroutineHandle> m_list; oatpp::concurrency::SpinLock m_lock;
oatpp::concurrency::SpinLock m_lock; Listener* m_listener = nullptr;
Listener* m_listener = nullptr;
std::map<Processor*, v_int64> m_timeoutCheckingProcessors;
std::vector<std::pair<CoroutineHandle*, v_int64>> m_coroutinesWithTimeout; std::vector<std::pair<CoroutineHandle*, v_int64>> m_coroutinesWithTimeout;
oatpp::concurrency::SpinLock m_timeoutsLock; oatpp::concurrency::SpinLock m_timeoutsLock;
};
std::shared_ptr<Data> m_data{std::make_shared<Data>()};
private: private:
static void checkCoroutinesForTimeouts(const std::shared_ptr<Data>& data); void checkCoroutinesForTimeouts();
void removeFirstCoroutine();
protected: protected:
/* /*
@ -147,23 +148,7 @@ public:
*/ */
void notifyAll(); void notifyAll();
CoroutineWaitList& operator=(CoroutineWaitList&& other) { CoroutineWaitList& operator=(CoroutineWaitList&& other);
if (this == std::addressof(other)) return *this;
notifyAll();
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_data->m_lock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_data->m_lock};
m_data->m_list = std::move(other.m_data->m_list);
}
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_data->m_timeoutsLock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_data->m_timeoutsLock};
m_data->m_coroutinesWithTimeout = std::move(other.m_data->m_coroutinesWithTimeout);
}
return *this;
}
}; };

View File

@ -38,16 +38,8 @@ void Processor::checkCoroutinesForTimeouts() {
if (!m_running) return; if (!m_running) return;
} }
auto curr = m_coroutineWaitListsWithTimeouts.rbegin(); for (CoroutineWaitList* waitList : m_coroutineWaitListsWithTimeouts) {
const auto end = m_coroutineWaitListsWithTimeouts.rend(); waitList->checkCoroutinesForTimeouts();
for (; curr != end; ++curr) {
std::shared_ptr<CoroutineWaitList::Data> data = curr->lock();
if (!data) {
m_coroutineWaitListsWithTimeouts.erase(std::next(curr).base());
continue;
}
CoroutineWaitList::checkCoroutinesForTimeouts(data);
} }
} }
@ -55,6 +47,19 @@ void Processor::checkCoroutinesForTimeouts() {
} }
} }
void Processor::addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) {
{
std::lock_guard<std::mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
m_coroutineWaitListsWithTimeouts.insert(waitList);
}
m_coroutineWaitListsWithTimeoutsCV.notify_one();
}
void Processor::removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) {
std::lock_guard<std::mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
m_coroutineWaitListsWithTimeouts.erase(waitList);
}
void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) { void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) {
switch(worker->getType()) { switch(worker->getType()) {
@ -127,12 +132,6 @@ void Processor::addCoroutine(CoroutineHandle* coroutine) {
case Action::TYPE_WAIT_LIST_WITH_TIMEOUT: case Action::TYPE_WAIT_LIST_WITH_TIMEOUT:
coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE); coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
action.m_data.waitListWithTimeout.waitList->pushBack(coroutine, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS); action.m_data.waitListWithTimeout.waitList->pushBack(coroutine, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
{
std::lock_guard<std::mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
m_coroutineWaitListsWithTimeouts.emplace_back(action.m_data.waitListWithTimeout.waitList->m_data);
}
m_coroutineWaitListsWithTimeoutsCV.notify_one();
break; break;
default: default:
@ -251,12 +250,6 @@ bool Processor::iterate(v_int32 numIterations) {
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE); CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
m_queue.popFront(); m_queue.popFront();
action.m_data.waitListWithTimeout.waitList->pushBack(CP, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS); action.m_data.waitListWithTimeout.waitList->pushBack(CP, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
{
std::lock_guard<std::mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
m_coroutineWaitListsWithTimeouts.emplace_back(action.m_data.waitListWithTimeout.waitList->m_data);
}
m_coroutineWaitListsWithTimeoutsCV.notify_one();
break; break;
default: default:

View File

@ -30,10 +30,11 @@
#include "./CoroutineWaitList.hpp" #include "./CoroutineWaitList.hpp"
#include "oatpp/core/collection/FastQueue.hpp" #include "oatpp/core/collection/FastQueue.hpp"
#include <mutex>
#include <list>
#include <vector>
#include <condition_variable> #include <condition_variable>
#include <list>
#include <mutex>
#include <set>
#include <vector>
namespace oatpp { namespace async { namespace oatpp { namespace async {
@ -43,6 +44,7 @@ namespace oatpp { namespace async {
* Do not use bare processor to run coroutines. Use &id:oatpp::async::Executor; instead;. * Do not use bare processor to run coroutines. Use &id:oatpp::async::Executor; instead;.
*/ */
class Processor { class Processor {
friend class CoroutineWaitList;
private: private:
class TaskSubmission { class TaskSubmission {
@ -117,10 +119,12 @@ private:
std::mutex m_coroutineWaitListsWithTimeoutsMutex; std::mutex m_coroutineWaitListsWithTimeoutsMutex;
std::condition_variable m_coroutineWaitListsWithTimeoutsCV; std::condition_variable m_coroutineWaitListsWithTimeoutsCV;
std::vector<std::weak_ptr<CoroutineWaitList::Data>> m_coroutineWaitListsWithTimeouts; std::set<CoroutineWaitList*> m_coroutineWaitListsWithTimeouts;
std::thread m_coroutineWaitListTimeoutChecker{&Processor::checkCoroutinesForTimeouts, this}; std::thread m_coroutineWaitListTimeoutChecker{&Processor::checkCoroutinesForTimeouts, this};
void checkCoroutinesForTimeouts(); void checkCoroutinesForTimeouts();
void addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList);
void removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList);
private: private: