new Coroutine concept

This commit is contained in:
lganzzzo 2018-03-22 12:00:46 +02:00
parent 3e05437a49
commit 87be65af4d
23 changed files with 880 additions and 110 deletions

View File

@ -0,0 +1,21 @@
//
// Coroutine.cpp
// crud
//
// Created by Leonid on 3/22/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "Coroutine.hpp"
namespace oatpp { namespace async {
const v_int32 Action2::TYPE_COROUTINE = 0;
const v_int32 Action2::TYPE_YIELD_TO = 1;
const v_int32 Action2::TYPE_WAIT_RETRY = 2;
const v_int32 Action2::TYPE_REPEAT = 3;
const v_int32 Action2::TYPE_FINISH = 4;
const v_int32 Action2::TYPE_ABORT = 5;
const v_int32 Action2::TYPE_ERROR = 6;
}}

View File

@ -0,0 +1,197 @@
//
// Coroutine.hpp
// crud
//
// Created by Leonid on 3/22/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_async_Coroutine_hpp
#define oatpp_async_Coroutine_hpp
#include "../collection/FastQueue.hpp"
#include "../base/Environment.hpp"
namespace oatpp { namespace async {
class AbstractCoroutine; // FWD
class Error2 {
public:
Error2(const char* pMessage, bool pIsExceptionThrown = false)
: message(pMessage)
, isExceptionThrown(pIsExceptionThrown)
{}
const char* message;
bool isExceptionThrown;
};
class Action2 {
friend AbstractCoroutine;
public:
typedef Action2 (AbstractCoroutine::*FunctionPtr)();
public:
static const v_int32 TYPE_COROUTINE;
static const v_int32 TYPE_YIELD_TO;
static const v_int32 TYPE_WAIT_RETRY;
static const v_int32 TYPE_REPEAT;
static const v_int32 TYPE_FINISH;
static const v_int32 TYPE_ABORT;
static const v_int32 TYPE_ERROR;
private:
v_int32 m_type;
AbstractCoroutine* m_coroutine;
FunctionPtr m_functionPtr;
Error2 m_error;
public:
Action2(v_int32 type,
AbstractCoroutine* coroutine,
FunctionPtr functionPtr)
: m_type(type)
, m_coroutine(coroutine)
, m_functionPtr(functionPtr)
, m_error(Error2(nullptr))
{}
Action2(const Error2& error)
: m_type(TYPE_ERROR)
, m_coroutine(nullptr)
, m_functionPtr(nullptr)
, m_error(error)
{}
bool isError(){
return m_type == TYPE_ERROR;
}
};
class AbstractCoroutine {
public:
typedef Action2 Action2;
typedef Action2 (AbstractCoroutine::*FunctionPtr)();
private:
AbstractCoroutine* _CP = this;
FunctionPtr _FP = &AbstractCoroutine::act;
AbstractCoroutine* _ref = nullptr;
const Action2& takeAction(const Action2& action){
if(action.m_type == Action2::TYPE_COROUTINE) {
action.m_coroutine->m_parent = _CP;
_CP = action.m_coroutine;
_CP->m_returnFP = action.m_functionPtr;
_FP = action.m_coroutine->_FP;
} else if(action.m_type == Action2::TYPE_FINISH) {
do {
_FP = _CP->m_returnFP;
_CP->free();
_CP = _CP->m_parent; // Should be fine here. As free() - return of the pointer to Bench. (Memory not changed)
} while (_FP == nullptr && _CP != nullptr);
} else if(action.m_type == Action2::TYPE_YIELD_TO) {
_FP = action.m_functionPtr;
} else if(action.m_type == Action2::TYPE_ABORT) {
while (_CP != nullptr) {
_CP->free();
_CP = _CP->m_parent;
}
}
return action;
}
private:
AbstractCoroutine* m_parent = nullptr;
FunctionPtr m_returnFP = nullptr; // should be set by processor on startCoroutine
public:
Action2 iterate() {
//try {
return takeAction(_CP->call(_FP));
//} catch (...) {
//}
};
virtual ~AbstractCoroutine(){
}
virtual Action2 act() = 0;
virtual Action2 call(FunctionPtr ptr) = 0;
virtual void free() = 0;
bool finished(){
return _CP == nullptr;
}
AbstractCoroutine* getParent(){
return m_parent;
}
};
template<class T>
class Coroutine : public AbstractCoroutine {
public:
typedef Action2 (T::*Function)();
typedef oatpp::collection::Bench<T> Bench;
public:
static Bench& getBench(){
static thread_local Bench bench(1024);
return bench;
}
public:
static FunctionPtr castFunctionPtr(Function function){
return static_cast<FunctionPtr>(function);
}
public:
virtual Action2 call(FunctionPtr ptr) override {
Function f = static_cast<Function>(ptr);
return (static_cast<T*>(this)->*f)();
}
virtual void free() override {
Coroutine<T>::getBench().free(static_cast<T*>(this));
}
template<typename C, typename F, typename ... Args>
Action2 startCoroutine(F returnToFunction, Args... args) {
C* coroutine = C::getBench().obtain(args...);
return Action2(Action2::TYPE_COROUTINE, coroutine, static_cast<FunctionPtr>(returnToFunction));
}
Action2 yieldTo(Function function) {
return Action2(Action2::TYPE_YIELD_TO, nullptr, static_cast<FunctionPtr>(function));
}
Action2 waitRetry() {
return Action2(Action2::TYPE_WAIT_RETRY, nullptr, nullptr);
}
Action2 repeat() {
return Action2(Action2::TYPE_REPEAT, nullptr, nullptr);
}
Action2 finish() {
return Action2(Action2::TYPE_FINISH, nullptr, nullptr);
}
Action2 abort() {
return Action2(Action2::TYPE_ABORT, nullptr, nullptr);
}
Action2 error(const char* message) {
return Action2(Error2(message));
}
};
}}
#endif /* oatpp_async_Coroutine_hpp */

View File

@ -90,14 +90,30 @@ void Processor::abortCurrentRoutine(){
delete entry;
}
Routine* Processor::returnFromRoutine(Routine* from) {
auto curr = from->m_parent;
delete from;
while (curr != nullptr) {
if(curr->blocks.isEmpty()){
auto parent = curr->m_parent;
delete curr;
curr = parent;
} else {
return curr;
}
}
return nullptr;
}
void Processor::returnFromCurrentRoutine(){
//OATPP_LOGD("R", "_return");
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
if(routine != nullptr) {
entry->routine = routine;
routine->blocks.popNoData();
entry->routine = returnFromRoutine(entry->routine);
if(entry->routine != nullptr) {
entry->routine->blocks.popNoData();
m_queue.pushBack(entry);
} else {
delete entry;
@ -107,25 +123,15 @@ void Processor::returnFromCurrentRoutine(){
void Processor::doAction(Action& a){
if(a.getType() == Action::TYPE_REPEAT) {
m_queue.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_WAIT_RETRY) {
m_queueSlow.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_RETURN) {
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
delete entry;
if(routine != nullptr) {
m_queue.pushBack(routine);
}
return;
returnFromCurrentRoutine();
} else if(a.getType() == Action::TYPE_ABORT){
abortCurrentRoutine();
return;
} else if(a.getType() == Action::TYPE_ROUTINE) {
auto entry = m_queue.popFront();
if(!a.m_routine->blocks.isEmpty()){
if(a.m_routine != nullptr && !a.m_routine->blocks.isEmpty()){
Routine* r = a.m_routine;
a.m_routine = nullptr;
r->m_parent = entry->routine;
@ -134,9 +140,9 @@ void Processor::doAction(Action& a){
entry->routine->blocks.popNoData();
}
m_queue.pushBack(entry);
return;
} else {
throw std::runtime_error("Invalid action type");
}
throw std::runtime_error("Invalid action type");
}
void Processor::propagateError(Error& error){

View File

@ -53,6 +53,7 @@ private:
private:
void abortCurrentRoutine();
Routine* returnFromRoutine(Routine* from);
void returnFromCurrentRoutine();
void doAction(Action& a);
void propagateError(Error& error);

View File

@ -0,0 +1,9 @@
//
// Processor2.cpp
// crud
//
// Created by Leonid on 3/20/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "Processor2.hpp"

View File

@ -0,0 +1,48 @@
//
// Processor2.hpp
// crud
//
// Created by Leonid on 3/20/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_async_Processor2_hpp
#define oatpp_async_Processor2_hpp
#include "../collection/FastQueue.hpp"
#include "../base/Environment.hpp"
namespace oatpp { namespace async {
class Processor2 {
public:
struct Routine {
public:
typedef oatpp::collection::FastQueue<Routine> FastQueue;
public:
Routine* _ref;
};
public:
struct Entry {
public:
typedef oatpp::collection::FastQueue<Entry> FastQueue;
public:
Routine* routine;
Entry* _ref;
};
private:
Entry::FastQueue m_activeQueue;
Entry::FastQueue m_waitingQueue;
public:
};
}}
#endif /* oatpp_async_Processor2_hpp */

View File

@ -55,6 +55,10 @@ Action& Action::_abort(){
static Action a(TYPE_ABORT);
return a;
}
Action& Action::_continue(){
static Action a(nullptr);
return a;
}
Action::Action(v_int32 type)
: m_type(type)
@ -83,7 +87,7 @@ Action::Action(const Routine::Builder& routine)
Action::Action(std::nullptr_t nullp)
: m_type(TYPE_ROUTINE)
, m_error({nullptr, false})
, m_routine(new Routine())
, m_routine(nullptr)
{}
Action::~Action() {

View File

@ -22,8 +22,20 @@ class Routine; // FWD
struct Error {
public:
Error()
: error(nullptr)
, isExceptionThrown(false)
{}
Error(const char* pError, bool isException = false)
: error(pError)
, isExceptionThrown(isException)
{}
const char* error;
bool isExceptionThrown;
};
struct Block {
@ -53,6 +65,7 @@ public:
class Action {
friend Processor;
friend Routine;
public:
static const v_int32 TYPE_NONE;
static const v_int32 TYPE_REPEAT;
@ -66,6 +79,7 @@ public:
static Action& _wait_retry();
static Action& _return();
static Action& _abort();
static Action& _continue();
private:
void null(){
m_type = TYPE_NONE;
@ -77,8 +91,9 @@ private:
Routine* m_routine;
private:
Action(v_int32 type);
public:
Action();
public:
Action(const Error& error);
Action(const RoutineBuilder& routine);
Action(std::nullptr_t nullp);

View File

@ -0,0 +1,10 @@
//
// FastQueue.cpp
// crud
//
// Created by Leonid on 3/21/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "FastQueue.hpp"

View File

@ -0,0 +1,177 @@
//
// FastQueue.hpp
// crud
//
// Created by Leonid on 3/21/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_collection_FastQueue_hpp
#define oatpp_collection_FastQueue_hpp
#include "../base/Environment.hpp"
namespace oatpp { namespace collection {
template<typename T>
class Bench {
private:
class Block {
public:
Block(p_char8 mem, Block* pNext)
: memory(mem)
, next(pNext)
{}
p_char8 memory;
Block* next;
};
private:
void grow(){
v_int32 newSize = m_size + m_growSize;
T** newIndex = new T*[newSize];
memcmp(newIndex, m_index, m_size);
Block* b = new Block(new v_char8 [m_growSize * sizeof(T)], m_blocks);
m_blocks = b;
for(v_int32 i = 0; i < m_growSize; i++) {
newIndex[m_size + i] = (T*) (&b->memory[i * sizeof(T)]);
}
delete [] m_index;
m_size = newSize;
m_index = newIndex;
}
private:
v_int32 m_growSize;
v_int32 m_size;
v_int32 m_indexPosition;
Block* m_blocks;
T** m_index;
public:
Bench(v_int32 growSize)
: m_growSize(growSize)
, m_size(0)
, m_indexPosition(0)
, m_blocks(nullptr)
, m_index(nullptr)
{
grow();
}
~Bench(){
auto curr = m_blocks;
while (curr != nullptr) {
auto next = curr->next;
delete curr;
curr = next;
}
delete [] m_index;
}
template<typename ... Args>
T* obtain(Args... args) {
if(m_indexPosition == m_size) {
grow();
}
return new (m_index[m_indexPosition ++]) T(args...);
}
void free(T* entry) {
entry->~T();
m_index[--m_indexPosition] = entry;
}
};
template<typename T>
class FastQueue {
public:
FastQueue(Bench<T>& pBench)
: bench(pBench)
, first(nullptr)
, last(nullptr)
{}
~FastQueue(){
clear();
}
Bench<T>& bench;
T* first;
T* last;
void pushFront(T* entry) {
entry->_ref = first;
first = entry;
if(last == nullptr) {
last = first;
}
}
void pushBack(T* entry) {
if(last == nullptr) {
first = entry;
last = entry;
} else {
last->_ref = entry;
last = entry;
}
}
T* popFront() {
T* result = first;
first = first->next;
if(first == nullptr) {
last = nullptr;
}
return result;
}
void popFrontNoData() {
T* result = first;
first = first->next;
if(first == nullptr) {
last = nullptr;
}
bench.free(result);
}
void moveEntry(FastQueue& fromQueue, FastQueue& toQueue, T* entry, T* prevEntry){
if(prevEntry == nullptr) {
toQueue.pushFront(fromQueue.popFront());
} else if(entry->next == nullptr) {
toQueue.pushBack(entry);
fromQueue.m_last = prevEntry;
prevEntry->next = nullptr;
} else {
prevEntry->next = entry->next;
toQueue.pushBack(entry);
}
}
void clear() {
T* curr = first;
while (curr != nullptr) {
T* next = curr->_ref;
bench.free(curr);
curr = next;
}
first = nullptr;
last = nullptr;
}
};
}}
#endif /* FastQueue_hpp */

View File

@ -26,6 +26,8 @@
namespace oatpp { namespace data{ namespace stream {
const char* ChunkedBuffer::ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA";
const char* const ChunkedBuffer::CHUNK_POOL_NAME = "ChunkedBuffer_Chunk_Pool";
const os::io::Library::v_size ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT = 11;
@ -208,6 +210,64 @@ bool ChunkedBuffer::flushToStream(const std::shared_ptr<OutputStream>& stream){
return true;
}
oatpp::async::Action ChunkedBuffer::flushToStreamAsync(const std::shared_ptr<OutputStream>& stream) {
struct LocalState{
LocalState(const std::shared_ptr<OutputStream> pStream,
ChunkEntry* pCurr,
os::io::Library::v_size pPosition)
: stream(pStream)
, curr(pCurr)
, position(pPosition)
{}
const std::shared_ptr<OutputStream> stream;
ChunkEntry* curr;
os::io::Library::v_size position;
};
auto state = std::make_shared<LocalState>(stream, m_firstEntry, m_size);
return oatpp::async::Routine::_do({
[state] {
if(state->curr == nullptr) {
return oatpp::async::Action::_return();
}
if(state->position > CHUNK_ENTRY_SIZE) {
auto res = state->stream->write(state->curr->chunk, CHUNK_ENTRY_SIZE);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return oatpp::async::Action::_wait_retry();
} else if(res != CHUNK_ENTRY_SIZE) {
oatpp::async::Error e(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA);
return oatpp::async::Action(e);
}
state->position -= res;
} else {
auto res = state->stream->write(state->curr->chunk, state->position);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return oatpp::async::Action::_wait_retry();
} else if(res != state->position) {
oatpp::async::Error e(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA);
return oatpp::async::Action(e);
}
state->position -= res;
}
state->curr = state->curr->next;
if(state->curr == nullptr) {
return oatpp::async::Action::_return();
}
return oatpp::async::Action::_repeat();
}
, nullptr
});
}
std::shared_ptr<ChunkedBuffer::Chunks> ChunkedBuffer::getChunks() {
auto chunks = Chunks::createShared();
auto curr = m_firstEntry;

View File

@ -28,10 +28,13 @@
#include "./Stream.hpp"
#include "../../collection/LinkedList.hpp"
#include "../../async/Routine.hpp"
namespace oatpp { namespace data{ namespace stream {
class ChunkedBuffer : public oatpp::base::Controllable, public OutputStream {
public:
static const char* ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA;
public:
OBJECT_POOL(ChunkedBuffer_Pool, ChunkedBuffer, 32)
SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Pool, ChunkedBuffer, 32)
@ -154,6 +157,7 @@ public:
}
bool flushToStream(const std::shared_ptr<OutputStream>& stream);
oatpp::async::Action flushToStreamAsync(const std::shared_ptr<OutputStream>& stream);
std::shared_ptr<Chunks> getChunks();

View File

@ -98,6 +98,34 @@ os::io::Library::v_size OutputStreamBufferedProxy::flush() {
return 0;
}
oatpp::async::Action OutputStreamBufferedProxy::flushAsync() {
auto _this = this->getSharedPtr<OutputStreamBufferedProxy>();
return oatpp::async::Routine::_do({
[_this] {
auto amount = _this->m_posEnd - _this->m_pos;
if(amount > 0){
os::io::Library::v_size result = _this->m_outputStream->write(&_this->m_buffer[_this->m_pos], amount);
if(result == amount){
_this->m_pos = 0;
_this->m_posEnd = 0;
return oatpp::async::Action::_return();
} else if(result == IOStream::ERROR_TRY_AGAIN) {
return oatpp::async::Action::_wait_retry();
} else if(result > 0){
_this->m_pos += (v_bufferSize) result;
}
return oatpp::async::Action(oatpp::async::Error("OutputStreamBufferedProxy. Failed to flush all data"));
}
return oatpp::async::Action::_return();
}, nullptr
});
}
os::io::Library::v_size InputStreamBufferedProxy::read(void *data, os::io::Library::v_size count) {
if (m_pos == 0 && m_posEnd == 0) {

View File

@ -27,6 +27,7 @@
#include "./Stream.hpp"
#include "../buffer/IOBuffer.hpp"
#include "../../async/Routine.hpp"
namespace oatpp { namespace data{ namespace stream {
@ -78,6 +79,7 @@ public:
os::io::Library::v_size write(const void *data, os::io::Library::v_size count) override;
os::io::Library::v_size flush();
oatpp::async::Action flushAsync();
void setBufferPosition(v_bufferSize pos, v_bufferSize posEnd){
m_pos = pos;

View File

@ -27,16 +27,29 @@
#include "../../../../../../oatpp-lib/core/src/data/stream/Stream.hpp"
#include "../../../../../../oatpp-lib/core/src/collection/ListMap.hpp"
#include "../../../../../../oatpp-lib/core/src/async/Routine.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
class Body {
protected:
typedef oatpp::async::Action Action;
protected:
typedef oatpp::collection::ListMap<base::String::PtrWrapper, base::String::PtrWrapper> Headers;
typedef oatpp::data::stream::OutputStream OutputStream;
public:
virtual void declareHeaders(const std::shared_ptr<Headers>& headers) = 0;
/**
* Do not call this method if stream::write is non blocking!
* For fast (not network) BLOCKING streams only!!!
*/
virtual void writeToStream(const std::shared_ptr<OutputStream>& stream) = 0;
/**
* For NON blocking streams only!!!
*/
virtual Action writeToStreamAsync(const std::shared_ptr<OutputStream>& stream) = 0;
};
}}}}}

View File

@ -57,6 +57,16 @@ public:
stream->write(m_buffer);
}
Action writeToStreamAsync(const std::shared_ptr<OutputStream>& stream) override {
auto readCount = stream->write(m_buffer);
if(readCount > 0) {
return oatpp::async::Action::_continue();
} else if(readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return oatpp::async::Action::_wait_retry();
}
return oatpp::async::Action::_abort();
}
};
}}}}}

View File

@ -24,3 +24,8 @@
#include "ChunkedBufferBody.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
const char* ChunkedBufferBody::ERROR_FAILED_TO_WRITE_DATA = "ERROR_FAILED_TO_WRITE_DATA";
}}}}}

View File

@ -34,6 +34,8 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
class ChunkedBufferBody : public oatpp::base::Controllable, public Body {
public:
static const char* ERROR_FAILED_TO_WRITE_DATA;
public:
OBJECT_POOL(Http_Outgoing_ChunkedBufferBody_Pool, ChunkedBufferBody, 32)
SHARED_OBJECT_POOL(Shared_Http_Outgoing_ChunkedBufferBody_Pool, ChunkedBufferBody, 32)
@ -84,6 +86,82 @@ public:
}
}
Action writeToStreamAsync(const std::shared_ptr<OutputStream>& stream) override {
if(m_chunked){
struct LocalState {
LocalState(const std::shared_ptr<OutputStream>& pStream,
const std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks>& pChunks,
oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* pCurr)
: stream(pStream)
, chunks(pChunks)
, curr(pCurr)
, whileState(0)
{}
std::shared_ptr<OutputStream> stream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks> chunks;
oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* curr;
v_int32 whileState; ///< introduced for optimization only
};
auto chunks = m_buffer->getChunks();
auto state = std::make_shared<LocalState>(stream, chunks, chunks->getFirstNode());
return oatpp::async::Routine::_do({
[state] {
auto curr = state->curr;
if(curr == nullptr) {
state->whileState = 3;
}
if(state->whileState == 0) {
auto res = state->stream->write(oatpp::utils::conversion::primitiveToStr(curr->getData()->size, "%X\r\n"));
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return oatpp::async::Action::_wait_retry();
} else if(res < 1) {
return oatpp::async::Action(oatpp::async::Error(ERROR_FAILED_TO_WRITE_DATA));
}
state->whileState = 1;
} else if(state->whileState == 1) {
auto res = state->stream->write(curr->getData()->data, curr->getData()->size);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return oatpp::async::Action::_wait_retry();
} else if(res < curr->getData()->size) {
return oatpp::async::Action(oatpp::async::Error(ERROR_FAILED_TO_WRITE_DATA));
}
state->whileState = 2;
} else if(state->whileState == 2) {
auto res = state->stream->write("\r\n", 2);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return oatpp::async::Action::_wait_retry();
} else if(res < 2) {
return oatpp::async::Action(oatpp::async::Error(ERROR_FAILED_TO_WRITE_DATA));
}
state->whileState = 3;
} else if(state->whileState == 3) {
auto res = state->stream->write("0\r\n\r\n", 5);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return oatpp::async::Action::_wait_retry();
} else if(res < 5) {
return oatpp::async::Action(oatpp::async::Error(ERROR_FAILED_TO_WRITE_DATA));
}
return oatpp::async::Action::_continue();
}
state->curr = curr->getNext();
return oatpp::async::Action::_repeat();
}, nullptr
});
} else {
return m_buffer->flushToStreamAsync(stream);
}
}
};
}}}}}

View File

@ -24,6 +24,8 @@
#include "./Response.hpp"
#include "../../../../../core/src/data/stream/ChunkedBuffer.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream){
@ -58,33 +60,58 @@ void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream){
oatpp::async::Action Response::sendAsync(const std::shared_ptr<data::stream::OutputStream>& stream){
if(body){
body->declareHeaders(headers);
} else {
headers->put(Header::CONTENT_LENGTH, "0");
}
auto buffer = oatpp::data::stream::ChunkedBuffer::createShared();
stream->write("HTTP/1.1 ", 9);
stream->writeAsString(status.code);
stream->write(" ", 1);
stream->OutputStream::write(status.description);
stream->write("\r\n", 2);
auto _this = this->getSharedPtr<Response>();
auto curr = headers->getFirstEntry();
while(curr != nullptr){
stream->write(curr->getKey()->getData(), curr->getKey()->getSize());
stream->write(": ", 2);
stream->write(curr->getValue()->getData(), curr->getValue()->getSize());
stream->write("\r\n", 2);
curr = curr->getNext();
}
stream->write("\r\n", 2);
if(body) {
body->writeToStream(stream);
}
return nullptr; // TODO make Async
return oatpp::async::Routine::_do({
[_this, stream, buffer] {
if(_this->body){
_this->body->declareHeaders(_this->headers);
} else {
_this->headers->put(Header::CONTENT_LENGTH, "0");
}
buffer->write("HTTP/1.1 ", 9);
buffer->writeAsString(_this->status.code);
buffer->write(" ", 1);
buffer->OutputStream::write(_this->status.description);
buffer->write("\r\n", 2);
auto curr = _this->headers->getFirstEntry();
while(curr != nullptr){
buffer->write(curr->getKey()->getData(), curr->getKey()->getSize());
buffer->write(": ", 2);
buffer->write(curr->getValue()->getData(), curr->getValue()->getSize());
buffer->write("\r\n", 2);
curr = curr->getNext();
}
buffer->write("\r\n", 2);
return oatpp::async::Action::_continue();
}, nullptr
})._then({
[stream, buffer] {
return buffer->flushToStreamAsync(stream);
}, nullptr
})._then({
[_this, stream] {
if(_this->body) {
return _this->body->writeToStreamAsync(stream);
} else {
return oatpp::async::Action::_return();
}
}, nullptr
});
}

View File

@ -70,7 +70,7 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::d
if(error.error == HttpProcessor::RETURN_KEEP_ALIVE) {
return oatpp::async::Action::_repeat();
}
return oatpp::async::Action(nullptr);
return oatpp::async::Action::_abort();
}
});

View File

@ -98,54 +98,104 @@ HttpProcessor::processRequest(HttpRouter* router,
}
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::getResponse(HttpRouter* router,
oatpp::os::io::Library::v_size firstReadCount,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& connectionState) {
oatpp::async::Action
HttpProcessor::getResponseAsync(HttpRouter* router,
oatpp::os::io::Library::v_size firstReadCount,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& connectionState,
std::shared_ptr<protocol::http::outgoing::Response>& response) {
oatpp::parser::ParsingCaret caret((p_char8) connectionState->ioBuffer->getData(), connectionState->ioBuffer->getSize());
auto line = protocol::http::Protocol::parseRequestStartingLine(caret);
struct LocalState {
LocalState(HttpRouter* pRouter,
oatpp::os::io::Library::v_size pFirstReadCount,
std::shared_ptr<protocol::http::outgoing::Response>& pResponse)
: router(pRouter)
, firstReadCount(pFirstReadCount)
, response(pResponse)
{}
HttpRouter* router;
oatpp::os::io::Library::v_size firstReadCount;
std::shared_ptr<protocol::http::outgoing::Response>& response;
};
auto state = std::make_shared<LocalState>(router, firstReadCount, response);
return oatpp::async::Routine::_do({
[state, errorHandler, connectionState] {
oatpp::parser::ParsingCaret caret((p_char8) connectionState->ioBuffer->getData(), connectionState->ioBuffer->getSize());
auto line = protocol::http::Protocol::parseRequestStartingLine(caret);
if(!line){
state->response = errorHandler->handleError(protocol::http::Status::CODE_400, "Can't read starting line");
return oatpp::async::Action::_return();
}
auto route = state->router->getRoute(line->method, line->path);
if(route.isNull()) {
state->response = errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
return oatpp::async::Action::_return();
}
oatpp::web::protocol::http::Status error;
auto headers = protocol::http::Protocol::parseHeaders(caret, error);
if(error.code != 0){
state->response = errorHandler->handleError(error, " Can't parse headers");
return oatpp::async::Action::_return();
}
auto bodyStream = connectionState->inStream;
bodyStream->setBufferPosition(caret.getPosition(), (v_int32) state->firstReadCount);
auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream);
return oatpp::async::Action(oatpp::async::Routine::_do({
[state, request, route] {
state->response = route.processUrl(request); // TODO // Should be async here
return oatpp::async::Action::_continue();
}, [state, errorHandler] (const oatpp::async::Error& error) {
if(error.isExceptionThrown) {
try{
throw;
} catch (HttpError& error) {
state->response = errorHandler->handleError(error.getStatus(), error.getMessage());
} catch (std::exception& error) {
state->response = errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
} catch (...) {
state->response = errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
}
} else {
state->response = errorHandler->handleError(protocol::http::Status::CODE_500, error.error);
}
return oatpp::async::Action::_continue();
}
})._then({
[state, connectionState, request] {
state->response->headers->putIfNotExists(protocol::http::Header::SERVER,
protocol::http::Header::Value::SERVER);
connectionState->keepAlive = HttpProcessor::considerConnectionKeepAlive(request, state->response);
return oatpp::async::Action::_return();
}, nullptr
}));
}, nullptr
});
if(!line){
return errorHandler->handleError(protocol::http::Status::CODE_400, "Can't read starting line");
}
auto route = router->getRoute(line->method, line->path);
if(!route.isNull()) {
oatpp::web::protocol::http::Status error;
auto headers = protocol::http::Protocol::parseHeaders(caret, error);
if(error.code != 0){
return errorHandler->handleError(error, " Can't parse headers");
}
auto bodyStream = connectionState->inStream;
bodyStream->setBufferPosition(caret.getPosition(), (v_int32) firstReadCount);
auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream);
std::shared_ptr<protocol::http::outgoing::Response> response;
try{
response = route.processUrl(request);
} catch (HttpError& error) {
return errorHandler->handleError(error.getStatus(), error.getMessage());
} catch (std::exception& error) {
return errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
} catch (...) {
return errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
}
response->headers->putIfNotExists(protocol::http::Header::SERVER,
protocol::http::Header::Value::SERVER);
connectionState->keepAlive = HttpProcessor::considerConnectionKeepAlive(request, response);
return response;
} else {
return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
}
}
oatpp::async::Action
@ -159,13 +209,13 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
: connectionState(pConnectionState)
, ioBuffer(pConnectionState->ioBuffer->getData())
, ioBufferSize(pConnectionState->ioBuffer->getSize())
, retries(0)
, response(nullptr)
{}
const std::shared_ptr<ConnectionState>& connectionState;
void* ioBuffer;
v_int32 ioBufferSize;
oatpp::os::io::Library::v_size readCount;
v_int32 retries;
std::shared_ptr<protocol::http::outgoing::Response> response;
};
auto state = std::make_shared<LocaleState>(connectionState);
@ -176,7 +226,7 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
state->readCount = state->connectionState->connection->read(state->ioBuffer, state->ioBufferSize);
if(state->readCount > 0) {
return oatpp::async::Action(nullptr);
return oatpp::async::Action::_continue();
} else if(state->readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return oatpp::async::Action::_wait_retry();
}
@ -188,20 +238,23 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
[state, router, errorHandler] {
auto response = getResponse(router, state->readCount, errorHandler, state->connectionState);
return oatpp::async::Routine::_do({
[state, response] {
state->connectionState->outStream->setBufferPosition(0, 0);
return response->sendAsync(state->connectionState->outStream);
[state, router, errorHandler] {
return getResponseAsync(router, state->readCount, errorHandler, state->connectionState, state->response);
}, nullptr
})._then({
[state] {
state->connectionState->outStream->flush();
return nullptr;
state->connectionState->outStream->setBufferPosition(0, 0);
return state->response->sendAsync(state->connectionState->outStream);
}, nullptr
})._then({
[state] {
return state->connectionState->outStream->flushAsync();
}, nullptr
});
@ -213,10 +266,10 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
[state] {
if(state->connectionState->keepAlive){
oatpp::async::Error error { RETURN_KEEP_ALIVE, false };
oatpp::async::Error error(RETURN_KEEP_ALIVE);
return oatpp::async::Action(error);
}
return oatpp::async::Action(nullptr);
return oatpp::async::Action::_return();
}, nullptr

View File

@ -26,6 +26,7 @@ class HttpProcessor {
public:
static const char* RETURN_KEEP_ALIVE;
public:
class ConnectionState {
public:
SHARED_OBJECT_POOL(ConnectionState_Pool, ConnectionState, 32)
@ -45,11 +46,12 @@ public:
private:
static std::shared_ptr<protocol::http::outgoing::Response>
getResponse(HttpRouter* router,
oatpp::os::io::Library::v_size firstReadCount,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& connectionState);
static oatpp::async::Action
getResponseAsync(HttpRouter* router,
oatpp::os::io::Library::v_size firstReadCount,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& connectionState,
std::shared_ptr<protocol::http::outgoing::Response>& response);
public:
static bool considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,

View File

@ -54,11 +54,11 @@ public:
, matchMap(pMatchMap)
{}
ReturnType processUrl(const Param& param){
ReturnType processUrl(const Param& param) const {
return m_subscriber->processUrl(param);
}
bool isNull(){
bool isNull() const {
return m_subscriber == nullptr;
}