Tests. ConnectionPoolTest.

This commit is contained in:
lganzzzo 2019-10-20 05:27:22 +03:00
parent 414e94678b
commit 2592f79934
7 changed files with 447 additions and 45 deletions

View File

@ -176,15 +176,25 @@ void Processor::pushQueues() {
if (m_pushList.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<oatpp::concurrency::SpinLock> lock(m_taskLock, std::try_to_lock);
if (lock.owns_lock()) {
while(m_pushList.first != nullptr) {
addCoroutine(m_pushList.popFront());
oatpp::collection::FastQueue<CoroutineHandle> tmpList;
oatpp::collection::FastQueue<CoroutineHandle>::moveAll(m_pushList, tmpList);
lock.unlock();
while(tmpList.first != nullptr) {
addCoroutine(tmpList.popFront());
}
}
} else {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
while(m_pushList.first != nullptr) {
addCoroutine(m_pushList.popFront());
oatpp::collection::FastQueue<CoroutineHandle> tmpList;
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
oatpp::collection::FastQueue<CoroutineHandle>::moveAll(m_pushList, tmpList);
}
while(tmpList.first != nullptr) {
addCoroutine(tmpList.popFront());
}
}
}

View File

@ -24,38 +24,29 @@
#include "ConnectionPool.hpp"
#include <thread>
#include <chrono>
namespace oatpp { namespace network {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionPool::Pool
void ConnectionPool::pushConnection(Pool* pool, const std::shared_ptr<IOStream>& connection, v_int32 inc) {
void ConnectionPool::Pool::onNewItem(oatpp::async::CoroutineWaitList& list) {
{
std::lock_guard<std::mutex> lock(pool->lock);
if (inc >= 0) {
pool->connections.push_back({connection, oatpp::base::Environment::getMicroTickCount()});
} else {
pool->size --;
}
if (inc > 0) {
pool->size ++;
}
std::lock_guard<std::mutex> lockGuard(lock);
if(!isOpen) {
list.notifyAll();
return;
}
pool->condition.notify_one();
if(size < maxConnections || connections.size() > 0) {
list.notifyFirst();
}
}
std::shared_ptr<ConnectionPool::IOStream> ConnectionPool::popConnection_NON_BLOCKING(Pool* pool) {
auto result = pool->connections.front();
pool->connections.pop_front();
return result.connection;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionPool::ConnectionWrapper
@ -109,33 +100,192 @@ void ConnectionPool::ConnectionWrapper::invalidate() {
m_recycleConnection = false;
}
bool ConnectionPool::ConnectionWrapper::isValid() {
return m_connection && m_recycleConnection;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionPool
ConnectionPool::ConnectionPool(const std::shared_ptr<ConnectionProvider>& connectionProvider,
v_int64 maxConnections,
v_int64 maxConnectionTTL)
: m_pool(std::make_shared<Pool>(maxConnections, maxConnectionTTL))
, m_connectionProvider(connectionProvider)
{
std::thread poolCleanupTask(cleanupTask, m_pool);
poolCleanupTask.detach();
}
void ConnectionPool::cleanupTask(std::shared_ptr<Pool> pool) {
while(pool->isOpen) {
{
std::lock_guard<std::mutex> lock(pool->lock);
auto ticks = oatpp::base::Environment::getMicroTickCount();
std::list<ConnectionHandle>::iterator i = pool->connections.begin();
while (i != pool->connections.end()) {
if(ticks - i->timestamp > pool->maxConnectionTTL) {
i = pool->connections.erase(i);
pool->size --;
} else {
i ++;
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
void ConnectionPool::pushConnection(Pool* pool, const std::shared_ptr<IOStream>& connection, v_int32 inc) {
{
std::lock_guard<std::mutex> lock(pool->lock);
if(!pool->isOpen) {
pool->size --;
return;
}
if (inc >= 0) {
pool->connections.push_back({connection, oatpp::base::Environment::getMicroTickCount()});
} else {
pool->size --;
}
if (inc > 0) {
pool->size ++;
}
}
pool->condition.notify_one();
pool->waitList.notifyFirst();
}
std::shared_ptr<ConnectionPool::IOStream> ConnectionPool::popConnection_NON_BLOCKING(Pool* pool) {
auto result = pool->connections.front();
pool->connections.pop_front();
return result.connection;
}
std::shared_ptr<ConnectionPool::ConnectionWrapper> ConnectionPool::getConnection() {
std::unique_lock<std::mutex> lock(m_pool->lock);
while (m_pool->size >= m_maxConnections && m_pool->connections.size() == 0) {
m_pool->condition.wait(lock);
{
std::unique_lock<std::mutex> lock(m_pool->lock);
while (m_pool->isOpen && m_pool->size >= m_pool->maxConnections && m_pool->connections.size() == 0) {
m_pool->condition.wait(lock);
}
if(!m_pool->isOpen) {
return nullptr;
}
if (m_pool->connections.size() > 0) {
return std::make_shared<ConnectionWrapper>(popConnection_NON_BLOCKING(m_pool.get()), m_pool);
} else {
m_pool->size ++;
}
}
if(m_pool->connections.size() == 0) {
m_pool->size ++;
return std::make_shared<ConnectionWrapper>(m_connectionProvider->getConnection(), m_pool);
}
return std::make_shared<ConnectionWrapper>(popConnection_NON_BLOCKING(m_pool.get()), m_pool);
return std::make_shared<ConnectionWrapper>(m_connectionProvider->getConnection(), m_pool);
}
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<ConnectionPool::ConnectionWrapper>&> ConnectionPool::getConnectionAsync() {
class ConnectionCoroutine : public oatpp::async::CoroutineWithResult<ConnectionCoroutine, const std::shared_ptr<ConnectionPool::ConnectionWrapper>&> {
private:
std::shared_ptr<Pool> m_pool;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
bool m_wasInc;
public:
ConnectionCoroutine(const std::shared_ptr<Pool>& pool,
const std::shared_ptr<ConnectionProvider>& connectionProvider)
: m_pool(pool)
, m_connectionProvider(connectionProvider)
, m_wasInc(false)
{}
Action act() override {
{
/* Careful!!! Using non-async lock */
std::unique_lock<std::mutex> lock(m_pool->lock);
if (m_pool->isOpen && m_pool->size >= m_pool->maxConnections && m_pool->connections.size() == 0) {
lock.unlock();
return Action::createWaitListAction(&m_pool->waitList);
}
if(!m_pool->isOpen) {
lock.unlock();
return _return(nullptr);
}
if (m_pool->connections.size() > 0) {
auto connection = std::make_shared<ConnectionWrapper>(popConnection_NON_BLOCKING(m_pool.get()), m_pool);
lock.unlock();
return _return(connection);
} else {
m_pool->size ++;
m_wasInc = true;
}
}
return m_connectionProvider->getConnectionAsync().callbackTo(&ConnectionCoroutine::onConnection);
}
Action onConnection(const std::shared_ptr<IOStream>& connection) {
return _return(std::make_shared<ConnectionWrapper>(connection, m_pool));
}
Action handleError(oatpp::async::Error* error) override {
if(m_wasInc) {
/* Careful!!! Using non-async lock */
std::lock_guard<std::mutex> lock(m_pool->lock);
m_pool->size --;
}
return error;
}
};
return ConnectionCoroutine::startForResult(m_pool, m_connectionProvider);
}
void ConnectionPool::close() {
{
std::lock_guard<std::mutex> lock(m_pool->lock);
m_pool->isOpen = false;
auto size = m_pool->connections.size();
m_pool->connections.clear();
m_pool->size -= size;
}
m_pool->condition.notify_all();
m_pool->waitList.notifyAll();
}
}}

View File

@ -28,6 +28,7 @@
#include "ConnectionProvider.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/async/CoroutineWaitList.hpp"
#include <list>
#include <condition_variable>
@ -46,13 +47,27 @@ private:
private:
class Pool {
class Pool : private oatpp::async::CoroutineWaitList::Listener {
friend ConnectionPool;
private:
void onNewItem(oatpp::async::CoroutineWaitList& list) override;
private:
oatpp::async::CoroutineWaitList waitList;
std::condition_variable condition;
std::mutex lock;
std::list<ConnectionHandle> connections;
v_int64 size = 0;
v_int64 maxConnections;
v_int64 maxConnectionTTL;
private:
std::atomic<bool> isOpen;
public:
Pool(v_int64 pMaxConnections, v_int64 pMmaxConnectionTTL)
: maxConnections(pMaxConnections)
, maxConnectionTTL(pMmaxConnectionTTL)
, isOpen(true)
{}
};
public:
@ -80,6 +95,7 @@ public:
oatpp::data::stream::IOMode getInputStreamIOMode() override;
void invalidate();
bool isValid();
};
@ -101,11 +117,13 @@ private:
*/
static std::shared_ptr<IOStream> popConnection_NON_BLOCKING(Pool* pool);
public:
static void cleanupTask(std::shared_ptr<Pool> pool);
private:
std::shared_ptr<Pool> m_pool;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
v_int64 m_maxConnections;
v_int64 m_maxConnectionTTL;
public:
/**
@ -114,12 +132,7 @@ public:
*/
ConnectionPool(const std::shared_ptr<ConnectionProvider>& connectionProvider,
v_int64 maxConnections,
v_int64 maxConnectionTTL)
: m_pool(std::make_shared<Pool>())
, m_connectionProvider(connectionProvider)
, m_maxConnections(maxConnections)
, m_maxConnectionTTL(maxConnectionTTL)
{}
v_int64 maxConnectionTTL);
/**
* Get connection.

View File

@ -29,6 +29,8 @@ add_executable(oatppAllTests
oatpp/encoding/Base64Test.hpp
oatpp/encoding/UnicodeTest.cpp
oatpp/encoding/UnicodeTest.hpp
oatpp/network/ConnectionPoolTest.cpp
oatpp/network/ConnectionPoolTest.hpp
oatpp/network/UrlTest.cpp
oatpp/network/UrlTest.hpp
oatpp/network/virtual_/InterfaceTest.cpp
@ -64,7 +66,8 @@ add_executable(oatppAllTests
oatpp/web/app/ControllerAsync.hpp
oatpp/web/app/DTOs.hpp
oatpp/web/app/ControllerWithInterceptors.hpp
oatpp/web/app/ControllerWithInterceptorsAsync.hpp)
oatpp/web/app/ControllerWithInterceptorsAsync.hpp
)
target_link_libraries(oatppAllTests PRIVATE oatpp PRIVATE oatpp-test)

View File

@ -15,6 +15,7 @@
#include "oatpp/network/virtual_/PipeTest.hpp"
#include "oatpp/network/virtual_/InterfaceTest.hpp"
#include "oatpp/network/UrlTest.hpp"
#include "oatpp/network/ConnectionPoolTest.hpp"
#include "oatpp/core/data/stream/BufferStreamTest.hpp"
#include "oatpp/core/data/stream/ChunkedBufferTest.hpp"
@ -86,6 +87,9 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::encoding::UnicodeTest);
OATPP_RUN_TEST(oatpp::test::network::UrlTest);
OATPP_RUN_TEST(oatpp::test::network::ConnectionPoolTest);
OATPP_RUN_TEST(oatpp::test::network::virtual_::PipeTest);
OATPP_RUN_TEST(oatpp::test::network::virtual_::InterfaceTest);

View File

@ -0,0 +1,179 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionPoolTest.hpp"
#include "oatpp/network/ConnectionPool.hpp"
#include "oatpp/core/async/Executor.hpp"
namespace oatpp { namespace test { namespace network {
namespace {
typedef oatpp::network::ConnectionPool ConnectionPool;
class StubStream : public oatpp::data::stream::IOStream, public oatpp::base::Countable {
public:
data::v_io_size write(const void *buff, data::v_io_size count) override {
throw std::runtime_error("It's a stub!");
}
data::v_io_size read(void *buff, data::v_io_size count) override {
throw std::runtime_error("It's a stub!");
}
oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override {
throw std::runtime_error("It's a stub!");
}
oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override {
throw std::runtime_error("It's a stub!");
}
void setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) override {
throw std::runtime_error("It's a stub!");
}
oatpp::data::stream::IOMode getOutputStreamIOMode() override {
throw std::runtime_error("It's a stub!");
}
void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override {
throw std::runtime_error("It's a stub!");
}
oatpp::data::stream::IOMode getInputStreamIOMode() override {
throw std::runtime_error("It's a stub!");
}
};
class StubStreamProvider : public oatpp::network::ConnectionProvider {
public:
StubStreamProvider()
: counter(0)
{}
std::atomic<v_int64> counter;
virtual std::shared_ptr<IOStream> getConnection() {
++ counter;
return std::make_shared<StubStream>();
}
virtual oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::stream::IOStream>&> getConnectionAsync() {
class ConnectionCoroutine : public oatpp::async::CoroutineWithResult<ConnectionCoroutine, const std::shared_ptr<oatpp::data::stream::IOStream>&> {
public:
Action act() override {
return _return(std::make_shared<StubStream>());
}
};
++ counter;
return ConnectionCoroutine::startForResult();
}
virtual void close() {
// DO NOTHING
}
};
class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
private:
std::shared_ptr<ConnectionPool> m_pool;
std::shared_ptr<ConnectionPool::ConnectionWrapper> m_connection;
v_int32 m_repeats;
public:
ClientCoroutine(const std::shared_ptr<ConnectionPool>& pool)
: m_pool(pool)
, m_repeats(0)
{}
Action act() override {
return m_pool->getConnectionAsync().callbackTo(&ClientCoroutine::onConnection);
}
Action onConnection(const std::shared_ptr<ConnectionPool::ConnectionWrapper>& connection) {
m_connection = connection;
return yieldTo(&ClientCoroutine::useConnection);
}
Action useConnection() {
if(m_repeats < 1) {
m_repeats ++;
return waitRepeat(std::chrono::milliseconds(100));
}
return finish();
}
};
void clientMethod(std::shared_ptr<ConnectionPool> pool) {
auto connection = pool->getConnection();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void ConnectionPoolTest::onRun() {
oatpp::async::Executor executor(1, 1, 1);
auto connectionProvider = std::make_shared<StubStreamProvider>();
auto pool = std::make_shared<ConnectionPool>(connectionProvider, 10 /* maxConnections */, 10 * 1000 * 1000 /* 10s maxConnectionTTL */);
std::list<std::thread> threads;
for(v_int32 i = 0; i < 100; i ++ ) {
threads.push_back(std::thread(clientMethod, pool));
executor.execute<ClientCoroutine>(pool);
}
for(std::thread& thread : threads) {
thread.join();
}
executor.waitTasksFinished();
OATPP_LOGD(TAG, "connections_counter=%d", connectionProvider->counter.load());
OATPP_ASSERT(connectionProvider->counter <= 10);
pool->close();
executor.stop();
executor.join();
}
}}}

View File

@ -0,0 +1,43 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_test_network_ConnectionPoolTest_hpp
#define oatpp_test_network_ConnectionPoolTest_hpp
#include "oatpp-test/UnitTest.hpp"
namespace oatpp { namespace test { namespace network {
class ConnectionPoolTest : public UnitTest {
public:
ConnectionPoolTest():UnitTest("TEST[network::ConnectionPoolTest]"){}
void onRun() override;
};
}}}
#endif // oatpp_test_network_ConnectionPoolTest_hpp