Minor upgrades for AsyncProcessor

This commit is contained in:
lganzzzo 2018-03-19 05:13:19 +02:00
parent d34d11cafd
commit 3e05437a49
4 changed files with 310 additions and 321 deletions

View File

@ -7,3 +7,287 @@
//
#include "Processor.hpp"
namespace oatpp { namespace async {
Processor::Queue::Queue()
: m_atom(false)
, m_first(nullptr)
, m_last(nullptr)
{}
Processor::Queue::Entry* Processor::Queue::peekFront() {
return m_first;
}
Processor::Queue::Entry* Processor::Queue::popFront() {
oatpp::concurrency::SpinLock lock(m_atom);
auto result = m_first;
if(m_first != nullptr) {
m_first = m_first->next;
if(m_first == nullptr) {
m_last = nullptr;
}
}
return result;
}
void Processor::Queue::pushFront(Routine* routine) {
pushFront(new Entry(routine, nullptr));
}
void Processor::Queue::pushFront(Entry* entry){
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = m_first;
m_first = entry;
if(m_last == nullptr) {
m_last = entry;
}
}
void Processor::Queue::pushBack(Routine* routine) {
pushBack(new Entry(routine, nullptr));
}
void Processor::Queue::pushBack(Entry* entry) {
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = nullptr;
if(m_last != nullptr) {
m_last->next = entry;
m_last = entry;
} else {
m_first = entry;
m_last = entry;
}
}
void Processor::Queue::moveEntryToQueue(Queue& from, Queue& to, Queue::Entry* curr, Queue::Entry* prev){
//OATPP_LOGD("proc", "moved to fast");
if(prev == nullptr) {
to.pushFront(from.popFront());
} else if(curr->next == nullptr) {
to.pushBack(curr);
from.m_last = prev;
prev->next = nullptr;
} else {
prev->next = curr->next;
to.pushBack(curr);
}
}
// Processor
void Processor::abortCurrentRoutine(){
auto entry = m_queue.popFront();
auto curr = entry->routine;
while(curr != nullptr) {
auto parent = curr->m_parent;
delete curr;
curr = parent;
}
delete entry;
}
void Processor::returnFromCurrentRoutine(){
//OATPP_LOGD("R", "_return");
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
if(routine != nullptr) {
entry->routine = routine;
routine->blocks.popNoData();
m_queue.pushBack(entry);
} else {
delete entry;
}
}
void Processor::doAction(Action& a){
if(a.getType() == Action::TYPE_REPEAT) {
m_queue.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_WAIT_RETRY) {
m_queueSlow.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_RETURN) {
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
delete entry;
if(routine != nullptr) {
m_queue.pushBack(routine);
}
return;
} else if(a.getType() == Action::TYPE_ABORT){
abortCurrentRoutine();
return;
} else if(a.getType() == Action::TYPE_ROUTINE) {
auto entry = m_queue.popFront();
if(!a.m_routine->blocks.isEmpty()){
Routine* r = a.m_routine;
a.m_routine = nullptr;
r->m_parent = entry->routine;
entry->routine = r;
} else {
entry->routine->blocks.popNoData();
}
m_queue.pushBack(entry);
return;
}
throw std::runtime_error("Invalid action type");
}
void Processor::propagateError(Error& error){
Routine* curr = m_queue.peekFront()->routine;
while (curr != nullptr) {
if(!curr->blocks.isEmpty()) {
auto block = curr->blocks.peek();
if(block.errorHandler != nullptr) {
try {
auto action = block.errorHandler(error);
if(action.isErrorAction()) {
error = action.getError();
} else {
doAction(action);
return;
}
} catch(...) {
error = {"Unknown", true};
}
}
}
auto parent = curr->m_parent;
auto entry = m_queue.popFront();
delete curr;
delete entry;
curr = parent;
if(curr != nullptr) {
m_queue.pushFront(curr);
}
}
}
bool Processor::auditQueueSlow(){
m_auditTimer = 0;
Queue::Entry* curr = m_queueSlow.peekFront();
Queue::Entry* prev = nullptr;
bool hasActions = false;
while (curr != nullptr) {
auto& block = curr->routine->blocks.peek();
try{
Action action = block.function();
if(action.getType() != Action::TYPE_WAIT_RETRY){
curr->routine->pendingAction = action;
action.null();
Queue::moveEntryToQueue(m_queueSlow, m_queue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_queueSlow.peekFront();
}
}
} catch(...) {
Error error {"Unknown", true };
curr->routine->pendingAction = error;
Queue::moveEntryToQueue(m_queueSlow, m_queue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_queueSlow.peekFront();
}
}
prev = curr;
if(curr != nullptr) {
curr = curr->next;
}
}
return hasActions;
}
void Processor::checkAudit(){
m_sleepCountDown = 0;
m_auditTimer ++;
if(m_auditTimer > 100) {
auditQueueSlow();
}
}
bool Processor::countDownToSleep() {
m_sleepCountDown ++;
auditQueueSlow();
if(m_sleepCountDown > 100) {
return false;
}
return true;
}
void Processor::addRoutine(const Routine::Builder& routineBuilder){
m_queue.pushBack(routineBuilder.m_routine);
routineBuilder.m_routine = nullptr;
}
bool Processor::iterate() {
auto entry = m_queue.peekFront();
if(entry != nullptr) {
auto r = entry->routine;
if(r->blocks.isEmpty()){
returnFromCurrentRoutine();
checkAudit();
return true;
}
if(!r->pendingAction.isNone()) {
Action action = r->pendingAction;
r->pendingAction.null();
if(action.isErrorAction()){
propagateError(action.getError());
} else {
doAction(action);
}
checkAudit();
return true;
}
auto& block = r->blocks.peek();
try{
Action action = block.function();
if(action.isErrorAction()){
propagateError(action.getError());
} else {
doAction(action);
}
} catch(...) {
Error error {"Unknown", true };
propagateError(error);
}
checkAudit();
return true;
}
return countDownToSleep();
}
}}

View File

@ -37,304 +37,43 @@ private:
oatpp::concurrency::SpinLock::Atom m_atom;
Entry* m_first;
Entry* m_last;
v_int32 m_count;
v_int32 m_max;
public:
Queue()
: m_atom(false)
, m_first(nullptr)
, m_last(nullptr)
, m_count(0)
, m_max(0)
{}
Entry* peekFront() {
return m_first;
}
Entry* popFront() {
oatpp::concurrency::SpinLock lock(m_atom);
auto result = m_first;
if(m_first != nullptr) {
m_first = m_first->next;
if(m_first == nullptr) {
m_last = nullptr;
}
m_count --;
}
return result;
}
void pushFront(Routine* routine) {
pushFront(new Entry(routine, nullptr));
}
void pushFront(Entry* entry){
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = m_first;
m_first = entry;
if(m_last == nullptr) {
m_last = entry;
}
m_count ++;
}
void pushBack(Routine* routine) {
pushBack(new Entry(routine, nullptr));
}
void pushBack(Entry* entry) {
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = nullptr;
if(m_last != nullptr) {
m_last->next = entry;
m_last = entry;
} else {
m_first = entry;
m_last = entry;
}
m_count ++;
if(m_count > m_max) {
m_max = m_count;
//OATPP_LOGD("queue", "size=%d", m_max);
}
}
v_int32 getCount(){
return m_count;
}
static void moveEntryToQueue(Queue& from, Queue& to, Queue::Entry* curr, Queue::Entry* prev){
//OATPP_LOGD("proc", "moved to fast");
if(prev == nullptr) {
to.pushFront(from.popFront());
} else if(curr->next == nullptr) {
to.pushBack(curr);
from.m_last = prev;
prev->next = nullptr;
from.m_count --;
} else {
prev->next = curr->next;
to.pushBack(curr);
from.m_count --;
}
}
Queue();
Entry* peekFront();
Entry* popFront();
void pushFront(Routine* routine);
void pushFront(Entry* entry);
void pushBack(Routine* routine);
void pushBack(Entry* entry);
static void moveEntryToQueue(Queue& from, Queue& to, Queue::Entry* curr, Queue::Entry* prev);
};
private:
void abortCurrentRoutine(){
auto entry = m_queue.popFront();
auto curr = entry->routine;
while(curr != nullptr) {
auto parent = curr->m_parent;
delete curr;
curr = parent;
}
delete entry;
}
void returnFromCurrentRoutine(){
//OATPP_LOGD("R", "_return");
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
if(routine != nullptr) {
entry->routine = routine;
routine->blocks.popNoData();
m_queue.pushBack(entry);
} else {
delete entry;
}
}
void doAction(Action& a){
if(a.getType() == Action::TYPE_REPEAT) {
m_queue.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_WAIT_RETRY) {
m_queueSlow.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_RETURN) {
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
delete entry;
if(routine != nullptr) {
m_queue.pushBack(routine);
}
return;
} else if(a.getType() == Action::TYPE_ABORT){
abortCurrentRoutine();
return;
} else if(a.getType() == Action::TYPE_ROUTINE) {
auto entry = m_queue.popFront();
if(!a.m_routine->blocks.isEmpty()){
Routine* r = a.m_routine;
a.m_routine = nullptr;
r->m_parent = entry->routine;
entry->routine = r;
} else {
entry->routine->blocks.popNoData();
}
m_queue.pushBack(entry);
return;
}
throw std::runtime_error("Invalid action type");
}
void propagateError(Error& error){
Routine* curr = m_queue.peekFront()->routine;
while (curr != nullptr) {
if(!curr->blocks.isEmpty()) {
auto block = curr->blocks.peek();
if(block.errorHandler != nullptr) {
try {
auto action = block.errorHandler(error);
if(action.isErrorAction()) {
error = action.getError();
} else {
doAction(action);
return;
}
} catch(...) {
error = {"Unknown", true};
}
}
}
auto parent = curr->m_parent;
auto entry = m_queue.popFront();
delete curr;
delete entry;
curr = parent;
if(curr != nullptr) {
m_queue.pushFront(curr);
}
}
}
bool auditQueueSlow(){
m_auditTimer = 0;
Queue::Entry* curr = m_queueSlow.peekFront();
Queue::Entry* prev = nullptr;
bool hasActions = false;
while (curr != nullptr) {
auto& block = curr->routine->blocks.peek();
try{
Action action = block.function();
if(action.getType() != Action::TYPE_WAIT_RETRY){
curr->routine->pendingAction = action;
action.null();
Queue::moveEntryToQueue(m_queueSlow, m_queue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_queueSlow.peekFront();
}
}
} catch(...) {
Error error {"Unknown", true };
curr->routine->pendingAction = error;
Queue::moveEntryToQueue(m_queueSlow, m_queue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_queueSlow.peekFront();
}
}
prev = curr;
if(curr != nullptr) {
curr = curr->next;
}
}
return hasActions;
}
void checkAudit(){
m_auditTimer ++;
if(m_auditTimer > 100) {
auditQueueSlow();
}
}
void abortCurrentRoutine();
void returnFromCurrentRoutine();
void doAction(Action& a);
void propagateError(Error& error);
bool auditQueueSlow();
bool countDownToSleep();
void checkAudit();
private:
Queue m_queue;
Queue m_queueSlow;
v_int32 m_auditTimer; // in cycles
v_int32 m_auditTimer; // in iterations
v_int32 m_sleepCountDown; // in iterations
public:
Processor()
: m_auditTimer(0)
, m_sleepCountDown(0)
{}
void addRoutine(const Routine::Builder& routine){
m_queue.pushBack(routine.m_routine);
routine.m_routine = nullptr;
}
bool iterate() {
auto entry = m_queue.peekFront();
if(entry != nullptr) {
auto r = entry->routine;
if(r->blocks.isEmpty()){
returnFromCurrentRoutine();
checkAudit();
return true;
}
if(!r->pendingAction.isNone()) {
Action action = r->pendingAction;
r->pendingAction.null();
if(action.isErrorAction()){
propagateError(action.getError());
} else {
doAction(action);
}
checkAudit();
return true;
}
auto& block = r->blocks.peek();
try{
Action action = block.function();
if(action.isErrorAction()){
propagateError(action.getError());
} else {
doAction(action);
}
} catch(...) {
Error error {"Unknown", true };
propagateError(error);
}
checkAudit();
return true;
}
return auditQueueSlow();
}
void addRoutine(const Routine::Builder& routineBuilder);
bool iterate();
};

View File

@ -48,34 +48,6 @@ void AsyncHttpConnectionHandler::Task::run(){
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
/*
while(true) {
Backlog::Entry* entry = backlog.popFront();
if(entry != nullptr) {
auto& state = entry->connectionState;
auto response = HttpProcessor::processRequest(m_router, state->connection, m_errorHandler,
state->ioBuffer->getData(),
state->ioBuffer->getSize(),
state->inStream,
state->keepAlive);
if(response) {
state->outStream->setBufferPosition(0, 0);
response->send(state->outStream);
state->outStream->flush();
}
if(state->keepAlive){
backlog.pushBack(entry);
} else {
delete entry;
}
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
*/
}
@ -95,7 +67,6 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::d
[this, state]{
return HttpProcessor::processRequestAsync(m_router.get(), m_errorHandler, state);
}, [] (const oatpp::async::Error& error) {
//OATPP_LOGD("AsyncHttpConnectionHandler", "received error");
if(error.error == HttpProcessor::RETURN_KEEP_ALIVE) {
return oatpp::async::Action::_repeat();
}

View File

@ -174,21 +174,14 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
[state] {
//static std::atomic<v_int32> maxRetries(0);
state->readCount = state->connectionState->connection->read(state->ioBuffer, state->ioBufferSize);
if(state->readCount > 0) {
return oatpp::async::Action(nullptr);
} else if(state->readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
/*
state->retries ++;
if(state->retries > maxRetries){
maxRetries = state->retries;
OATPP_LOGD("Retry", "max=%d", maxRetries.load());
}*/
return oatpp::async::Action::_wait_retry();
}
return oatpp::async::Action::_abort();
}, nullptr
})._then({
@ -216,15 +209,17 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
}, nullptr
})._then({
[state] {
//OATPP_LOGD("Connection Processor", "Connection finished");
if(state->connectionState->keepAlive){
//OATPP_LOGD("CP", "try-keep-alive");
oatpp::async::Error error {RETURN_KEEP_ALIVE, false};
oatpp::async::Error error { RETURN_KEEP_ALIVE, false };
return oatpp::async::Action(error);
}
return oatpp::async::Action(nullptr);
}, nullptr
});
}