Merge pull request #1 from oatpp/async_io

Async IO
This commit is contained in:
Leonid Stryzhevskyi 2018-03-28 17:14:26 +03:00 committed by GitHub
commit cb84ad0553
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 2060 additions and 165 deletions

View File

@ -0,0 +1,61 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Coroutine.hpp"
namespace oatpp { namespace async {
const Action Action::_WAIT_RETRY(TYPE_WAIT_RETRY, nullptr, nullptr);
const Action Action::_REPEAT(TYPE_REPEAT, nullptr, nullptr);
const Action Action::_FINISH(TYPE_FINISH, nullptr, nullptr);
const Action Action::_ABORT(TYPE_ABORT, nullptr, nullptr);
Action::Action(v_int32 type,
AbstractCoroutine* coroutine,
FunctionPtr functionPtr)
: m_type(type)
, m_coroutine(coroutine)
, m_functionPtr(functionPtr)
, m_error(Error(nullptr))
{}
Action::Action(const Error& error)
: m_type(TYPE_ERROR)
, m_coroutine(nullptr)
, m_functionPtr(nullptr)
, m_error(error)
{}
bool Action::isError(){
return m_type == TYPE_ERROR;
}
void Action::free() {
if(m_coroutine != nullptr) {
m_coroutine->free();
m_coroutine = nullptr;
}
}
}}

View File

@ -0,0 +1,355 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_Coroutine_hpp
#define oatpp_async_Coroutine_hpp
#include "../collection/FastQueue.hpp"
#include "../base/memory/MemoryPool.hpp"
#include "../base/Environment.hpp"
namespace oatpp { namespace async {
class AbstractCoroutine; // FWD
class Processor; // FWD
class Error {
public:
Error(const char* pMessage, bool pIsExceptionThrown = false)
: message(pMessage)
, isExceptionThrown(pIsExceptionThrown)
{}
const char* message;
bool isExceptionThrown;
};
class Action {
friend Processor;
friend AbstractCoroutine;
public:
typedef Action (AbstractCoroutine::*FunctionPtr)();
public:
static constexpr const v_int32 TYPE_COROUTINE = 0;
static constexpr const v_int32 TYPE_YIELD_TO = 1;
static constexpr const v_int32 TYPE_WAIT_RETRY = 2;
static constexpr const v_int32 TYPE_REPEAT = 3;
static constexpr const v_int32 TYPE_FINISH = 4;
static constexpr const v_int32 TYPE_ABORT = 5;
static constexpr const v_int32 TYPE_ERROR = 6;
public:
static const Action _WAIT_RETRY;
static const Action _REPEAT;
static const Action _FINISH;
static const Action _ABORT;
private:
v_int32 m_type;
AbstractCoroutine* m_coroutine;
FunctionPtr m_functionPtr;
Error m_error;
protected:
void free();
public:
Action(v_int32 type,
AbstractCoroutine* coroutine,
FunctionPtr functionPtr);
Action(const Error& error);
bool isError();
};
class AbstractCoroutine {
friend oatpp::collection::FastQueue<AbstractCoroutine>;
friend Processor;
public:
typedef Action Action;
typedef Action (AbstractCoroutine::*FunctionPtr)();
public:
class MemberCaller {
private:
void* m_objectPtr;
public:
MemberCaller(void* objectPtr)
: m_objectPtr(objectPtr)
{}
template<typename ReturnType, typename T, typename ...Args>
ReturnType call(ReturnType (T::*f)(), Args... args){
MemberCaller* caller = static_cast<MemberCaller*>(m_objectPtr);
return (caller->*reinterpret_cast<ReturnType (MemberCaller::*)(Args...)>(f))(args...);
}
};
private:
AbstractCoroutine* _CP = this;
FunctionPtr _FP = &AbstractCoroutine::act;
AbstractCoroutine* _ref = nullptr;
Action takeAction(const Action& action){
switch (action.m_type) {
case Action::TYPE_COROUTINE:
action.m_coroutine->m_parent = _CP;
_CP = action.m_coroutine;
_FP = action.m_coroutine->_FP;
break;
case Action::TYPE_FINISH:
if(_CP == this) {
_CP = nullptr;
return action;
}
{
AbstractCoroutine* savedCP = _CP;
_CP = _CP->m_parent;
Action a = takeAction(savedCP->m_parentReturnAction);
savedCP->m_parentReturnAction.m_coroutine = nullptr;
savedCP->free();
return a;
}
break;
case Action::TYPE_YIELD_TO:
_FP = action.m_functionPtr;
break;
case Action::TYPE_ABORT:
while (_CP != this) {
_CP->free();
_CP = _CP->m_parent;
}
_CP = nullptr;
break;
case Action::TYPE_ERROR:
Action a = action;
do {
a = _CP->handleError(a.m_error);
if(a.m_type == Action::TYPE_ERROR) {
if(_CP == this) {
_CP = nullptr;
return a;
} else {
_CP->free();
_CP = _CP->m_parent;
}
} else {
a = takeAction(a);
}
} while (a.m_type == Action::TYPE_ERROR && _CP != nullptr);
return a;
break;
};
return action;
}
private:
AbstractCoroutine* m_parent = nullptr;
protected:
Action m_parentReturnAction = Action::_FINISH;
public:
Action iterate() {
try {
return takeAction(_CP->call(_FP));
} catch (...) {
return takeAction(Action(Error("Exception", true)));
}
};
/*Action iterate(v_int32 numIterations) {
Action action(Action::TYPE_FINISH, nullptr, nullptr);
for(v_int32 i = 0; i < numIterations; i++) {
action = takeAction(_CP->call(_FP));
if(action.m_type == Action::TYPE_WAIT_RETRY || _CP == nullptr) {
return action;
}
}
return action;
};*/
virtual ~AbstractCoroutine(){
m_parentReturnAction.free();
}
virtual Action act() = 0;
virtual Action call(FunctionPtr ptr) = 0;
virtual void free() = 0;
virtual MemberCaller getMemberCaller() const = 0;
virtual Action handleError(const Error& error) {
return error;
}
template<typename ...Args>
Action callWithParams(FunctionPtr ptr, Args... args) {
return getMemberCaller().call<Action>(ptr, args...);
}
template<typename C, typename ... Args>
Action startCoroutine(const Action& actionOnReturn, Args... args) {
C* coroutine = C::getBench().obtain(args...);
coroutine->m_parentReturnAction = actionOnReturn;
return Action(Action::TYPE_COROUTINE, coroutine, nullptr);
}
template<typename CoroutineType, typename ParentCoroutineType, typename ... CallbackArgs, typename ...Args>
Action startCoroutineForResult(Action (ParentCoroutineType::*function)(CallbackArgs...), Args... args) {
CoroutineType* coroutine = CoroutineType::getBench().obtain(args...);
coroutine->m_callback = reinterpret_cast<FunctionPtr>(function);
return Action(Action::TYPE_COROUTINE, coroutine, nullptr);
}
bool finished() const {
return _CP == nullptr;
}
AbstractCoroutine* getParent() const {
return m_parent;
}
};
template<class T>
class Coroutine : public AbstractCoroutine {
public:
typedef Action (T::*Function)();
typedef oatpp::base::memory::Bench<T> Bench;
public:
static Bench& getBench(){
static thread_local Bench bench(512);
return bench;
}
public:
Action call(FunctionPtr ptr) override {
Function f = static_cast<Function>(ptr);
return (static_cast<T*>(this)->*f)();
}
void free() override {
Coroutine<T>::getBench().free(static_cast<T*>(this));
}
MemberCaller getMemberCaller() const override {
return MemberCaller((void*) this);
}
Action yieldTo(Function function) const {
return Action(Action::TYPE_YIELD_TO, nullptr, static_cast<FunctionPtr>(function));
}
const Action& waitRetry() const {
return Action::_WAIT_RETRY;
}
const Action& repeat() const {
return Action::_REPEAT;
}
const Action& finish() const {
return Action::_FINISH;
}
const Action& abort() const {
return Action::_ABORT;
}
Action error(const char* message) {
return Action(Error(message));
}
};
template<class T, typename ...Args>
class CoroutineWithResult : public AbstractCoroutine {
friend AbstractCoroutine;
public:
typedef Action (T::*Function)();
typedef oatpp::base::memory::Bench<T> Bench;
public:
static Bench& getBench(){
static thread_local Bench bench(512);
return bench;
}
private:
FunctionPtr m_callback;
public:
virtual Action call(FunctionPtr ptr) override {
Function f = static_cast<Function>(ptr);
return (static_cast<T*>(this)->*f)();
}
virtual void free() override {
CoroutineWithResult<T, Args...>::getBench().free(static_cast<T*>(this));
}
MemberCaller getMemberCaller() const override {
return MemberCaller((void*) this);
}
Action yieldTo(Function function) const {
return Action(Action::TYPE_YIELD_TO, nullptr, static_cast<FunctionPtr>(function));
}
const Action& waitRetry() const {
return Action::_WAIT_RETRY;
}
const Action& repeat() const {
return Action::_REPEAT;
}
const Action& _return(Args... args) {
m_parentReturnAction = getParent()->callWithParams(m_callback, args...);
return Action::_FINISH;
}
const Action& abort() const {
return Action::_ABORT;
}
Action error(const char* message) {
return Action(Error(message));
}
};
}}
#endif /* oatpp_async_Coroutine_hpp */

View File

@ -0,0 +1,100 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Processor.hpp"
namespace oatpp { namespace async {
bool Processor::checkWaitingQueue() {
bool hasActions = false;
AbstractCoroutine* curr = m_waitingQueue.first;
AbstractCoroutine* prev = nullptr;
while (curr != nullptr) {
const Action& action = curr->iterate();
if(action.m_type == Action::TYPE_ABORT) {
m_waitingQueue.removeEntry(curr, prev);
if(prev != nullptr) {
curr = prev;
} else {
curr = m_waitingQueue.first;
}
} else if(action.m_type != Action::TYPE_WAIT_RETRY) {
oatpp::collection::FastQueue<AbstractCoroutine>::moveEntry(m_waitingQueue, m_activeQueue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_waitingQueue.first;
}
}
prev = curr;
if(curr != nullptr) {
curr = curr->_ref;
}
}
return hasActions;
}
bool Processor::countdownToSleep() {
++ m_sleepCountdown;
if(m_sleepCountdown > 1000) {
return checkWaitingQueue();
}
checkWaitingQueue();
std::this_thread::yield();
return true;
}
void Processor::addCoroutine(AbstractCoroutine* coroutine) {
m_activeQueue.pushBack(coroutine);
}
bool Processor::iterate(v_int32 numIterations) {
for(v_int32 i = 0; i < numIterations; i++) {
auto CP = m_activeQueue.first;
if(CP == nullptr) {
break;
} else {
m_sleepCountdown = 0;
}
if(!CP->finished()) {
const Action& action = CP->iterate();
if(action.m_type == Action::TYPE_WAIT_RETRY) {
m_waitingQueue.pushBack(m_activeQueue.popFront());
}
} else {
m_activeQueue.popFrontNoData();
}
}
return ((m_activeQueue.first != nullptr) || countdownToSleep());
}
}}

View File

@ -0,0 +1,53 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_Processor_hpp
#define oatpp_async_Processor_hpp
#include "./Coroutine.hpp"
#include "../collection/FastQueue.hpp"
namespace oatpp { namespace async {
class Processor {
private:
bool checkWaitingQueue();
bool countdownToSleep();
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_activeQueue;
oatpp::collection::FastQueue<AbstractCoroutine> m_waitingQueue;
private:
v_int32 m_sleepCountdown = 0;
public:
void addCoroutine(AbstractCoroutine* coroutine);
bool iterate(v_int32 numIterations);
};
}}
#endif /* oatpp_async_Processor_hpp */

View File

@ -92,7 +92,7 @@ public:
return pool;
}
public:
ThreadLocalPoolSharedObjectAllocator(const ThreadLocalPoolSharedObjectAllocator& info)
ThreadLocalPoolSharedObjectAllocator(const AllocatorPoolInfo& info)
: m_poolInfo(info)
{};

View File

@ -181,6 +181,83 @@ public:
void* obtain();
};
template<typename T>
class Bench {
private:
class Block {
public:
Block(p_char8 mem, Block* pNext)
: memory(mem)
, next(pNext)
{}
p_char8 memory;
Block* next;
};
private:
void grow(){
v_int32 newSize = m_size + m_growSize;
T** newIndex = new T*[newSize];
memcmp(newIndex, m_index, m_size);
Block* b = new Block(new v_char8 [m_growSize * sizeof(T)], m_blocks);
m_blocks = b;
for(v_int32 i = 0; i < m_growSize; i++) {
newIndex[m_size + i] = (T*) (&b->memory[i * sizeof(T)]);
}
delete [] m_index;
m_size = newSize;
m_index = newIndex;
}
private:
v_int32 m_growSize;
v_int32 m_size;
v_int32 m_indexPosition;
Block* m_blocks;
T** m_index;
public:
Bench(v_int32 growSize)
: m_growSize(growSize)
, m_size(0)
, m_indexPosition(0)
, m_blocks(nullptr)
, m_index(nullptr)
{
grow();
}
~Bench(){
auto curr = m_blocks;
while (curr != nullptr) {
auto next = curr->next;
delete curr;
curr = next;
}
delete [] m_index;
}
template<typename ... Args>
T* obtain(Args... args) {
if(m_indexPosition == m_size) {
grow();
}
return new (m_index[m_indexPosition ++]) T(args...);
}
void free(T* entry) {
entry->~T();
m_index[--m_indexPosition] = entry;
}
};
}}}
#endif /* oatpp_base_memory_MemoryPool_hpp */

View File

@ -0,0 +1,10 @@
//
// FastQueue.cpp
// crud
//
// Created by Leonid on 3/21/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "FastQueue.hpp"

View File

@ -0,0 +1,115 @@
//
// FastQueue.hpp
// crud
//
// Created by Leonid on 3/21/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_collection_FastQueue_hpp
#define oatpp_collection_FastQueue_hpp
#include "../concurrency/SpinLock.hpp"
#include "../base/Environment.hpp"
namespace oatpp { namespace collection {
template<typename T>
class FastQueue {
public:
FastQueue()
: first(nullptr)
, last(nullptr)
{}
~FastQueue(){
clear();
}
T* first;
T* last;
void pushFront(T* entry) {
entry->_ref = first;
first = entry;
if(last == nullptr) {
last = first;
}
}
void pushBack(T* entry) {
entry->_ref = nullptr;
if(last == nullptr) {
first = entry;
last = entry;
} else {
last->_ref = entry;
last = entry;
}
}
T* popFront() {
T* result = first;
first = first->_ref;
if(first == nullptr) {
last = nullptr;
}
return result;
}
void popFrontNoData() {
T* result = first;
first = first->_ref;
if(first == nullptr) {
last = nullptr;
}
result->free();
}
void removeEntry(T* entry, T* prevEntry){
if(prevEntry == nullptr) {
popFrontNoData();
} else if(entry->_ref == nullptr) {
prevEntry->_ref = nullptr;
last = prevEntry;
entry->free();
} else {
prevEntry->_ref = entry->_ref;
entry->free();
}
}
static void moveEntry(FastQueue& fromQueue, FastQueue& toQueue, T* entry, T* prevEntry){
if(prevEntry == nullptr) {
toQueue.pushFront(fromQueue.popFront());
} else if(entry->_ref == nullptr) {
toQueue.pushBack(entry);
fromQueue.last = prevEntry;
prevEntry->_ref = nullptr;
} else {
prevEntry->_ref = entry->_ref;
toQueue.pushBack(entry);
}
}
void clear() {
T* curr = first;
while (curr != nullptr) {
T* next = curr->_ref;
curr->free();
curr = next;
}
first = nullptr;
last = nullptr;
}
};
}}
#endif /* FastQueue_hpp */

View File

@ -53,30 +53,30 @@ public:
: m_info(info)
{}
Info& getInfo(){
const Info& getInfo() const {
return m_info;
}
virtual void write(const std::shared_ptr<oatpp::data::stream::OutputStream>& stream,
const type::AbstractPtrWrapper& variant) = 0;
const type::AbstractPtrWrapper& variant) const = 0;
virtual type::AbstractPtrWrapper read(const std::shared_ptr<oatpp::parser::ParsingCaret>& caret,
const type::Type* const type) = 0;
const type::Type* const type) const = 0;
std::shared_ptr<oatpp::base::String> writeToString(const type::AbstractPtrWrapper& variant){
std::shared_ptr<oatpp::base::String> writeToString(const type::AbstractPtrWrapper& variant) const {
auto stream = stream::ChunkedBuffer::createShared();
write(stream, variant);
return stream->toString();
}
template<class Class>
typename Class::PtrWrapper readFromCaret(const std::shared_ptr<oatpp::parser::ParsingCaret>& caret) {
typename Class::PtrWrapper readFromCaret(const std::shared_ptr<oatpp::parser::ParsingCaret>& caret) const {
auto type = Class::PtrWrapper::Class::getType();
return oatpp::base::static_wrapper_cast<typename Class::PtrWrapper::ObjectType>(read(caret, type));
}
template<class Class>
typename Class::PtrWrapper readFromString(const oatpp::base::String::PtrWrapper& str) {
typename Class::PtrWrapper readFromString(const oatpp::base::String::PtrWrapper& str) const {
auto type = Class::PtrWrapper::Class::getType();
auto caret = oatpp::parser::ParsingCaret::createShared(str.getPtr());
return oatpp::base::static_wrapper_cast<typename Class::PtrWrapper::ObjectType>(read(caret, type));

View File

@ -26,6 +26,8 @@
namespace oatpp { namespace data{ namespace stream {
const char* ChunkedBuffer::ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA";
const char* const ChunkedBuffer::CHUNK_POOL_NAME = "ChunkedBuffer_Chunk_Pool";
const os::io::Library::v_size ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT = 11;
@ -208,6 +210,67 @@ bool ChunkedBuffer::flushToStream(const std::shared_ptr<OutputStream>& stream){
return true;
}
oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
const std::shared_ptr<OutputStream>& stream) {
class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
private:
std::shared_ptr<ChunkedBuffer> m_chunkedBuffer;
std::shared_ptr<OutputStream> m_stream;
ChunkEntry* m_currEntry;
os::io::Library::v_size m_bytesLeft;
public:
FlushCoroutine(const std::shared_ptr<ChunkedBuffer>& chunkedBuffer,
const std::shared_ptr<OutputStream>& stream)
: m_chunkedBuffer(chunkedBuffer)
, m_stream(stream)
, m_currEntry(chunkedBuffer->m_firstEntry)
, m_bytesLeft(chunkedBuffer->m_size)
{}
Action act() override {
if(m_currEntry == nullptr) {
return finish();
}
if(m_bytesLeft > CHUNK_ENTRY_SIZE) {
auto res = m_stream->write(m_currEntry->chunk, CHUNK_ENTRY_SIZE);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return waitRetry();
} else if(res != CHUNK_ENTRY_SIZE) {
return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA);
}
m_bytesLeft -= res;
} else {
auto res = m_stream->write(m_currEntry->chunk, m_bytesLeft);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return waitRetry();
} else if(res != m_bytesLeft) {
return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA);
}
m_bytesLeft -= res;
}
m_currEntry = m_currEntry->next;
if(m_currEntry == nullptr) {
return finish();
}
return repeat();
}
};
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish,
getSharedPtr<ChunkedBuffer>(),
stream);
}
std::shared_ptr<ChunkedBuffer::Chunks> ChunkedBuffer::getChunks() {
auto chunks = Chunks::createShared();
auto curr = m_firstEntry;

View File

@ -28,10 +28,13 @@
#include "./Stream.hpp"
#include "../../collection/LinkedList.hpp"
#include "../../async/Coroutine.hpp"
namespace oatpp { namespace data{ namespace stream {
class ChunkedBuffer : public oatpp::base::Controllable, public OutputStream {
public:
static const char* ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA;
public:
OBJECT_POOL(ChunkedBuffer_Pool, ChunkedBuffer, 32)
SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Pool, ChunkedBuffer, 32)
@ -154,6 +157,9 @@ public:
}
bool flushToStream(const std::shared_ptr<OutputStream>& stream);
oatpp::async::Action flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
const std::shared_ptr<OutputStream>& stream);
std::shared_ptr<Chunks> getChunks();

View File

@ -27,6 +27,10 @@
namespace oatpp { namespace data{ namespace stream {
os::io::Library::v_size IOStream::ERROR_NOTHING_TO_READ = -1001;
os::io::Library::v_size IOStream::ERROR_CLOSED = -1002;
os::io::Library::v_size IOStream::ERROR_TRY_AGAIN = -1003;
os::io::Library::v_size OutputStream::writeAsString(v_int32 value){
v_char8 a[100];
v_int32 size = utils::conversion::int32ToCharSequence(value, &a[0]);

View File

@ -67,6 +67,10 @@ public:
};
class IOStream : public InputStream, public OutputStream {
public:
static os::io::Library::v_size ERROR_NOTHING_TO_READ;
static os::io::Library::v_size ERROR_CLOSED;
static os::io::Library::v_size ERROR_TRY_AGAIN;
public:
typedef os::io::Library::v_size v_size;
};

View File

@ -98,6 +98,44 @@ os::io::Library::v_size OutputStreamBufferedProxy::flush() {
return 0;
}
oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish) {
class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
private:
std::shared_ptr<OutputStreamBufferedProxy> m_stream;
public:
FlushCoroutine(const std::shared_ptr<OutputStreamBufferedProxy>& stream)
: m_stream(stream)
{}
Action act() override {
auto amount = m_stream->m_posEnd - m_stream->m_pos;
if(amount > 0){
os::io::Library::v_size result = m_stream->m_outputStream->write(&m_stream->m_buffer[m_stream->m_pos], amount);
if(result == amount){
m_stream->m_pos = 0;
m_stream->m_posEnd = 0;
return finish();
} else if(result == IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(result > 0){
m_stream->m_pos += (v_bufferSize) result;
}
return error("OutputStreamBufferedProxy. Failed to flush all data");
}
return finish();
}
};
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish, getSharedPtr<OutputStreamBufferedProxy>());
}
os::io::Library::v_size InputStreamBufferedProxy::read(void *data, os::io::Library::v_size count) {
if (m_pos == 0 && m_posEnd == 0) {

View File

@ -27,6 +27,7 @@
#include "./Stream.hpp"
#include "../buffer/IOBuffer.hpp"
#include "../../async/Coroutine.hpp"
namespace oatpp { namespace data{ namespace stream {
@ -78,6 +79,8 @@ public:
os::io::Library::v_size write(const void *data, os::io::Library::v_size count) override;
os::io::Library::v_size flush();
oatpp::async::Action flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish);
void setBufferPosition(v_bufferSize pos, v_bufferSize posEnd){
m_pos = pos;

View File

@ -0,0 +1,53 @@
//
// AsyncConnection.cpp
// crud
//
// Created by Leonid on 3/16/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "AsyncConnection.hpp"
namespace oatpp { namespace network {
AsyncConnection::AsyncConnection(Library::v_handle handle)
: m_handle(handle)
{}
AsyncConnection::~AsyncConnection(){
close();
}
AsyncConnection::Library::v_size AsyncConnection::write(const void *buff, Library::v_size count){
auto result = Library::handle_write(m_handle, buff, count); // Socket should be non blocking!!!
if(result < 0) {
auto e = errno;
//OATPP_LOGD("write", "errno=%d", e);
if(e == EAGAIN || e == EWOULDBLOCK){
return ERROR_TRY_AGAIN;
}
}
return result;
}
AsyncConnection::Library::v_size AsyncConnection::read(void *buff, Library::v_size count){
//OATPP_LOGD("AsyncConnection", "read. handler=%d", m_handle);
auto result = Library::handle_read(m_handle, buff, count); // Socket should be non blocking!!!
if(result < 0) {
auto e = errno;
//OATPP_LOGD("read", "errno=%d", e);
if(e == EAGAIN || e == EWOULDBLOCK){
return ERROR_TRY_AGAIN;
}
}
return result;
}
void AsyncConnection::close(){
//OATPP_LOGD("Connection", "close()");
Library::handle_close(m_handle);
}
}}

View File

@ -0,0 +1,49 @@
//
// AsyncConnection.hpp
// crud
//
// Created by Leonid on 3/16/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_network_AsyncConnection_hpp
#define oatpp_network_AsyncConnection_hpp
#include "../../../oatpp-lib/core/src/base/memory/ObjectPool.hpp"
#include "../../../oatpp-lib/core/src/data/stream/Stream.hpp"
#include "../../../oatpp-lib/core/src/os/io/Library.hpp"
namespace oatpp { namespace network {
class AsyncConnection : public oatpp::base::Controllable, public oatpp::data::stream::IOStream {
public:
typedef oatpp::os::io::Library Library;
public:
OBJECT_POOL(AsyncConnection_Pool, AsyncConnection, 32);
SHARED_OBJECT_POOL(Shared_AsyncConnection_Pool, AsyncConnection, 32);
private:
Library::v_handle m_handle;
public:
AsyncConnection(Library::v_handle handle);
public:
static std::shared_ptr<AsyncConnection> createShared(Library::v_handle handle){
return Shared_AsyncConnection_Pool::allocateShared(handle);
}
~AsyncConnection();
Library::v_size write(const void *buff, Library::v_size count) override;
Library::v_size read(void *buff, Library::v_size count) override;
void close();
Library::v_handle getHandle(){
return m_handle;
}
};
}}
#endif /* oatpp_network_AsyncConnection_hpp */

View File

@ -59,24 +59,7 @@ v_int32 Connection::shutdownWrite(){
return ::shutdown(m_handle, SHUT_WR);
}
void Connection::prepareGracefulDisconnect(){
if(::shutdown(m_handle, SHUT_WR) == 0){
v_int32 times = 0;
while(::shutdown(m_handle, SHUT_WR) == 0){
times++;
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
if(times > 0){
OATPP_LOGD("Server", "Connection tries to shutdown = %d", times);
}
}
}
void Connection::close(){
//prepareGracefulDisconnect(); // TODO remove this
Library::handle_close(m_handle);
}

View File

@ -45,7 +45,6 @@ public:
SHARED_OBJECT_POOL(Shared_Connection_Pool, Connection, 32);
private:
Library::v_handle m_handle;
void prepareGracefulDisconnect();
public:
Connection(Library::v_handle handle);
public:

View File

@ -0,0 +1,17 @@
//
// IORequest.cpp
// crud
//
// Created by Leonid on 3/15/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "IORequest.hpp"
namespace oatpp { namespace network { namespace io {
const v_int32 IORequest::TYPE_DONE = 0;
const v_int32 IORequest::TYPE_READ = 1;
const v_int32 IORequest::TYPE_WRITE = 2;
}}}

View File

@ -0,0 +1,46 @@
//
// IORequest.hpp
// crud
//
// Created by Leonid on 3/15/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_web_server_io_IORequest_hpp
#define oatpp_web_server_io_IORequest_hpp
#include "../../../core/src/data/stream/Stream.hpp"
#include <atomic>
namespace oatpp { namespace network { namespace io {
class IORequest {
public:
static const v_int32 TYPE_DONE;
static const v_int32 TYPE_READ;
static const v_int32 TYPE_WRITE;
public:
IORequest(const std::shared_ptr<oatpp::data::stream::IOStream>& pStream,
void* pData,
os::io::Library::v_size pSize,
v_int32 pType)
: stream(pStream)
, data(pData)
, size(pSize)
, type(pType)
, actualSize(0)
{}
const std::shared_ptr<oatpp::data::stream::IOStream>& stream;
void* data;
const os::io::Library::v_size size;
v_int32 type;
os::io::Library::v_size actualSize;
};
}}}
#endif /* oatpp_web_server_io_IORequest_hpp */

9
network/src/io/Queue.cpp Normal file
View File

@ -0,0 +1,9 @@
//
// Queue.cpp
// crud
//
// Created by Leonid on 3/15/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "Queue.hpp"

81
network/src/io/Queue.hpp Normal file
View File

@ -0,0 +1,81 @@
//
// Queue.hpp
// crud
//
// Created by Leonid on 3/15/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_web_server_io_Queue_hpp
#define oatpp_web_server_io_Queue_hpp
#include "./IORequest.hpp"
#include "../../../core/src/base/memory/ObjectPool.hpp"
#include "../../../core/src/concurrency/SpinLock.hpp"
namespace oatpp { namespace network { namespace io {
class Queue {
public:
static Queue& getInstance(){
static Queue queue;
return queue;
}
public:
class Entry {
public:
OBJECT_POOL_THREAD_LOCAL(IO_Queue_Entry_Pool, Entry, 32)
public:
Entry(IORequest& pRequest, Entry* pNext)
: request(pRequest)
, next(pNext)
{}
IORequest& request;
Entry* next;
};
private:
oatpp::concurrency::SpinLock::Atom m_atom;
Entry* m_first;
Entry* m_last;
public:
Queue()
: m_atom(false)
, m_first(nullptr)
, m_last(nullptr)
{}
Entry* popFront() {
oatpp::concurrency::SpinLock lock(m_atom);
auto result = m_first;
if(m_first != nullptr) {
m_first = m_first->next;
if(m_first == nullptr) {
m_last = nullptr;
}
}
return result;
}
void pushBack(IORequest& request) {
pushBack(new Entry(request, nullptr));
}
void pushBack(Entry* entry) {
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = nullptr;
if(m_last != nullptr) {
m_last->next = entry;
m_last = entry;
} else {
m_first = entry;
m_last = entry;
}
}
};
}}}
#endif /* oatpp_web_server_io_Queue_hpp */

View File

@ -24,6 +24,7 @@
#include "./SimpleTCPConnectionProvider.hpp"
#include "../AsyncConnection.hpp"
#include "../Connection.hpp"
#include "../../../../oatpp-lib/core/src/utils/ConversionUtils.hpp"
@ -69,7 +70,7 @@ oatpp::os::io::Library::v_handle SimpleTCPConnectionProvider::instantiateServer(
return -1 ;
}
ret = listen(serverHandle, 128);
ret = listen(serverHandle, 10000);
if(ret < 0) {
oatpp::os::io::Library::handle_close(serverHandle);
return -1 ;
@ -97,9 +98,18 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
}
}
fcntl(handle, F_SETFL, 0);//O_NONBLOCK);
int flags = 0;
if(m_nonBlocking) {
flags |= O_NONBLOCK;
}
return Connection::createShared(handle);
fcntl(handle, F_SETFL, flags);
if(m_nonBlocking) {
return AsyncConnection::createShared(handle);
} else {
return Connection::createShared(handle);
}
}

View File

@ -36,18 +36,20 @@ namespace oatpp { namespace network { namespace server {
class SimpleTCPConnectionProvider : public base::Controllable, public ServerConnectionProvider {
private:
oatpp::os::io::Library::v_handle m_serverHandle;
bool m_nonBlocking;
private:
oatpp::os::io::Library::v_handle instantiateServer();
public:
SimpleTCPConnectionProvider(v_word16 port)
SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking = false)
: ServerConnectionProvider(port)
, m_nonBlocking(nonBlocking)
{
m_serverHandle = instantiateServer();
}
public:
static std::shared_ptr<SimpleTCPConnectionProvider> createShared(v_word16 port){
return std::shared_ptr<SimpleTCPConnectionProvider>(new SimpleTCPConnectionProvider(port));
static std::shared_ptr<SimpleTCPConnectionProvider> createShared(v_word16 port, bool nonBlocking = false){
return std::shared_ptr<SimpleTCPConnectionProvider>(new SimpleTCPConnectionProvider(port, nonBlocking));
}
std::shared_ptr<IOStream> getConnection() override;

View File

@ -34,7 +34,7 @@ namespace oatpp { namespace parser { namespace json { namespace mapping {
class ObjectMapper : public oatpp::base::Controllable, public oatpp::data::mapping::ObjectMapper {
private:
static Info& getMapperInfo(){
static Info& getMapperInfo() {
static Info info("application/json");
return info;
}
@ -54,13 +54,13 @@ public:
}
void write(const std::shared_ptr<oatpp::data::stream::OutputStream>& stream,
const oatpp::data::mapping::type::AbstractPtrWrapper& variant) override {
const oatpp::data::mapping::type::AbstractPtrWrapper& variant) const override {
Serializer::serialize(stream, variant);
}
oatpp::data::mapping::type::AbstractPtrWrapper
read(const std::shared_ptr<oatpp::parser::ParsingCaret>& caret,
const oatpp::data::mapping::type::Type* const type) override {
const oatpp::data::mapping::type::Type* const type) const override {
return Deserializer::deserialize(caret, deserializerConfig, type);
}

View File

@ -55,8 +55,8 @@ public:
template<class Type>
static typename Type::PtrWrapper decodeToDTO(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper){
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper){
return objectMapper->readFromString<Type>(decodeToString(headers, bodyStream));
}

View File

@ -36,6 +36,7 @@ public:
OBJECT_POOL(Incoming_Request_Pool, Request, 32)
SHARED_OBJECT_POOL(Shared_Incoming_Request_Pool, Request, 32)
public:
Request(){}
Request(const std::shared_ptr<http::RequestStartingLine>& pStartingLine,
const std::shared_ptr<url::mapping::Pattern::MatchMap>& pPathVariables,
const std::shared_ptr<http::Protocol::Headers>& pHeaders,
@ -54,10 +55,10 @@ public:
return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream);
}
const std::shared_ptr<http::RequestStartingLine> startingLine;
const std::shared_ptr<url::mapping::Pattern::MatchMap> pathVariables;
const std::shared_ptr<http::Protocol::Headers> headers;
const std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
std::shared_ptr<http::RequestStartingLine> startingLine;
std::shared_ptr<url::mapping::Pattern::MatchMap> pathVariables;
std::shared_ptr<http::Protocol::Headers> headers;
std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
base::String::PtrWrapper getHeader(const base::String::PtrWrapper& headerName) const{
auto entry = headers->find(headerName);

View File

@ -27,16 +27,29 @@
#include "../../../../../../oatpp-lib/core/src/data/stream/Stream.hpp"
#include "../../../../../../oatpp-lib/core/src/collection/ListMap.hpp"
#include "../../../../../../oatpp-lib/core/src/async/Coroutine.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
class Body {
protected:
typedef oatpp::async::Action Action;
protected:
typedef oatpp::collection::ListMap<base::String::PtrWrapper, base::String::PtrWrapper> Headers;
typedef oatpp::data::stream::OutputStream OutputStream;
public:
virtual void declareHeaders(const std::shared_ptr<Headers>& headers) = 0;
/**
* Do not call this method if stream::write is non blocking!
* For fast (not network) BLOCKING streams only!!!
*/
virtual void writeToStream(const std::shared_ptr<OutputStream>& stream) = 0;
virtual Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<OutputStream>& stream) = 0;
};
}}}}}

View File

@ -57,6 +57,40 @@ public:
stream->write(m_buffer);
}
public:
class WriteToStreamCoroutine : public oatpp::async::Coroutine<WriteToStreamCoroutine> {
private:
std::shared_ptr<BufferBody> m_body;
std::shared_ptr<OutputStream> m_stream;
public:
WriteToStreamCoroutine(const std::shared_ptr<BufferBody>& body,
const std::shared_ptr<OutputStream>& stream)
: m_body(body)
, m_stream(stream)
{}
Action act() override {
auto readCount = m_stream->write(m_body->m_buffer);
if(readCount > 0) {
return finish();
} else if(readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return waitRetry();
}
return abort();
}
};
public:
Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<OutputStream>& stream) override {
return parentCoroutine->startCoroutine<WriteToStreamCoroutine>(actionOnReturn, getSharedPtr<BufferBody>(), stream);
}
};
}}}}}

View File

@ -24,3 +24,8 @@
#include "ChunkedBufferBody.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
const char* ChunkedBufferBody::ERROR_FAILED_TO_WRITE_DATA = "ERROR_FAILED_TO_WRITE_DATA";
}}}}}

View File

@ -34,6 +34,8 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
class ChunkedBufferBody : public oatpp::base::Controllable, public Body {
public:
static const char* ERROR_FAILED_TO_WRITE_DATA;
public:
OBJECT_POOL(Http_Outgoing_ChunkedBufferBody_Pool, ChunkedBufferBody, 32)
SHARED_OBJECT_POOL(Shared_Http_Outgoing_ChunkedBufferBody_Pool, ChunkedBufferBody, 32)
@ -84,6 +86,82 @@ public:
}
}
public:
class WriteToStreamCoroutine : public oatpp::async::Coroutine<WriteToStreamCoroutine> {
private:
std::shared_ptr<ChunkedBufferBody> m_body;
std::shared_ptr<OutputStream> m_stream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks> m_chunks;
oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* m_currChunk;
v_int32 m_whileState;
public:
WriteToStreamCoroutine(const std::shared_ptr<ChunkedBufferBody>& body,
const std::shared_ptr<OutputStream>& stream)
: m_body(body)
, m_stream(stream)
, m_chunks(m_body->m_buffer->getChunks())
, m_currChunk(m_chunks->getFirstNode())
, m_whileState(0)
{}
Action act() override {
if(m_currChunk == nullptr) {
m_whileState = 3;
}
if(m_whileState == 0) {
auto res = m_stream->write(oatpp::utils::conversion::primitiveToStr(m_currChunk->getData()->size, "%X\r\n"));
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < 1) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
m_whileState = 1;
} else if(m_whileState == 1) {
auto res = m_stream->write(m_currChunk->getData()->data, m_currChunk->getData()->size);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < m_currChunk->getData()->size) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
m_whileState = 2;
} else if(m_whileState == 2) {
auto res = m_stream->write("\r\n", 2);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < 2) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
m_whileState = 3;
} else if(m_whileState == 3) {
auto res = m_stream->write("0\r\n\r\n", 5);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < 5) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
return finish();
}
m_currChunk = m_currChunk->getNext();
return repeat();
}
};
Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnFinish,
const std::shared_ptr<OutputStream>& stream) override {
if(m_chunked) {
return parentCoroutine->startCoroutine<WriteToStreamCoroutine>(actionOnFinish, getSharedPtr<ChunkedBufferBody>(), stream);
} else {
return m_buffer->flushToStreamAsync(parentCoroutine, actionOnFinish, stream);
}
}
};
}}}}}

View File

@ -37,6 +37,8 @@ public:
OBJECT_POOL(Outgoing_Request_Pool, Request, 32)
SHARED_OBJECT_POOL(Shared_Outgoing_Request_Pool, Request, 32)
public:
Request() {}
Request(const base::String::PtrWrapper& pMethod,
const base::String::PtrWrapper& pPath,
const std::shared_ptr<Headers>& pHeaders,
@ -46,6 +48,7 @@ public:
, headers((!pHeaders) ? (Headers::createShared()) : (pHeaders))
, body(pBody)
{}
public:
static std::shared_ptr<Request> createShared(const base::String::PtrWrapper& method,

View File

@ -24,6 +24,8 @@
#include "./Response.hpp"
#include "../../../../../core/src/data/stream/ChunkedBuffer.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream){
@ -56,4 +58,68 @@ void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream){
}
oatpp::async::Action Response::sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream){
class SendAsyncCoroutine : public oatpp::async::Coroutine<SendAsyncCoroutine> {
private:
std::shared_ptr<Response> m_response;
std::shared_ptr<data::stream::OutputStream> m_stream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_buffer;
public:
SendAsyncCoroutine(const std::shared_ptr<Response>& response,
const std::shared_ptr<data::stream::OutputStream>& stream)
: m_response(response)
, m_stream(stream)
, m_buffer(oatpp::data::stream::ChunkedBuffer::createShared())
{}
Action act() {
if(m_response->body){
m_response->body->declareHeaders(m_response->headers);
} else {
m_response->headers->put(Header::CONTENT_LENGTH, "0");
}
m_buffer->write("HTTP/1.1 ", 9);
m_buffer->writeAsString(m_response->status.code);
m_buffer->write(" ", 1);
m_buffer->OutputStream::write(m_response->status.description);
m_buffer->write("\r\n", 2);
auto curr = m_response->headers->getFirstEntry();
while(curr != nullptr){
m_buffer->write(curr->getKey()->getData(), curr->getKey()->getSize());
m_buffer->write(": ", 2);
m_buffer->write(curr->getValue()->getData(), curr->getValue()->getSize());
m_buffer->write("\r\n", 2);
curr = curr->getNext();
}
m_buffer->write("\r\n", 2);
return yieldTo(&SendAsyncCoroutine::writeHeaders);
}
Action writeHeaders() {
return m_buffer->flushToStreamAsync(this, yieldTo(&SendAsyncCoroutine::writeBody), m_stream);
}
Action writeBody() {
if(m_response->body) {
return m_response->body->writeToStreamAsync(this, finish(), m_stream);
}
return finish();
}
};
return parentCoroutine->startCoroutine<SendAsyncCoroutine>(actionOnFinish, getSharedPtr<Response>(), stream);
}
}}}}}

View File

@ -27,6 +27,7 @@
#include "./Body.hpp"
#include "./../Http.hpp"
#include "../../../../../core/src/async/Coroutine.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
@ -56,6 +57,10 @@ public:
void send(const std::shared_ptr<data::stream::OutputStream>& stream);
oatpp::async::Action sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream);
};
}}}}}

View File

@ -0,0 +1,97 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "./AsyncHttpConnectionHandler.hpp"
#include "../protocol/http/outgoing/ChunkedBufferBody.hpp"
#include "../protocol/http/incoming/Request.hpp"
#include "../protocol/http/Http.hpp"
#include "./HttpError.hpp"
#include "../../../../oatpp-lib/network/src/AsyncConnection.hpp"
#include "../../../../oatpp-lib/core/test/Checker.hpp"
#include <errno.h>
namespace oatpp { namespace web { namespace server {
void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processor& processor) {
oatpp::concurrency::SpinLock lock(m_atom);
auto curr = m_connections.getFirstNode();
while (curr != nullptr) {
auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router,
m_errorHandler,
curr->getData()->connection,
curr->getData()->ioBuffer,
curr->getData()->outStream,
curr->getData()->inStream);
processor.addCoroutine(coroutine);
curr = curr->getNext();
}
m_connections.clear();
}
void AsyncHttpConnectionHandler::Task::run(){
oatpp::async::Processor processor;
while(true) {
consumeConnections(processor);
while (processor.iterate(100)) {
consumeConnections(processor);
}
//OATPP_LOGD("task", "sleeping");
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
auto task = m_tasks[m_taskBalancer % m_threadCount];
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto state = HttpProcessor::ConnectionState::createShared();
state->connection = connection;
state->ioBuffer = ioBuffer;
state->outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer);
state->inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer);
task->addConnection(state);
m_taskBalancer ++;
}
}}}

View File

@ -0,0 +1,131 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_web_server_AsyncHttpConnectionHandler_hpp
#define oatpp_web_server_AsyncHttpConnectionHandler_hpp
#include "./HttpProcessor.hpp"
#include "./handler/ErrorHandler.hpp"
#include "./HttpRouter.hpp"
#include "../protocol/http/incoming/Request.hpp"
#include "../protocol/http/outgoing/Response.hpp"
#include "../../../../oatpp-lib/network/src/server/Server.hpp"
#include "../../../../oatpp-lib/network/src/Connection.hpp"
#include "../../../../oatpp-lib/core/src/concurrency/Thread.hpp"
#include "../../../../oatpp-lib/core/src/concurrency/Runnable.hpp"
#include "../../../../oatpp-lib/core/src/data/stream/StreamBufferedProxy.hpp"
#include "../../../../oatpp-lib/core/src/data/buffer/IOBuffer.hpp"
#include "../../../../oatpp-lib/core/src/async/Processor.hpp"
namespace oatpp { namespace web { namespace server {
class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
private:
class Task : public base::Controllable, public concurrency::Runnable {
public:
typedef oatpp::collection::LinkedList<std::shared_ptr<HttpProcessor::ConnectionState>> Connections;
private:
void consumeConnections(oatpp::async::Processor& processor);
private:
oatpp::concurrency::SpinLock::Atom m_atom;
Connections m_connections;
private:
HttpRouter* m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
public:
Task(HttpRouter* router,
const std::shared_ptr<handler::ErrorHandler>& errorHandler)
: m_atom(false)
, m_router(router)
, m_errorHandler(errorHandler)
{}
public:
static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<handler::ErrorHandler>& errorHandler){
return std::make_shared<Task>(router, errorHandler);
}
void run() override;
void addConnection(const std::shared_ptr<HttpProcessor::ConnectionState>& connectionState){
oatpp::concurrency::SpinLock lock(m_atom);
m_connections.pushBack(connectionState);
}
};
private:
std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
v_int32 m_taskBalancer;
v_int32 m_threadCount;
std::shared_ptr<Task>* m_tasks;
public:
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router)
: m_router(router)
, m_errorHandler(handler::DefaultErrorHandler::createShared())
, m_taskBalancer(0)
, m_threadCount(2)
{
m_tasks = new std::shared_ptr<Task>[m_threadCount];
for(v_int32 i = 0; i < m_threadCount; i++) {
auto task = Task::createShared(m_router.get(), m_errorHandler);
m_tasks[i] = task;
concurrency::Thread thread(task);
thread.detach();
}
}
public:
static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router){
return std::shared_ptr<AsyncHttpConnectionHandler>(new AsyncHttpConnectionHandler(router));
}
~AsyncHttpConnectionHandler(){
delete [] m_tasks;
}
void setErrorHandler(const std::shared_ptr<handler::ErrorHandler>& errorHandler){
m_errorHandler = errorHandler;
if(!m_errorHandler) {
m_errorHandler = handler::DefaultErrorHandler::createShared();
}
}
void handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) override;
};
}}}
#endif /* oatpp_web_server_AsyncHttpConnectionHandler_hpp */

View File

@ -24,6 +24,8 @@
#include "./HttpConnectionHandler.hpp"
#include "./HttpProcessor.hpp"
#include "../protocol/http/outgoing/ChunkedBufferBody.hpp"
#include "../protocol/http/incoming/Request.hpp"
@ -37,91 +39,6 @@
namespace oatpp { namespace web { namespace server {
bool HttpConnectionHandler::considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response){
auto& inKeepAlive = request->headers->get(protocol::http::Header::CONNECTION, nullptr);
if(!inKeepAlive.isNull() && base::String::equalsCI_FAST(inKeepAlive, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)) {
if(response->headers->putIfNotExists(protocol::http::Header::CONNECTION, inKeepAlive)){
return true;
} else {
auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr);
return (!outKeepAlive.isNull() && base::String::equalsCI_FAST(outKeepAlive, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE));
}
} else if(!response->headers->putIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE)) {
auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr);
return (!outKeepAlive.isNull() && base::String::equalsCI_FAST(outKeepAlive, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE));
}
return false;
}
std::shared_ptr<protocol::http::outgoing::Response>
HttpConnectionHandler::Task::handleError(const protocol::http::Status& status, const base::String::PtrWrapper& message){
return m_errorHandler->handleError(status, message);
}
std::shared_ptr<protocol::http::outgoing::Response>
HttpConnectionHandler::Task::processRequest(p_char8 buffer,
v_int32 bufferSize,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
bool& keepAlive) {
keepAlive = false;
auto readCount = m_connection->read(buffer, bufferSize);
if(readCount > 0) {
oatpp::parser::ParsingCaret caret(buffer, bufferSize);
auto line = protocol::http::Protocol::parseRequestStartingLine(caret);
if(!line){
return handleError(protocol::http::Status::CODE_400, "Can't read starting line");
}
auto route = m_router->getRoute(line->method, line->path);
if(!route.isNull()) {
oatpp::web::protocol::http::Status error;
auto headers = protocol::http::Protocol::parseHeaders(caret, error);
if(error.code != 0){
return handleError(error, " Can't parse headers");
}
auto bodyStream = inStream;
bodyStream->setBufferPosition(caret.getPosition(), (v_int32) readCount);
auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream);
std::shared_ptr<protocol::http::outgoing::Response> response;
try{
response = route.processUrl(request);
} catch (HttpError& error) {
return handleError(error.getStatus(), error.getMessage());
} catch (std::exception& error) {
return handleError(protocol::http::Status::CODE_500, error.what());
} catch (...) {
return handleError(protocol::http::Status::CODE_500, "Unknown error");
}
response->headers->putIfNotExists(protocol::http::Header::SERVER,
protocol::http::Header::Value::SERVER);
keepAlive = considerConnectionKeepAlive(request, response);
return response;
} else {
return handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
}
} else {
return nullptr;
}
}
void HttpConnectionHandler::Task::run(){
//oatpp::test::PerformanceChecker checker("task checker");
@ -135,7 +52,7 @@ void HttpConnectionHandler::Task::run(){
do {
auto response = processRequest(buffer, bufferSize, inStream, keepAlive);
auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, buffer, bufferSize, inStream, keepAlive);
if(response) {
outStream->setBufferPosition(0, 0);
response->send(outStream);

View File

@ -22,8 +22,8 @@
*
***************************************************************************/
#ifndef oatpp_web_server_ConnectionHandler_hpp
#define oatpp_web_server_ConnectionHandler_hpp
#ifndef oatpp_web_server_HttpConnectionHandler_hpp
#define oatpp_web_server_HttpConnectionHandler_hpp
#include "./handler/ErrorHandler.hpp"
@ -44,25 +44,12 @@
namespace oatpp { namespace web { namespace server {
class HttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
private:
static bool considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response);
private:
class Task : public base::Controllable, public concurrency::Runnable{
private:
HttpRouter* m_router;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
private:
std::shared_ptr<protocol::http::outgoing::Response>
handleError(const protocol::http::Status& status, const base::String::PtrWrapper& message);
std::shared_ptr<protocol::http::outgoing::Response>
processRequest(p_char8,
v_int32,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>&,
bool& keepAlive);
public:
Task(HttpRouter* router,
@ -112,4 +99,4 @@ public:
}}}
#endif /* oatpp_web_server_ConnectionHandler_hpp */
#endif /* oatpp_web_server_HttpConnectionHandler_hpp */

View File

@ -0,0 +1,198 @@
//
// HttpProcessor.cpp
// crud
//
// Created by Leonid on 3/16/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "HttpProcessor.hpp"
#include "./HttpError.hpp"
namespace oatpp { namespace web { namespace server {
const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE";
bool HttpProcessor::considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response){
if(request) {
auto& inKeepAlive = request->headers->get(protocol::http::Header::CONNECTION, nullptr);
if(!inKeepAlive.isNull() && base::String::equalsCI_FAST(inKeepAlive, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)) {
if(response->headers->putIfNotExists(protocol::http::Header::CONNECTION, inKeepAlive)){
return true;
} else {
auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr);
return (!outKeepAlive.isNull() && base::String::equalsCI_FAST(outKeepAlive, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE));
}
}
}
if(!response->headers->putIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE)) {
auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr);
return (!outKeepAlive.isNull() && base::String::equalsCI_FAST(outKeepAlive, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE));
}
return false;
}
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::processRequest(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
void* buffer,
v_int32 bufferSize,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
bool& keepAlive) {
keepAlive = false;
auto readCount = connection->read(buffer, bufferSize);
if(readCount > 0) {
oatpp::parser::ParsingCaret caret((p_char8)buffer, bufferSize);
auto line = protocol::http::Protocol::parseRequestStartingLine(caret);
if(!line){
return errorHandler->handleError(protocol::http::Status::CODE_400, "Can't read starting line");
}
auto route = router->getRoute(line->method, line->path);
if(!route.isNull()) {
oatpp::web::protocol::http::Status error;
auto headers = protocol::http::Protocol::parseHeaders(caret, error);
if(error.code != 0){
return errorHandler->handleError(error, " Can't parse headers");
}
auto bodyStream = inStream;
bodyStream->setBufferPosition(caret.getPosition(), (v_int32) readCount);
auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream);
std::shared_ptr<protocol::http::outgoing::Response> response;
try{
response = route.processUrl(request);
} catch (HttpError& error) {
return errorHandler->handleError(error.getStatus(), error.getMessage());
} catch (std::exception& error) {
return errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
} catch (...) {
return errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
}
response->headers->putIfNotExists(protocol::http::Header::SERVER,
protocol::http::Header::Value::SERVER);
keepAlive = HttpProcessor::considerConnectionKeepAlive(request, response);
return response;
} else {
return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
}
} if(readCount == oatpp::data::stream::IOStream::ERROR_NOTHING_TO_READ) {
keepAlive = true;
}
return nullptr;
}
// HttpProcessor::Coroutine
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseRequest(v_int32 readCount) {
oatpp::parser::ParsingCaret caret((p_char8) m_ioBuffer->getData(), readCount);
auto line = protocol::http::Protocol::parseRequestStartingLine(caret);
if(!line){
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_400, "Can't read starting line");
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
m_currentRoute = m_router->getRoute(line->method, line->path);
if(m_currentRoute.isNull()) {
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
oatpp::web::protocol::http::Status error;
auto headers = protocol::http::Protocol::parseHeaders(caret, error);
if(error.code != 0){
m_currentResponse = m_errorHandler->handleError(error, " Can't parse headers");
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
auto bodyStream = m_inStream;
bodyStream->setBufferPosition(caret.getPosition(), readCount);
m_currentRequest = protocol::http::incoming::Request::createShared(line, m_currentRoute.matchMap, headers, bodyStream);
return yieldTo(&HttpProcessor::Coroutine::onRequestFormed);
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
auto readCount = m_connection->read(m_ioBuffer->getData(), m_ioBuffer->getSize());
if(readCount > 0) {
return parseRequest((v_int32)readCount);
} else if(readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
}
return abort();
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestFormed() {
HttpRouter::BranchRouter::UrlSubscriber::AsyncCallback callback =
static_cast<HttpRouter::BranchRouter::UrlSubscriber::AsyncCallback>(&HttpProcessor::Coroutine::onResponse);
return m_currentRoute.processUrlAsync(this, callback, m_currentRequest);
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std::shared_ptr<protocol::http::outgoing::Response>& response) {
m_currentResponse = response;
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
m_currentResponse->headers->putIfNotExists(protocol::http::Header::SERVER,
protocol::http::Header::Value::SERVER);
m_keepAlive = HttpProcessor::considerConnectionKeepAlive(m_currentRequest, m_currentResponse);
m_outStream->setBufferPosition(0, 0);
return m_currentResponse->sendAsync(this,
m_outStream->flushAsync(
this,
yieldTo(&HttpProcessor::Coroutine::onRequestDone)),
m_outStream);
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() {
if(m_keepAlive) {
return yieldTo(&HttpProcessor::Coroutine::act);
}
return abort();
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(const oatpp::async::Error& error) {
if (error.isExceptionThrown) {
try{
throw;
} catch (HttpError& error) {
m_currentResponse = m_errorHandler->handleError(error.getStatus(), error.getMessage());
} catch (std::exception& error) {
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
} catch (...) {
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
}
} else {
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, error.message);
}
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
}}}

View File

@ -0,0 +1,108 @@
//
// HttpProcessor.hpp
// crud
//
// Created by Leonid on 3/16/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_web_server_HttpProcessor_hpp
#define oatpp_web_server_HttpProcessor_hpp
#include "./HttpRouter.hpp"
#include "./handler/ErrorHandler.hpp"
#include "../protocol/http/incoming/Request.hpp"
#include "../protocol/http/outgoing/Response.hpp"
#include "../../../../oatpp-lib/core/src/data/stream/StreamBufferedProxy.hpp"
#include "../../../../oatpp-lib/core/src/async/Processor.hpp"
namespace oatpp { namespace web { namespace server {
class HttpProcessor {
public:
static const char* RETURN_KEEP_ALIVE;
public:
class ConnectionState {
public:
SHARED_OBJECT_POOL(ConnectionState_Pool, ConnectionState, 32)
public:
static std::shared_ptr<ConnectionState> createShared(){
return ConnectionState_Pool::allocateShared();
}
std::shared_ptr<oatpp::data::stream::IOStream> connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> ioBuffer;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> outStream;
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> inStream;
};
public:
class Coroutine : public oatpp::async::Coroutine<HttpProcessor::Coroutine> {
private:
Action parseRequest(v_int32 readCount);
private:
HttpRouter* m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_outStream;
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> m_inStream;
bool m_keepAlive;
private:
oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute;
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;
std::shared_ptr<protocol::http::outgoing::Response> m_currentResponse;
public:
Coroutine(HttpRouter* router,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& ioBuffer,
const std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy>& outStream,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream)
: m_router(router)
, m_errorHandler(errorHandler)
, m_connection(connection)
, m_ioBuffer(ioBuffer)
, m_outStream(outStream)
, m_inStream(inStream)
, m_keepAlive(true)
{}
Action act() override;
Action onRequestFormed();
Action onResponse(const std::shared_ptr<protocol::http::outgoing::Response>& response);
Action onResponseFormed();
Action onRequestDone();
Action handleError(const oatpp::async::Error& error) override;
};
public:
static bool considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response);
static std::shared_ptr<protocol::http::outgoing::Response>
processRequest(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
void* buffer,
v_int32 bufferSize,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
bool& keepAlive);
};
}}}
#endif /* oatpp_web_server_HttpProcessor_hpp */

View File

@ -34,15 +34,11 @@ namespace oatpp { namespace web { namespace server {
class HttpRouter : public oatpp::base::Controllable {
public:
typedef oatpp::web::url::mapping::Subscriber<
std::shared_ptr<oatpp::web::protocol::http::incoming::Request>,
std::shared_ptr<oatpp::web::protocol::http::outgoing::Response>
> Subscriber;
protected:
typedef oatpp::web::url::mapping::Router<
std::shared_ptr<oatpp::web::protocol::http::incoming::Request>,
std::shared_ptr<oatpp::web::protocol::http::outgoing::Response>
> BranchRouter;
typedef BranchRouter::UrlSubscriber Subscriber;
typedef oatpp::collection::ListMap<oatpp::base::String::PtrWrapper, std::shared_ptr<BranchRouter>> BranchMap;
protected:
std::shared_ptr<BranchMap> m_branchMap;

View File

@ -39,6 +39,8 @@
namespace oatpp { namespace web { namespace server { namespace api {
class ApiController : public oatpp::base::Controllable {
protected:
typedef ApiController __ControllerType;
public:
typedef oatpp::web::server::HttpRouter Router;
typedef oatpp::web::protocol::http::outgoing::ResponseFactory OutgoingResponseFactory;
@ -59,6 +61,23 @@ public:
typedef oatpp::data::mapping::type::Boolean::PtrWrapper Boolean;
protected:
typedef oatpp::async::Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const std::shared_ptr<OutgoingResponse>&);
protected:
template<class CoroutineT, class ControllerT>
class HandlerCoroutine : public oatpp::async::CoroutineWithResult<CoroutineT, std::shared_ptr<OutgoingResponse>> {
public:
HandlerCoroutine(ControllerT* pController,
const std::shared_ptr<protocol::http::incoming::Request>& pRequest)
: controller(pController)
, request(pRequest)
{}
const ControllerT* controller;
std::shared_ptr<protocol::http::incoming::Request> request;
};
template<class T>
class Handler :
@ -69,24 +88,41 @@ protected:
> {
public:
typedef std::shared_ptr<OutgoingResponse> (T::*Method)(const std::shared_ptr<protocol::http::incoming::Request>&);
typedef Action (T::*MethodAsync)(oatpp::async::AbstractCoroutine*,
AsyncCallback callback,
const std::shared_ptr<protocol::http::incoming::Request>&);
private:
T* m_controller;
Method m_method;
MethodAsync m_methodAsync;
protected:
Handler(T* controller, Method method)
Handler(T* controller, Method method, MethodAsync methodAsync)
: m_controller(controller)
, m_method(method)
, m_methodAsync(methodAsync)
{}
public:
static std::shared_ptr<Handler> createShared(T* controller, Method method){
return std::shared_ptr<Handler>(new Handler(controller, method));
static std::shared_ptr<Handler> createShared(T* controller, Method method, MethodAsync methodAsync){
return std::shared_ptr<Handler>(new Handler(controller, method, methodAsync));
}
std::shared_ptr<OutgoingResponse> processUrl(const std::shared_ptr<protocol::http::incoming::Request>& request) override {
return (m_controller->*m_method)(request);
}
Action processUrlAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback,
const std::shared_ptr<protocol::http::incoming::Request>& request) override {
if(m_methodAsync != nullptr) {
return (m_controller->*m_methodAsync)(parentCoroutine, callback, request);
} else {
return parentCoroutine->callWithParams(reinterpret_cast<oatpp::async::AbstractCoroutine::FunctionPtr>(callback),
m_controller->handleError(Status::CODE_500,
"Using Async model for non async enpoint"));
}
}
};
protected:
@ -106,8 +142,9 @@ public:
static std::shared_ptr<Endpoint> createEndpoint(const std::shared_ptr<Endpoints>& endpoints,
T* controller,
typename Handler<T>::Method method,
typename Handler<T>::MethodAsync methodAsync,
const std::shared_ptr<Endpoint::Info>& info){
auto handler = Handler<T>::createShared(controller, method);
auto handler = Handler<T>::createShared(controller, method, methodAsync);
auto endpoint = Endpoint::createShared(handler, info);
endpoints->pushBack(endpoint);
return endpoint;
@ -145,30 +182,30 @@ public:
}
}
std::shared_ptr<oatpp::data::mapping::ObjectMapper>& getDefaultObjectMapper(){
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& getDefaultObjectMapper() const {
return m_defaultObjectMapper;
}
// Helper methods
std::shared_ptr<OutgoingResponse> createResponse(const Status& status,
const oatpp::base::String::PtrWrapper& str) {
const oatpp::base::String::PtrWrapper& str) const {
return OutgoingResponseFactory::createShared(status, str);
}
std::shared_ptr<OutgoingResponse> createResponse(const Status& status,
const std::shared_ptr<oatpp::data::stream::ChunkedBuffer>& chunkedBuffer) {
const std::shared_ptr<oatpp::data::stream::ChunkedBuffer>& chunkedBuffer) const {
return OutgoingResponseFactory::createShared(status, chunkedBuffer);
}
std::shared_ptr<OutgoingResponse> createDtoResponse(const Status& status,
const oatpp::data::mapping::type::AbstractPtrWrapper& dto,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) {
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return OutgoingResponseFactory::createShared(status, dto, objectMapper.get());
}
std::shared_ptr<OutgoingResponse> createDtoResponse(const Status& status,
const oatpp::data::mapping::type::AbstractPtrWrapper& dto) {
const oatpp::data::mapping::type::AbstractPtrWrapper& dto) const {
return OutgoingResponseFactory::createShared(status, dto, m_defaultObjectMapper.get());
}

View File

@ -95,7 +95,7 @@ public:
public:
static std::shared_ptr<Endpoint> createShared(const std::shared_ptr<RequestHandler>& handler,
const std::shared_ptr<Info>& info){
const std::shared_ptr<Info>& info){
return std::shared_ptr<Endpoint>(new Endpoint(handler, info));
}

View File

@ -252,8 +252,7 @@ std::shared_ptr<Endpoint::Info> Z__EDNPOINT_INFO_GET_INSTANCE_##NAME() { \
setEndpointInfo(#NAME, info); \
} \
return info; \
} \
\
}
#define OATPP_MACRO_API_CONTROLLER_ENDPOINT_DECL_0(NAME, METHOD, PATH, LIST) \
\
@ -268,6 +267,7 @@ std::shared_ptr<Endpoint::Info> Z__CREATE_ENDPOINT_INFO_##NAME() { \
const std::shared_ptr<Endpoint> Z__ENDPOINT_##NAME = createEndpoint(m_endpoints, \
this, \
Z__ENDPOINT_METHOD_##NAME(this), \
nullptr, \
Z__CREATE_ENDPOINT_INFO_##NAME());
#define OATPP_MACRO_API_CONTROLLER_ENDPOINT_0(NAME, METHOD, PATH, LIST) \
@ -298,6 +298,7 @@ std::shared_ptr<Endpoint::Info> Z__CREATE_ENDPOINT_INFO_##NAME() { \
const std::shared_ptr<Endpoint> Z__ENDPOINT_##NAME = createEndpoint(m_endpoints, \
this, \
Z__ENDPOINT_METHOD_##NAME(this), \
nullptr, \
Z__CREATE_ENDPOINT_INFO_##NAME());
#define OATPP_MACRO_API_CONTROLLER_ENDPOINT_1(NAME, METHOD, PATH, LIST) \
@ -337,3 +338,65 @@ OATPP_MACRO_API_CONTROLLER_ENDPOINT__(OATPP_MACRO_HAS_ARGS LIST, NAME, METHOD, P
#define ENDPOINT(METHOD, PATH, NAME, ...) \
OATPP_MACRO_API_CONTROLLER_ENDPOINT___(NAME, METHOD, PATH, (__VA_ARGS__))
// ENDPOINT ASYNC MACRO // ------------------------------------------------------
/**
* 1 - Method to obtain endpoint call function ptr
* 2 - Endpoint info singleton
*/
#define OATPP_MACRO_API_CONTROLLER_ENDPOINT_ASYNC_DECL_DEFAULTS(NAME, METHOD, PATH) \
template<class T> \
static typename Handler<T>::MethodAsync Z__ENDPOINT_METHOD_##NAME(T* controller) { \
return &T::Z__PROXY_METHOD_##NAME; \
} \
\
std::shared_ptr<Endpoint::Info> Z__EDNPOINT_INFO_GET_INSTANCE_##NAME() { \
std::shared_ptr<Endpoint::Info> info = getEndpointInfo(#NAME); \
if(!info){ \
info = Endpoint::Info::createShared(); \
setEndpointInfo(#NAME, info); \
} \
return info; \
}
/**
* 1 - Endpoint info instance
* 2 - Endpoint instance
*/
#define OATPP_MACRO_API_CONTROLLER_ENDPOINT_ASYNC_DECL(NAME, METHOD, PATH) \
\
std::shared_ptr<Endpoint::Info> Z__CREATE_ENDPOINT_INFO_##NAME() { \
auto info = Z__EDNPOINT_INFO_GET_INSTANCE_##NAME(); \
info->name = #NAME; \
info->path = PATH; \
info->method = METHOD; \
return info; \
} \
\
const std::shared_ptr<Endpoint> Z__ENDPOINT_##NAME = createEndpoint(m_endpoints, \
this, \
nullptr, \
Z__ENDPOINT_METHOD_##NAME(this), \
Z__CREATE_ENDPOINT_INFO_##NAME());
#define ENDPOINT_ASYNC(METHOD, PATH, NAME) \
OATPP_MACRO_API_CONTROLLER_ENDPOINT_ASYNC_DECL_DEFAULTS(NAME, METHOD, PATH) \
OATPP_MACRO_API_CONTROLLER_ENDPOINT_ASYNC_DECL(NAME, METHOD, PATH) \
\
oatpp::async::Action Z__PROXY_METHOD_##NAME(oatpp::async::AbstractCoroutine* parentCoroutine, \
oatpp::web::server::api::ApiController::AsyncCallback asyncCallback, \
const std::shared_ptr<oatpp::web::protocol::http::incoming::Request>& __request) \
{ \
return parentCoroutine->startCoroutineForResult<NAME>(asyncCallback, this, __request); \
} \
\
class NAME : public HandlerCoroutine<NAME, __ControllerType>
#define ENDPOINT_ASYNC_INIT(NAME) \
public: \
\
NAME(__ControllerType* pController, \
const std::shared_ptr<IncomingRequest>& pRequest) \
: HandlerCoroutine(pController, pRequest) \
{}

View File

@ -30,7 +30,7 @@
namespace oatpp { namespace web { namespace server { namespace handler {
std::shared_ptr<protocol::http::outgoing::Response>
DefaultErrorHandler::handleError(const protocol::http::Status& status, const base::PtrWrapper<base::String>& message) {
DefaultErrorHandler::handleError(const protocol::http::Status& status, const base::String::PtrWrapper& message) {
auto stream = oatpp::data::stream::ChunkedBuffer::createShared();
stream << "server=" << protocol::http::Header::Value::SERVER << "\n";

View File

@ -35,7 +35,7 @@ public:
virtual
std::shared_ptr<protocol::http::outgoing::Response>
handleError(const protocol::http::Status& status, const base::PtrWrapper<base::String>& message) = 0;
handleError(const protocol::http::Status& status, const base::String::PtrWrapper& message) = 0;
};
@ -50,7 +50,7 @@ public:
}
std::shared_ptr<protocol::http::outgoing::Response>
handleError(const protocol::http::Status& status, const base::PtrWrapper<base::String>& message) override;
handleError(const protocol::http::Status& status, const base::String::PtrWrapper& message) override;
};

View File

@ -49,20 +49,31 @@ public:
UrlSubscriber* m_subscriber;
public:
Route()
: m_subscriber(nullptr)
, matchMap(nullptr)
{}
Route(UrlSubscriber* subscriber, const std::shared_ptr<Pattern::MatchMap>& pMatchMap)
: m_subscriber(subscriber)
, matchMap(pMatchMap)
{}
ReturnType processUrl(const Param& param){
ReturnType processUrl(const Param& param) const {
return m_subscriber->processUrl(param);
}
bool isNull(){
oatpp::async::Action processUrlAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
typename UrlSubscriber::AsyncCallback callback,
const Param& param) const {
return m_subscriber->processUrlAsync(parentCoroutine, callback, param);
}
bool isNull() const {
return m_subscriber == nullptr;
}
const std::shared_ptr<Pattern::MatchMap> matchMap;
std::shared_ptr<Pattern::MatchMap> matchMap;
};

View File

@ -26,13 +26,20 @@
#define oatpp_network_url_Subscriber_hpp
#include "../../../../../oatpp-lib/core/src/base/PtrWrapper.hpp"
#include "../../../../../oatpp-lib/core/src/async/Coroutine.hpp"
namespace oatpp { namespace web { namespace url { namespace mapping {
template<class Param, class ReturnType>
class Subscriber {
public:
typedef oatpp::async::Action Action;
typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const ReturnType&);
public:
virtual ReturnType processUrl(const Param& param) = 0;
virtual Action processUrlAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback,
const Param& param) = 0;
};
}}}}