Async: Introduce ConditionVariable.

This commit is contained in:
Leonid Stryzhevskyi 2023-01-15 02:43:43 +02:00
parent f078b09759
commit d9fd5854bf
9 changed files with 396 additions and 20 deletions

View File

@ -0,0 +1,145 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConditionVariable.hpp"
namespace oatpp { namespace async {
ConditionVariable::ConditionVariable()
: m_notified(false)
{
m_list.setListener(this);
}
void ConditionVariable::onNewItem(CoroutineWaitList& list) {
if(m_notified) {
m_list.notifyFirst();
}
}
CoroutineStarter ConditionVariable::wait(Lock* lock, std::function<bool()> condition) {
class WaitCoroutine : public Coroutine<WaitCoroutine> {
private:
ConditionVariable* m_cv;
oatpp::async::LockGuard m_lockGuard;
std::function<bool()> m_condition;
public:
WaitCoroutine(ConditionVariable* cv, Lock* lock, std::function<bool()> condition)
: m_cv(cv)
, m_lockGuard(lock)
, m_condition(condition)
{}
Action act() override {
return m_lockGuard.lockAsyncInline(yieldTo(&WaitCoroutine::onLocked));
}
Action onLocked() {
if(m_lockGuard.owns_lock()) {
if (m_condition()) {
return finish();
}
m_cv->m_notified = false;
m_lockGuard.unlock();
} else {
return yieldTo(&WaitCoroutine::act);
}
return Action::createWaitListAction(&m_cv->m_list);
}
};
return WaitCoroutine::start(this, lock, condition);
}
CoroutineStarter ConditionVariable::waitUntil(Lock* lock, std::function<bool()> condition, const std::chrono::steady_clock::time_point& timeoutTime) {
class WaitCoroutine : public Coroutine<WaitCoroutine> {
private:
ConditionVariable* m_cv;
oatpp::async::LockGuard m_lockGuard;
std::function<bool()> m_condition;
std::chrono::steady_clock::time_point m_timeoutTime;
public:
WaitCoroutine(ConditionVariable* cv, Lock* lock, std::function<bool()> condition, const std::chrono::steady_clock::time_point& timeoutTime)
: m_cv(cv)
, m_lockGuard(lock)
, m_condition(condition)
, m_timeoutTime(timeoutTime)
{}
Action act() override {
return m_lockGuard.lockAsyncInline(yieldTo(&WaitCoroutine::onLocked));
}
Action onLocked() {
OATPP_LOGD("WaitCoroutine", "Waked!")
if(m_lockGuard.owns_lock()) {
OATPP_LOGD("WaitCoroutine", "Locked")
if (m_condition()) {
return finish();
}
m_cv->m_notified = false;
m_lockGuard.unlock();
OATPP_LOGD("WaitCoroutine", "UnLocked")
} else {
if(std::chrono::steady_clock::now() > m_timeoutTime) {
return finish();
}
return yieldTo(&WaitCoroutine::act);
}
if(std::chrono::steady_clock::now() > m_timeoutTime) {
return finish();
}
OATPP_LOGD("WaitCoroutine", "Sleeeeep")
return Action::createWaitListActionWithTimeout(&m_cv->m_list, m_timeoutTime);
}
};
return WaitCoroutine::start(this, lock, condition, timeoutTime);
}
CoroutineStarter ConditionVariable::waitFor(Lock* lock, std::function<bool()> condition, const std::chrono::duration<v_int64, std::micro>& timeout) {
return waitUntil(lock, condition, std::chrono::steady_clock::now() + timeout);
}
void ConditionVariable::notifyFirst() {
m_notified = true;
m_list.notifyFirst();
}
void ConditionVariable::notifyAll() {
m_notified = true;
m_list.notifyAll();
}
}}

View File

@ -0,0 +1,73 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_ConditionVariable_hpp
#define oatpp_async_ConditionVariable_hpp
#include "./Lock.hpp"
namespace oatpp { namespace async {
class ConditionVariable : private CoroutineWaitList::Listener {
public:
class Condition {
public:
virtual bool check() = 0;
};
public:
template<typename F>
class ConditionTemplate : public Condition {
private:
F lambda;
public:
ConditionTemplate(const F& f) : lambda(f) {};
bool check() override {
return lambda();
}
};
private:
std::atomic_bool m_notified;
CoroutineWaitList m_list;
private:
void onNewItem(CoroutineWaitList& list) override;
public:
ConditionVariable();
CoroutineStarter wait(Lock* lock, std::function<bool()> condition);
CoroutineStarter waitUntil(Lock* lock, std::function<bool()> condition, const std::chrono::steady_clock::time_point& timeoutTime);
CoroutineStarter waitFor(Lock* lock, std::function<bool()>, const std::chrono::duration<v_int64, std::micro>& timeout);
void notifyFirst();
void notifyAll();
};
}}
#endif //oatpp_async_ConditionVariable_hpp

View File

@ -34,11 +34,11 @@ namespace oatpp { namespace async {
CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_lock};
std::lock_guard<std::mutex> lock{other.m_lock};
m_list = std::move(other.m_list);
}
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_timeoutsLock};
std::lock_guard<std::mutex> lock{other.m_timeoutsLock};
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);
@ -55,8 +55,8 @@ CoroutineWaitList::~CoroutineWaitList() {
}
void CoroutineWaitList::checkCoroutinesForTimeouts() {
std::lock_guard<oatpp::concurrency::SpinLock> listLock{m_lock};
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
std::lock_guard<std::mutex> listLock{m_lock};
std::lock_guard<std::mutex> lock{m_timeoutsLock};
const auto currentTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
const auto newEndIt = std::remove_if(std::begin(m_coroutinesWithTimeout), std::end(m_coroutinesWithTimeout), [&](const std::pair<CoroutineHandle*, v_int64>& entry) {
return currentTimeSinceEpochMS > entry.second;
@ -89,7 +89,7 @@ void CoroutineWaitList::setListener(Listener* listener) {
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
std::lock_guard<std::mutex> lock(m_lock);
m_list.pushFront(coroutine);
}
if(m_listener != nullptr) {
@ -99,7 +99,7 @@ void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) {
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
std::lock_guard<std::mutex> lock{m_timeoutsLock};
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
@ -110,7 +110,7 @@ void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTim
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
std::lock_guard<std::mutex> lock(m_lock);
m_list.pushBack(coroutine);
}
if(m_listener != nullptr) {
@ -120,7 +120,7 @@ void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
std::lock_guard<std::mutex> lock{m_timeoutsLock};
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
@ -130,14 +130,14 @@ void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTime
}
void CoroutineWaitList::notifyFirst() {
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_lock};
std::lock_guard<std::mutex> lock{m_lock};
if(m_list.first) {
removeFirstCoroutine();
}
}
void CoroutineWaitList::notifyAll() {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
std::lock_guard<std::mutex> lock(m_lock);
while (!m_list.empty()) {
removeFirstCoroutine();
}
@ -147,7 +147,7 @@ void CoroutineWaitList::removeFirstCoroutine() {
auto coroutine = m_list.popFront();
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
std::lock_guard<std::mutex> lock{m_timeoutsLock};
if (--m_timeoutCheckingProcessors[coroutine->_PP] <= 0) {
coroutine->_PP->removeCoroutineWaitListWithTimeouts(this);
m_timeoutCheckingProcessors.erase(coroutine->_PP);
@ -163,13 +163,13 @@ CoroutineWaitList& CoroutineWaitList::operator=(CoroutineWaitList&& other) {
notifyAll();
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_lock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_lock};
std::lock_guard<std::mutex> otherLock{other.m_lock};
std::lock_guard<std::mutex> myLock{m_lock};
m_list = std::move(other.m_list);
}
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_timeoutsLock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_timeoutsLock};
std::lock_guard<std::mutex> otherLock{other.m_timeoutsLock};
std::lock_guard<std::mutex> myLock{m_timeoutsLock};
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);

View File

@ -61,12 +61,12 @@ public:
};
private:
utils::FastQueue<CoroutineHandle> m_list;
oatpp::concurrency::SpinLock m_lock;
std::mutex m_lock;
Listener* m_listener = nullptr;
std::map<Processor*, v_int64> m_timeoutCheckingProcessors;
std::vector<std::pair<CoroutineHandle*, v_int64>> m_coroutinesWithTimeout;
oatpp::concurrency::SpinLock m_timeoutsLock;
std::mutex m_timeoutsLock;
private:
void checkCoroutinesForTimeouts();

View File

@ -149,6 +149,9 @@ Action LockGuard::lockAsyncInline(oatpp::async::Action&& nextAction) {
}
bool LockGuard::owns_lock() const {
return m_ownsLock;
}
void LockGuard::unlock() {

View File

@ -133,6 +133,12 @@ public:
*/
Action lockAsyncInline(oatpp::async::Action&& nextAction);
/**
* Check if owns lock.
* @return
*/
bool owns_lock() const;
/**
* Unlock guarded lock.
*/

View File

@ -32,6 +32,7 @@
#include "oatpp/core/parser/CaretTest.hpp"
#include "oatpp/core/provider/PoolTest.hpp"
#include "oatpp/core/provider/PoolTemplateTest.hpp"
#include "oatpp/core/async/ConditionVariableTest.hpp"
#include "oatpp/core/async/LockTest.hpp"
#include "oatpp/core/data/mapping/type/UnorderedMapTest.hpp"
@ -73,7 +74,7 @@ namespace {
void runTests() {
oatpp::base::Environment::printCompilationConfig();
/*
OATPP_LOGD("Tests", "coroutine size=%d", sizeof(oatpp::async::AbstractCoroutine));
OATPP_LOGD("Tests", "action size=%d", sizeof(oatpp::async::Action));
OATPP_LOGD("Tests", "class count=%d", oatpp::data::mapping::type::ClassId::getClassCount());
@ -116,7 +117,9 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::core::data::mapping::TypeResolverTest);
OATPP_RUN_TEST(oatpp::test::core::data::resource::InMemoryDataTest);
*/
OATPP_RUN_TEST(oatpp::test::async::ConditionVariableTest);
/*
OATPP_RUN_TEST(oatpp::test::async::LockTest);
OATPP_RUN_TEST(oatpp::test::parser::CaretTest);
@ -219,7 +222,7 @@ void runTests() {
test_port.run();
}
*/
}
}

View File

@ -0,0 +1,104 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConditionVariableTest.hpp"
#include "oatpp/core/async/Executor.hpp"
#include "oatpp/core/async/ConditionVariable.hpp"
namespace oatpp { namespace test { namespace async {
namespace {
struct Resource {
v_int64 counter;
};
class TestCoroutine : public oatpp::async::Coroutine<TestCoroutine> {
private:
std::shared_ptr<Resource> m_resource;
oatpp::async::Lock* m_lock;
oatpp::async::ConditionVariable* m_cv;
public:
TestCoroutine(std::shared_ptr<Resource> resource,
oatpp::async::Lock* lock,
oatpp::async::ConditionVariable* cv)
: m_resource(resource)
, m_lock(lock)
, m_cv(cv)
{}
bool condition() {
OATPP_LOGD("Resource", "%d", m_resource->counter)
return m_resource->counter == 100;
}
Action act() override {
OATPP_LOGD("TestCoroutine", "Waiting...")
return m_cv->waitFor(m_lock, [this]{return condition();}, std::chrono::milliseconds(5000)).next(yieldTo(&TestCoroutine::onReady));
}
Action onReady() {
OATPP_LOGD("TestCoroutine", "Ready!!!")
return finish();
}
};
}
void ConditionVariableTest::onRun() {
for(v_int32 iter = 0; iter < 100; iter ++ ) {
OATPP_LOGD("ITER", "%d", iter)
oatpp::async::Executor executor;
auto resource = std::make_shared<Resource>();
oatpp::async::Lock lock;
oatpp::async::ConditionVariable cv;
executor.execute<TestCoroutine>(resource, &lock, &cv);
for (v_int32 i = 0; i < 100; i++) {
std::thread t([&resource, &lock, &cv] {
{
std::lock_guard<oatpp::async::Lock> guard(lock);
resource->counter++;
}
cv.notifyAll();
});
t.detach();
}
executor.waitTasksFinished();
executor.stop();
executor.join();
}
}
}}}

View File

@ -0,0 +1,42 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_test_async_ConditionVariableTest_hpp
#define oatpp_test_async_ConditionVariableTest_hpp
#include "oatpp-test/UnitTest.hpp"
namespace oatpp { namespace test { namespace async {
class ConditionVariableTest : public UnitTest{
public:
ConditionVariableTest():UnitTest("TEST[async::ConditionVariableTest]"){}
void onRun() override;
};
}}}
#endif // oatpp_test_async_ConditionVariableTest_hpp