Merge remote-tracking branch 'origin/conditional_server_run' into conditional_server_run

This commit is contained in:
Benedikt-Alexander Mokroß 2021-01-04 09:37:08 +01:00
commit dbd02bb98d
4 changed files with 69 additions and 37 deletions

View File

@ -100,12 +100,13 @@ public:
return runConditionForLambda;
};
m_server->run(true, condition);
std::thread serverThread([&condition, this]{
m_server->run(condition);
});
std::thread clientThread([&runConditionForLambda, this, &lambda]{
lambda();
// m_server->stop();
runConditionForLambda = false;
m_connectionHandler->stop();
@ -124,6 +125,7 @@ public:
});
serverThread.join();
clientThread.join();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now() - startTime);

View File

@ -43,51 +43,55 @@ Server::Server(const std::shared_ptr<ServerConnectionProvider> &connectionProvid
, m_connectionHandler(connectionHandler)
, m_threaded(false) {}
void Server::mainLoop(Server *instance) {
instance->setStatus(STATUS_STARTING, STATUS_RUNNING);
// This isn't implemented as static since threading is dropped and therefore static isn't needed anymore.
void Server::conditionalMainLoop() {
setStatus(STATUS_STARTING, STATUS_RUNNING);
std::shared_ptr<const std::unordered_map<oatpp::String, oatpp::String>> params;
// Code duplication to extract unnecessary checks for the conditional to maintain performance when the conditional isn't used.
if (instance->m_condition) {
while (instance->getStatus() == STATUS_RUNNING) {
if (instance->m_condition()) {
auto connection = instance->m_connectionProvider->get();
if (connection) {
if (instance->getStatus() == STATUS_RUNNING) {
if (instance->m_condition()) {
instance->m_connectionHandler->handleConnection(connection, params /* null params */);
} else {
instance->setStatus(STATUS_STOPPING);
}
} else {
OATPP_LOGD("[oatpp::network::server::mainLoop()]", "Error. Server already stopped - closing connection...");
}
}
} else {
instance->setStatus(STATUS_STOPPING);
}
}
} else {
while (instance->getStatus() == STATUS_RUNNING) {
auto connection = instance->m_connectionProvider->get();
while (getStatus() == STATUS_RUNNING) {
if (m_condition()) {
auto connection = m_connectionProvider->get();
if (connection) {
if (instance->getStatus() == STATUS_RUNNING) {
instance->m_connectionHandler->handleConnection(connection, params /* null params */);
if (getStatus() == STATUS_RUNNING) {
if (m_condition()) {
m_connectionHandler->handleConnection(connection, params /* null params */);
} else {
setStatus(STATUS_STOPPING);
}
} else {
OATPP_LOGD("[oatpp::network::server::mainLoop()]", "Error. Server already stopped - closing connection...");
}
}
} else {
setStatus(STATUS_STOPPING);
}
}
setStatus(STATUS_DONE);
}
void Server::mainLoop(Server *instance) {
instance->setStatus(STATUS_STARTING, STATUS_RUNNING);
std::shared_ptr<const std::unordered_map<oatpp::String, oatpp::String>> params;
while (instance->getStatus() == STATUS_RUNNING) {
auto connection = instance->m_connectionProvider->get();
if (connection) {
if (instance->getStatus() == STATUS_RUNNING) {
instance->m_connectionHandler->handleConnection(connection, params /* null params */);
} else {
OATPP_LOGD("[oatpp::network::server::mainLoop()]", "Error. Server already stopped - closing connection...");
}
}
}
instance->setStatus(STATUS_DONE);
}
void Server::run(bool startAsNewThread, std::function<bool()> conditional) {
void Server::run(std::function<bool()> conditional) {
std::unique_lock<std::mutex> ul(m_mutex);
switch (getStatus()) {
case STATUS_STARTING:
@ -96,11 +100,28 @@ void Server::run(bool startAsNewThread, std::function<bool()> conditional) {
throw std::runtime_error("[oatpp::network::server::run()] Error. Server already started");
}
m_threaded = false;
setStatus(STATUS_CREATED, STATUS_STARTING);
m_condition = std::move(conditional);
ul.unlock(); // early unlock
mainLoop(this);
}
void Server::run(bool startAsNewThread) {
std::unique_lock<std::mutex> ul(m_mutex);
OATPP_LOGW("[oatpp::network::server::run(bool)]", "Using oatpp::network::server::run(bool) is deprecated and will be removed in the next release. Please implement your own threading.")
switch (getStatus()) {
case STATUS_STARTING:
throw std::runtime_error("[oatpp::network::server::run()] Error. Server already starting");
case STATUS_RUNNING:
throw std::runtime_error("[oatpp::network::server::run()] Error. Server already started");
}
m_threaded = startAsNewThread;
setStatus(STATUS_CREATED, STATUS_STARTING);
m_condition = std::move(conditional);
if (m_threaded) {
m_thread = std::thread(mainLoop, this);
} else {

View File

@ -47,6 +47,7 @@ class Server : public base::Countable {
private:
static void mainLoop(Server *instance);
void conditionalMainLoop();
bool setStatus(v_int32 expectedStatus, v_int32 newStatus);
void setStatus(v_int32 status);
@ -116,10 +117,18 @@ public:
/**
* Call &id:oatpp::network::ConnectionProvider::getConnection; in the loop and passes obtained Connection
* to &id:oatpp::network::ConnectionHandler;.
* @param startAsNewThread - Start the server blocking (thread of callee) or non-blocking (own thread)
* @param conditional - Function that is called every mainloop iteration to check if the server should continue to run <br>
* Return true to let the server continue, false to shut it down.
*/
void run(bool startAsNewThread, std::function<bool()> conditional = nullptr);
void run(std::function<bool()> conditional = nullptr);
/**
* Call &id:oatpp::network::ConnectionProvider::getConnection; in the loop and passes obtained Connection
* to &id:oatpp::network::ConnectionHandler;.
* @param startAsNewThread - Start the server blocking (thread of callee) or non-blocking (own thread)
* @deprecated Deprecated since 1.3.0, will be removed in the next release.
*/
void run(bool startAsNewThread);
/**
* Break server loop.

View File

@ -176,7 +176,7 @@ void PipelineTest::onRun() {
pipeOutThread.join();
pipeInThread.join();
}, std::chrono::minutes(1));
}, std::chrono::minutes(10));
std::this_thread::sleep_for(std::chrono::seconds(1));