mirror of
https://gitee.com/zyjblog/oatpp.git
synced 2024-12-22 22:16:37 +08:00
Integrating Processor2 in progress
This commit is contained in:
parent
87be65af4d
commit
0a49124f94
@ -10,12 +10,9 @@
|
||||
|
||||
namespace oatpp { namespace async {
|
||||
|
||||
const v_int32 Action2::TYPE_COROUTINE = 0;
|
||||
const v_int32 Action2::TYPE_YIELD_TO = 1;
|
||||
const v_int32 Action2::TYPE_WAIT_RETRY = 2;
|
||||
const v_int32 Action2::TYPE_REPEAT = 3;
|
||||
const v_int32 Action2::TYPE_FINISH = 4;
|
||||
const v_int32 Action2::TYPE_ABORT = 5;
|
||||
const v_int32 Action2::TYPE_ERROR = 6;
|
||||
const Action2 Action2::_WAIT_RETRY(TYPE_WAIT_RETRY, nullptr, nullptr);
|
||||
const Action2 Action2::_REPEAT(TYPE_REPEAT, nullptr, nullptr);
|
||||
const Action2 Action2::_FINISH(TYPE_FINISH, nullptr, nullptr);
|
||||
const Action2 Action2::_ABORT(TYPE_ABORT, nullptr, nullptr);
|
||||
|
||||
}}
|
||||
|
@ -10,11 +10,13 @@
|
||||
#define oatpp_async_Coroutine_hpp
|
||||
|
||||
#include "../collection/FastQueue.hpp"
|
||||
#include "../base/memory/MemoryPool.hpp"
|
||||
#include "../base/Environment.hpp"
|
||||
|
||||
namespace oatpp { namespace async {
|
||||
|
||||
class AbstractCoroutine; // FWD
|
||||
class Processor2; // FWD
|
||||
|
||||
class Error2 {
|
||||
public:
|
||||
@ -30,17 +32,23 @@ public:
|
||||
};
|
||||
|
||||
class Action2 {
|
||||
friend Processor2;
|
||||
friend AbstractCoroutine;
|
||||
public:
|
||||
typedef Action2 (AbstractCoroutine::*FunctionPtr)();
|
||||
public:
|
||||
static const v_int32 TYPE_COROUTINE;
|
||||
static const v_int32 TYPE_YIELD_TO;
|
||||
static const v_int32 TYPE_WAIT_RETRY;
|
||||
static const v_int32 TYPE_REPEAT;
|
||||
static const v_int32 TYPE_FINISH;
|
||||
static const v_int32 TYPE_ABORT;
|
||||
static const v_int32 TYPE_ERROR;
|
||||
static constexpr const v_int32 TYPE_COROUTINE = 0;
|
||||
static constexpr const v_int32 TYPE_YIELD_TO = 1;
|
||||
static constexpr const v_int32 TYPE_WAIT_RETRY = 2;
|
||||
static constexpr const v_int32 TYPE_REPEAT = 3;
|
||||
static constexpr const v_int32 TYPE_FINISH = 4;
|
||||
static constexpr const v_int32 TYPE_ABORT = 5;
|
||||
static constexpr const v_int32 TYPE_ERROR = 6;
|
||||
public:
|
||||
static const Action2 _WAIT_RETRY;
|
||||
static const Action2 _REPEAT;
|
||||
static const Action2 _FINISH;
|
||||
static const Action2 _ABORT;
|
||||
private:
|
||||
v_int32 m_type;
|
||||
AbstractCoroutine* m_coroutine;
|
||||
@ -71,6 +79,8 @@ public:
|
||||
};
|
||||
|
||||
class AbstractCoroutine {
|
||||
friend oatpp::collection::FastQueue<AbstractCoroutine>;
|
||||
friend Processor2;
|
||||
public:
|
||||
typedef Action2 Action2;
|
||||
typedef Action2 (AbstractCoroutine::*FunctionPtr)();
|
||||
@ -81,25 +91,43 @@ private:
|
||||
|
||||
const Action2& takeAction(const Action2& action){
|
||||
|
||||
if(action.m_type == Action2::TYPE_COROUTINE) {
|
||||
action.m_coroutine->m_parent = _CP;
|
||||
_CP = action.m_coroutine;
|
||||
_CP->m_returnFP = action.m_functionPtr;
|
||||
_FP = action.m_coroutine->_FP;
|
||||
} else if(action.m_type == Action2::TYPE_FINISH) {
|
||||
do {
|
||||
_FP = _CP->m_returnFP;
|
||||
_CP->free();
|
||||
_CP = _CP->m_parent; // Should be fine here. As free() - return of the pointer to Bench. (Memory not changed)
|
||||
} while (_FP == nullptr && _CP != nullptr);
|
||||
} else if(action.m_type == Action2::TYPE_YIELD_TO) {
|
||||
_FP = action.m_functionPtr;
|
||||
} else if(action.m_type == Action2::TYPE_ABORT) {
|
||||
while (_CP != nullptr) {
|
||||
_CP->free();
|
||||
_CP = _CP->m_parent;
|
||||
}
|
||||
}
|
||||
switch (action.m_type) {
|
||||
|
||||
case Action2::TYPE_COROUTINE:
|
||||
action.m_coroutine->m_parent = _CP;
|
||||
_CP = action.m_coroutine;
|
||||
_FP = action.m_coroutine->_FP;
|
||||
break;
|
||||
|
||||
case Action2::TYPE_FINISH:
|
||||
if(_CP == this) {
|
||||
_CP = nullptr;
|
||||
return action;
|
||||
}
|
||||
do {
|
||||
_CP->free();
|
||||
_CP = _CP->m_parent; // Should be fine here. As free() - return of the pointer to Bench. (Memory not changed)
|
||||
takeAction(_CP->m_savedAction);
|
||||
} while (_FP == nullptr && _CP != this);
|
||||
if(_FP == nullptr &&_CP == this) {
|
||||
_CP = nullptr;
|
||||
return action;
|
||||
}
|
||||
break;
|
||||
|
||||
case Action2::TYPE_YIELD_TO:
|
||||
_FP = action.m_functionPtr;
|
||||
break;
|
||||
|
||||
case Action2::TYPE_ABORT:
|
||||
while (_CP != this) {
|
||||
_CP->free();
|
||||
_CP = _CP->m_parent;
|
||||
}
|
||||
_CP = nullptr;
|
||||
break;
|
||||
|
||||
};
|
||||
|
||||
return action;
|
||||
|
||||
@ -107,15 +135,23 @@ private:
|
||||
|
||||
private:
|
||||
AbstractCoroutine* m_parent = nullptr;
|
||||
FunctionPtr m_returnFP = nullptr; // should be set by processor on startCoroutine
|
||||
protected:
|
||||
Action2 m_savedAction = Action2::_FINISH;
|
||||
public:
|
||||
|
||||
Action2 iterate() {
|
||||
//try {
|
||||
return takeAction(_CP->call(_FP));
|
||||
//} catch (...) {
|
||||
|
||||
//}
|
||||
return takeAction(_CP->call(_FP));
|
||||
};
|
||||
|
||||
Action2 iterate(v_int32 numIterations) {
|
||||
Action2 action(Action2::TYPE_FINISH, nullptr, nullptr);
|
||||
for(v_int32 i = 0; i < numIterations; i++) {
|
||||
action = takeAction(_CP->call(_FP));
|
||||
if(action.m_type == Action2::TYPE_WAIT_RETRY || _CP == nullptr) {
|
||||
return action;
|
||||
}
|
||||
}
|
||||
return action;
|
||||
};
|
||||
|
||||
virtual ~AbstractCoroutine(){
|
||||
@ -139,10 +175,10 @@ template<class T>
|
||||
class Coroutine : public AbstractCoroutine {
|
||||
public:
|
||||
typedef Action2 (T::*Function)();
|
||||
typedef oatpp::collection::Bench<T> Bench;
|
||||
typedef oatpp::base::memory::Bench<T> Bench;
|
||||
public:
|
||||
static Bench& getBench(){
|
||||
static thread_local Bench bench(1024);
|
||||
static thread_local Bench bench(512);
|
||||
return bench;
|
||||
}
|
||||
public:
|
||||
@ -160,30 +196,31 @@ public:
|
||||
Coroutine<T>::getBench().free(static_cast<T*>(this));
|
||||
}
|
||||
|
||||
template<typename C, typename F, typename ... Args>
|
||||
Action2 startCoroutine(F returnToFunction, Args... args) {
|
||||
template<typename C, typename ... Args>
|
||||
Action2 startCoroutine(const Action2& actionOnReturn, Args... args) {
|
||||
m_savedAction = actionOnReturn;
|
||||
C* coroutine = C::getBench().obtain(args...);
|
||||
return Action2(Action2::TYPE_COROUTINE, coroutine, static_cast<FunctionPtr>(returnToFunction));
|
||||
return Action2(Action2::TYPE_COROUTINE, coroutine, nullptr);
|
||||
}
|
||||
|
||||
Action2 yieldTo(Function function) {
|
||||
Action2 yieldTo(Function function) const {
|
||||
return Action2(Action2::TYPE_YIELD_TO, nullptr, static_cast<FunctionPtr>(function));
|
||||
}
|
||||
|
||||
Action2 waitRetry() {
|
||||
return Action2(Action2::TYPE_WAIT_RETRY, nullptr, nullptr);
|
||||
const Action2& waitRetry() const {
|
||||
return Action2::_WAIT_RETRY;
|
||||
}
|
||||
|
||||
Action2 repeat() {
|
||||
return Action2(Action2::TYPE_REPEAT, nullptr, nullptr);
|
||||
const Action2& repeat() const {
|
||||
return Action2::_REPEAT;
|
||||
}
|
||||
|
||||
Action2 finish() {
|
||||
return Action2(Action2::TYPE_FINISH, nullptr, nullptr);
|
||||
const Action2& finish() const {
|
||||
return Action2::_FINISH;
|
||||
}
|
||||
|
||||
Action2 abort() {
|
||||
return Action2(Action2::TYPE_ABORT, nullptr, nullptr);
|
||||
const Action2& abort() const {
|
||||
return Action2::_ABORT;
|
||||
}
|
||||
|
||||
Action2 error(const char* message) {
|
||||
|
@ -9,36 +9,73 @@
|
||||
#ifndef oatpp_async_Processor2_hpp
|
||||
#define oatpp_async_Processor2_hpp
|
||||
|
||||
#include "./Coroutine.hpp"
|
||||
#include "../collection/FastQueue.hpp"
|
||||
#include "../base/Environment.hpp"
|
||||
|
||||
namespace oatpp { namespace async {
|
||||
|
||||
class Processor2 {
|
||||
public:
|
||||
private:
|
||||
|
||||
struct Routine {
|
||||
public:
|
||||
typedef oatpp::collection::FastQueue<Routine> FastQueue;
|
||||
public:
|
||||
Routine* _ref;
|
||||
};
|
||||
|
||||
public:
|
||||
|
||||
struct Entry {
|
||||
public:
|
||||
typedef oatpp::collection::FastQueue<Entry> FastQueue;
|
||||
public:
|
||||
Routine* routine;
|
||||
Entry* _ref;
|
||||
};
|
||||
bool checkWaitingQueue() {
|
||||
bool hasActions = false;
|
||||
AbstractCoroutine* curr = m_waitingQueue.first;
|
||||
AbstractCoroutine* prev = nullptr;
|
||||
while (curr != nullptr) {
|
||||
|
||||
const Action2& action = curr->iterate();
|
||||
if(action.m_type != Action2::TYPE_WAIT_RETRY) {
|
||||
m_waitingQueue.pushBack(m_activeQueue.popFront());
|
||||
hasActions = true;
|
||||
oatpp::collection::FastQueue<AbstractCoroutine>::moveEntry(m_waitingQueue, m_activeQueue, curr, prev);
|
||||
hasActions = true;
|
||||
if(prev != nullptr) {
|
||||
curr = prev;
|
||||
} else {
|
||||
curr = m_waitingQueue.first;
|
||||
}
|
||||
}
|
||||
|
||||
prev = curr;
|
||||
if(curr != nullptr) {
|
||||
curr = curr->_ref;
|
||||
}
|
||||
|
||||
}
|
||||
return hasActions;
|
||||
}
|
||||
|
||||
private:
|
||||
Entry::FastQueue m_activeQueue;
|
||||
Entry::FastQueue m_waitingQueue;
|
||||
oatpp::collection::FastQueue<AbstractCoroutine> m_activeQueue;
|
||||
oatpp::collection::FastQueue<AbstractCoroutine> m_waitingQueue;
|
||||
public:
|
||||
|
||||
void addCoroutine(AbstractCoroutine* coroutine) {
|
||||
m_activeQueue.pushBack(coroutine);
|
||||
}
|
||||
|
||||
bool iterate(v_int32 numIterations) {
|
||||
|
||||
for(v_int32 i = 0; i < numIterations; i++) {
|
||||
|
||||
auto CP = m_activeQueue.first;
|
||||
if(CP == nullptr) {
|
||||
break;
|
||||
}
|
||||
if(!CP->finished()) {
|
||||
const Action2& action = CP->iterate();
|
||||
if(action.m_type == Action2::TYPE_WAIT_RETRY) {
|
||||
m_waitingQueue.pushBack(m_activeQueue.popFront());
|
||||
}
|
||||
} else {
|
||||
m_activeQueue.popFrontNoData();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return (m_activeQueue.first != nullptr);
|
||||
|
||||
}
|
||||
|
||||
|
||||
};
|
||||
|
@ -181,6 +181,83 @@ public:
|
||||
void* obtain();
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
class Bench {
|
||||
private:
|
||||
|
||||
class Block {
|
||||
public:
|
||||
Block(p_char8 mem, Block* pNext)
|
||||
: memory(mem)
|
||||
, next(pNext)
|
||||
{}
|
||||
p_char8 memory;
|
||||
Block* next;
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
void grow(){
|
||||
|
||||
v_int32 newSize = m_size + m_growSize;
|
||||
T** newIndex = new T*[newSize];
|
||||
memcmp(newIndex, m_index, m_size);
|
||||
|
||||
Block* b = new Block(new v_char8 [m_growSize * sizeof(T)], m_blocks);
|
||||
m_blocks = b;
|
||||
for(v_int32 i = 0; i < m_growSize; i++) {
|
||||
newIndex[m_size + i] = (T*) (&b->memory[i * sizeof(T)]);
|
||||
}
|
||||
|
||||
delete [] m_index;
|
||||
m_size = newSize;
|
||||
m_index = newIndex;
|
||||
|
||||
}
|
||||
|
||||
private:
|
||||
v_int32 m_growSize;
|
||||
v_int32 m_size;
|
||||
v_int32 m_indexPosition;
|
||||
Block* m_blocks;
|
||||
T** m_index;
|
||||
public:
|
||||
|
||||
Bench(v_int32 growSize)
|
||||
: m_growSize(growSize)
|
||||
, m_size(0)
|
||||
, m_indexPosition(0)
|
||||
, m_blocks(nullptr)
|
||||
, m_index(nullptr)
|
||||
{
|
||||
grow();
|
||||
}
|
||||
|
||||
~Bench(){
|
||||
auto curr = m_blocks;
|
||||
while (curr != nullptr) {
|
||||
auto next = curr->next;
|
||||
delete curr;
|
||||
curr = next;
|
||||
}
|
||||
delete [] m_index;
|
||||
}
|
||||
|
||||
template<typename ... Args>
|
||||
T* obtain(Args... args) {
|
||||
if(m_indexPosition == m_size) {
|
||||
grow();
|
||||
}
|
||||
return new (m_index[m_indexPosition ++]) T(args...);
|
||||
}
|
||||
|
||||
void free(T* entry) {
|
||||
entry->~T();
|
||||
m_index[--m_indexPosition] = entry;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}}}
|
||||
|
||||
#endif /* oatpp_base_memory_MemoryPool_hpp */
|
||||
|
@ -9,94 +9,17 @@
|
||||
#ifndef oatpp_collection_FastQueue_hpp
|
||||
#define oatpp_collection_FastQueue_hpp
|
||||
|
||||
#include "../concurrency/SpinLock.hpp"
|
||||
#include "../base/Environment.hpp"
|
||||
|
||||
namespace oatpp { namespace collection {
|
||||
|
||||
template<typename T>
|
||||
class Bench {
|
||||
private:
|
||||
|
||||
class Block {
|
||||
public:
|
||||
Block(p_char8 mem, Block* pNext)
|
||||
: memory(mem)
|
||||
, next(pNext)
|
||||
{}
|
||||
p_char8 memory;
|
||||
Block* next;
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
void grow(){
|
||||
|
||||
v_int32 newSize = m_size + m_growSize;
|
||||
T** newIndex = new T*[newSize];
|
||||
memcmp(newIndex, m_index, m_size);
|
||||
|
||||
Block* b = new Block(new v_char8 [m_growSize * sizeof(T)], m_blocks);
|
||||
m_blocks = b;
|
||||
for(v_int32 i = 0; i < m_growSize; i++) {
|
||||
newIndex[m_size + i] = (T*) (&b->memory[i * sizeof(T)]);
|
||||
}
|
||||
|
||||
delete [] m_index;
|
||||
m_size = newSize;
|
||||
m_index = newIndex;
|
||||
|
||||
}
|
||||
|
||||
private:
|
||||
v_int32 m_growSize;
|
||||
v_int32 m_size;
|
||||
v_int32 m_indexPosition;
|
||||
Block* m_blocks;
|
||||
T** m_index;
|
||||
public:
|
||||
|
||||
Bench(v_int32 growSize)
|
||||
: m_growSize(growSize)
|
||||
, m_size(0)
|
||||
, m_indexPosition(0)
|
||||
, m_blocks(nullptr)
|
||||
, m_index(nullptr)
|
||||
{
|
||||
grow();
|
||||
}
|
||||
|
||||
~Bench(){
|
||||
auto curr = m_blocks;
|
||||
while (curr != nullptr) {
|
||||
auto next = curr->next;
|
||||
delete curr;
|
||||
curr = next;
|
||||
}
|
||||
delete [] m_index;
|
||||
}
|
||||
|
||||
template<typename ... Args>
|
||||
T* obtain(Args... args) {
|
||||
if(m_indexPosition == m_size) {
|
||||
grow();
|
||||
}
|
||||
return new (m_index[m_indexPosition ++]) T(args...);
|
||||
}
|
||||
|
||||
void free(T* entry) {
|
||||
entry->~T();
|
||||
m_index[--m_indexPosition] = entry;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
class FastQueue {
|
||||
public:
|
||||
|
||||
FastQueue(Bench<T>& pBench)
|
||||
: bench(pBench)
|
||||
, first(nullptr)
|
||||
FastQueue()
|
||||
: first(nullptr)
|
||||
, last(nullptr)
|
||||
{}
|
||||
|
||||
@ -104,7 +27,6 @@ public:
|
||||
clear();
|
||||
}
|
||||
|
||||
Bench<T>& bench;
|
||||
T* first;
|
||||
T* last;
|
||||
|
||||
@ -128,7 +50,7 @@ public:
|
||||
|
||||
T* popFront() {
|
||||
T* result = first;
|
||||
first = first->next;
|
||||
first = first->_ref;
|
||||
if(first == nullptr) {
|
||||
last = nullptr;
|
||||
}
|
||||
@ -137,23 +59,23 @@ public:
|
||||
|
||||
void popFrontNoData() {
|
||||
T* result = first;
|
||||
first = first->next;
|
||||
first = first->_ref;
|
||||
if(first == nullptr) {
|
||||
last = nullptr;
|
||||
}
|
||||
bench.free(result);
|
||||
result->free();
|
||||
}
|
||||
|
||||
void moveEntry(FastQueue& fromQueue, FastQueue& toQueue, T* entry, T* prevEntry){
|
||||
static void moveEntry(FastQueue& fromQueue, FastQueue& toQueue, T* entry, T* prevEntry){
|
||||
|
||||
if(prevEntry == nullptr) {
|
||||
toQueue.pushFront(fromQueue.popFront());
|
||||
} else if(entry->next == nullptr) {
|
||||
} else if(entry->_ref == nullptr) {
|
||||
toQueue.pushBack(entry);
|
||||
fromQueue.m_last = prevEntry;
|
||||
prevEntry->next = nullptr;
|
||||
fromQueue.last = prevEntry;
|
||||
prevEntry->_ref = nullptr;
|
||||
} else {
|
||||
prevEntry->next = entry->next;
|
||||
prevEntry->_ref = entry->_ref;
|
||||
toQueue.pushBack(entry);
|
||||
}
|
||||
|
||||
@ -163,7 +85,7 @@ public:
|
||||
T* curr = first;
|
||||
while (curr != nullptr) {
|
||||
T* next = curr->_ref;
|
||||
bench.free(curr);
|
||||
curr->free();
|
||||
curr = next;
|
||||
}
|
||||
first = nullptr;
|
||||
|
@ -36,6 +36,7 @@ public:
|
||||
OBJECT_POOL(Incoming_Request_Pool, Request, 32)
|
||||
SHARED_OBJECT_POOL(Shared_Incoming_Request_Pool, Request, 32)
|
||||
public:
|
||||
Request(){}
|
||||
Request(const std::shared_ptr<http::RequestStartingLine>& pStartingLine,
|
||||
const std::shared_ptr<url::mapping::Pattern::MatchMap>& pPathVariables,
|
||||
const std::shared_ptr<http::Protocol::Headers>& pHeaders,
|
||||
@ -54,10 +55,10 @@ public:
|
||||
return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream);
|
||||
}
|
||||
|
||||
const std::shared_ptr<http::RequestStartingLine> startingLine;
|
||||
const std::shared_ptr<url::mapping::Pattern::MatchMap> pathVariables;
|
||||
const std::shared_ptr<http::Protocol::Headers> headers;
|
||||
const std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
|
||||
std::shared_ptr<http::RequestStartingLine> startingLine;
|
||||
std::shared_ptr<url::mapping::Pattern::MatchMap> pathVariables;
|
||||
std::shared_ptr<http::Protocol::Headers> headers;
|
||||
std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
|
||||
|
||||
base::String::PtrWrapper getHeader(const base::String::PtrWrapper& headerName) const{
|
||||
auto entry = headers->find(headerName);
|
||||
|
@ -37,6 +37,8 @@ public:
|
||||
OBJECT_POOL(Outgoing_Request_Pool, Request, 32)
|
||||
SHARED_OBJECT_POOL(Shared_Outgoing_Request_Pool, Request, 32)
|
||||
public:
|
||||
Request() {}
|
||||
|
||||
Request(const base::String::PtrWrapper& pMethod,
|
||||
const base::String::PtrWrapper& pPath,
|
||||
const std::shared_ptr<Headers>& pHeaders,
|
||||
@ -46,6 +48,7 @@ public:
|
||||
, headers((!pHeaders) ? (Headers::createShared()) : (pHeaders))
|
||||
, body(pBody)
|
||||
{}
|
||||
|
||||
public:
|
||||
|
||||
static std::shared_ptr<Request> createShared(const base::String::PtrWrapper& method,
|
||||
|
@ -24,8 +24,6 @@
|
||||
|
||||
#include "./AsyncHttpConnectionHandler.hpp"
|
||||
|
||||
#include "./HttpProcessor.hpp"
|
||||
|
||||
#include "../protocol/http/outgoing/ChunkedBufferBody.hpp"
|
||||
|
||||
#include "../protocol/http/incoming/Request.hpp"
|
||||
@ -41,10 +39,41 @@
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processor2& processor) {
|
||||
|
||||
class ConnectionCoroutine : public oatpp::async::Coroutine<ConnectionCoroutine> {
|
||||
public:
|
||||
|
||||
ConnectionCoroutine(const std::shared_ptr<HttpProcessor::ConnectionState>& pState)
|
||||
: state(pState)
|
||||
{}
|
||||
|
||||
std::shared_ptr<HttpProcessor::ConnectionState> state;
|
||||
|
||||
Action2 act() override {
|
||||
return finish();
|
||||
};
|
||||
|
||||
};
|
||||
|
||||
oatpp::concurrency::SpinLock lock(m_atom);
|
||||
|
||||
auto curr = m_connections.getFirstNode();
|
||||
while (curr != nullptr) {
|
||||
processor.addCoroutine(ConnectionCoroutine::getBench().obtain(curr->getData()));
|
||||
curr = curr->getNext();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void AsyncHttpConnectionHandler::Task::run(){
|
||||
|
||||
oatpp::async::Processor2 processor;
|
||||
|
||||
while(true) {
|
||||
while (m_processor.iterate()) {
|
||||
consumeConnections(processor);
|
||||
while (processor.iterate(100)) {
|
||||
consumeConnections(processor);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
@ -74,7 +103,7 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::d
|
||||
}
|
||||
});
|
||||
|
||||
task->getProcessor().addRoutine(routine);
|
||||
task->addConnection(state);
|
||||
|
||||
m_taskBalancer ++;
|
||||
|
||||
|
@ -25,6 +25,8 @@
|
||||
#ifndef oatpp_web_server_AsyncHttpConnectionHandler_hpp
|
||||
#define oatpp_web_server_AsyncHttpConnectionHandler_hpp
|
||||
|
||||
#include "./HttpProcessor.hpp"
|
||||
|
||||
#include "./handler/ErrorHandler.hpp"
|
||||
|
||||
#include "./HttpRouter.hpp"
|
||||
@ -40,36 +42,35 @@
|
||||
|
||||
#include "../../../../oatpp-lib/core/src/data/stream/StreamBufferedProxy.hpp"
|
||||
#include "../../../../oatpp-lib/core/src/data/buffer/IOBuffer.hpp"
|
||||
|
||||
#include "../../../../oatpp-lib/core/src/async/Processor.hpp"
|
||||
#include "../../../../oatpp-lib/core/src/async/Processor2.hpp"
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
|
||||
private:
|
||||
|
||||
class Task : public base::Controllable, public concurrency::Runnable{
|
||||
private:
|
||||
HttpRouter* m_router;
|
||||
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
|
||||
oatpp::async::Processor m_processor;
|
||||
class Task : public base::Controllable, public concurrency::Runnable {
|
||||
public:
|
||||
Task(HttpRouter* router,
|
||||
const std::shared_ptr<handler::ErrorHandler>& errorHandler)
|
||||
: m_router(router)
|
||||
, m_errorHandler(errorHandler)
|
||||
typedef oatpp::collection::LinkedList<std::shared_ptr<HttpProcessor::ConnectionState>> Connections;
|
||||
private:
|
||||
void consumeConnections(oatpp::async::Processor2& processor);
|
||||
private:
|
||||
oatpp::concurrency::SpinLock::Atom m_atom;
|
||||
Connections m_connections;
|
||||
public:
|
||||
Task()
|
||||
{}
|
||||
public:
|
||||
|
||||
static std::shared_ptr<Task> createShared(HttpRouter* router,
|
||||
const std::shared_ptr<handler::ErrorHandler>& errorHandler){
|
||||
return std::shared_ptr<Task>(new Task(router, errorHandler));
|
||||
static std::shared_ptr<Task> createShared(){
|
||||
return std::make_shared<Task>();
|
||||
}
|
||||
|
||||
void run() override;
|
||||
|
||||
oatpp::async::Processor& getProcessor(){
|
||||
return m_processor;
|
||||
void addConnection(const std::shared_ptr<HttpProcessor::ConnectionState>& connectionState){
|
||||
oatpp::concurrency::SpinLock lock(m_atom);
|
||||
m_connections.pushBack(connectionState);
|
||||
}
|
||||
|
||||
};
|
||||
@ -89,7 +90,7 @@ public:
|
||||
{
|
||||
m_tasks = new std::shared_ptr<Task>[m_threadCount];
|
||||
for(v_int32 i = 0; i < m_threadCount; i++) {
|
||||
auto task = Task::createShared(m_router.get(), m_errorHandler);
|
||||
auto task = Task::createShared();
|
||||
m_tasks[i] = task;
|
||||
concurrency::Thread thread(task);
|
||||
thread.detach();
|
||||
|
@ -277,4 +277,59 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
|
||||
|
||||
}
|
||||
|
||||
// HttpProcessor2
|
||||
|
||||
HttpProcessor2::Action2 HttpProcessor2::parseRequest(v_int32 readCount) {
|
||||
|
||||
oatpp::parser::ParsingCaret caret((p_char8) m_ioBuffer->getData(), readCount);
|
||||
auto line = protocol::http::Protocol::parseRequestStartingLine(caret);
|
||||
|
||||
if(!line){
|
||||
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_400, "Can't read starting line");
|
||||
return yieldTo(&HttpProcessor2::onResponseFormed);
|
||||
}
|
||||
|
||||
auto route = m_router->getRoute(line->method, line->path);
|
||||
|
||||
if(route.isNull()) {
|
||||
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
|
||||
return yieldTo(&HttpProcessor2::onResponseFormed);
|
||||
}
|
||||
|
||||
oatpp::web::protocol::http::Status error;
|
||||
auto headers = protocol::http::Protocol::parseHeaders(caret, error);
|
||||
|
||||
if(error.code != 0){
|
||||
m_currentResponse = m_errorHandler->handleError(error, " Can't parse headers");
|
||||
return yieldTo(&HttpProcessor2::onResponseFormed);
|
||||
}
|
||||
|
||||
auto bodyStream = m_inStream;
|
||||
bodyStream->setBufferPosition(caret.getPosition(), readCount);
|
||||
|
||||
m_currentRequest = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream);
|
||||
return yieldTo(&HttpProcessor2::onRequestFormed);
|
||||
}
|
||||
|
||||
HttpProcessor2::Action2 HttpProcessor2::act() {
|
||||
|
||||
auto readCount = m_connection->read(m_ioBuffer->getData(), m_ioBuffer->getSize());
|
||||
if(readCount > 0) {
|
||||
parseRequest((v_int32)readCount);
|
||||
|
||||
} else if(readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
|
||||
return waitRetry();
|
||||
}
|
||||
return abort();
|
||||
|
||||
}
|
||||
|
||||
HttpProcessor2::Action2 HttpProcessor2::onRequestFormed() {
|
||||
return finish();
|
||||
}
|
||||
|
||||
HttpProcessor2::Action2 HttpProcessor2::onResponseFormed() {
|
||||
return finish();
|
||||
}
|
||||
|
||||
}}}
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "../../../../oatpp-lib/core/src/data/stream/StreamBufferedProxy.hpp"
|
||||
|
||||
#include "../../../../oatpp-lib/core/src/async/Processor.hpp"
|
||||
#include "../../../../oatpp-lib/core/src/async/Processor2.hpp"
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
@ -73,6 +74,61 @@ public:
|
||||
|
||||
};
|
||||
|
||||
class HttpProcessor2 : public oatpp::async::Coroutine<HttpProcessor2> {
|
||||
public:
|
||||
|
||||
class ConnectionState {
|
||||
public:
|
||||
SHARED_OBJECT_POOL(ConnectionState_Pool, ConnectionState, 32)
|
||||
public:
|
||||
|
||||
static std::shared_ptr<ConnectionState> createShared(){
|
||||
return ConnectionState_Pool::allocateShared();
|
||||
}
|
||||
|
||||
std::shared_ptr<oatpp::data::stream::IOStream> connection;
|
||||
std::shared_ptr<oatpp::data::buffer::IOBuffer> ioBuffer;
|
||||
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> outStream;
|
||||
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> inStream;
|
||||
|
||||
};
|
||||
|
||||
private:
|
||||
Action2 parseRequest(v_int32 readCount);
|
||||
private:
|
||||
HttpRouter* m_router;
|
||||
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
|
||||
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
|
||||
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
|
||||
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_outStream;
|
||||
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> m_inStream;
|
||||
bool m_keepAlive;
|
||||
private:
|
||||
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;
|
||||
std::shared_ptr<protocol::http::outgoing::Response> m_currentResponse;
|
||||
public:
|
||||
|
||||
HttpProcessor2(HttpRouter* router,
|
||||
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
|
||||
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
|
||||
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& ioBuffer,
|
||||
const std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy>& outStream,
|
||||
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream)
|
||||
: m_router(router)
|
||||
, m_errorHandler(errorHandler)
|
||||
, m_ioBuffer(ioBuffer)
|
||||
, m_outStream(outStream)
|
||||
, m_inStream(inStream)
|
||||
, m_keepAlive(true)
|
||||
{}
|
||||
|
||||
Action2 act() override;
|
||||
|
||||
Action2 onRequestFormed();
|
||||
Action2 onResponseFormed();
|
||||
|
||||
};
|
||||
|
||||
}}}
|
||||
|
||||
#endif /* oatpp_web_server_HttpProcessor_hpp */
|
||||
|
Loading…
Reference in New Issue
Block a user