better AsyncIO error handling

This commit is contained in:
lganzzzo 2019-03-31 01:58:27 +02:00
parent 52e1754d12
commit 5be7637c64
13 changed files with 71 additions and 37 deletions

View File

@ -46,6 +46,7 @@ add_library(oatpp
oatpp/core/concurrency/SpinLock.hpp oatpp/core/concurrency/SpinLock.hpp
oatpp/core/concurrency/Thread.cpp oatpp/core/concurrency/Thread.cpp
oatpp/core/concurrency/Thread.hpp oatpp/core/concurrency/Thread.hpp
oatpp/core/data/IODefinitions.cpp
oatpp/core/data/IODefinitions.hpp oatpp/core/data/IODefinitions.hpp
oatpp/core/data/buffer/FIFOBuffer.cpp oatpp/core/data/buffer/FIFOBuffer.cpp
oatpp/core/data/buffer/FIFOBuffer.hpp oatpp/core/data/buffer/FIFOBuffer.hpp

View File

@ -46,14 +46,14 @@ Action::Action(FunctionPtr functionPtr)
Action::Action(v_int32 type) Action::Action(v_int32 type)
: m_type(type) : m_type(type)
, m_data()
{} {}
Action::Action(Action&& other) Action::Action(Action&& other)
: m_type(other.m_type) : m_type(other.m_type)
, m_data(other.m_data) , m_data(other.m_data)
{ {
other.m_data.fptr = nullptr; other.m_type = TYPE_NONE;
//OATPP_LOGD("aaa", "moving");
} }
Action::~Action() { Action::~Action() {
@ -85,8 +85,8 @@ Action AbstractCoroutine::takeAction(Action&& action) {
switch (action.m_type) { switch (action.m_type) {
case Action::TYPE_REPEAT: return std::forward<oatpp::async::Action>(action); case Action::TYPE_REPEAT: return Action::TYPE_REPEAT;//std::forward<oatpp::async::Action>(action);
case Action::TYPE_WAIT_RETRY: return std::forward<oatpp::async::Action>(action); case Action::TYPE_WAIT_RETRY: return Action::TYPE_WAIT_RETRY;//std::forward<oatpp::async::Action>(action);
case Action::TYPE_COROUTINE: case Action::TYPE_COROUTINE:
action.m_data.coroutine->m_parent = _CP; action.m_data.coroutine->m_parent = _CP;

View File

@ -201,10 +201,10 @@ public:
return takeAction(_CP->call(_FP)); return takeAction(_CP->call(_FP));
} catch (std::exception& e) { } catch (std::exception& e) {
*m_propagatedError = std::make_shared<Error>(e.what()); *m_propagatedError = std::make_shared<Error>(e.what());
return takeAction(Action(Action::TYPE_ERROR)); return takeAction(Action::TYPE_ERROR);
} catch (...) { } catch (...) {
*m_propagatedError = ERROR_UNKNOWN; *m_propagatedError = ERROR_UNKNOWN;
return takeAction(Action(Action::TYPE_ERROR)); return takeAction(Action::TYPE_ERROR);
} }
}; };

View File

@ -41,6 +41,11 @@ public:
*/ */
Error(const char* what); Error(const char* what);
/**
* Virtual destructor.
*/
virtual ~Error() = default;
/** /**
* Error explanation. * Error explanation.
* @return * @return
@ -54,7 +59,7 @@ public:
*/ */
template<class ErrorClass> template<class ErrorClass>
bool is() const { bool is() const {
return dynamic_cast<ErrorClass*>(this) != nullptr; return dynamic_cast<const ErrorClass*>(this) != nullptr;
} }
}; };

View File

@ -0,0 +1,32 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "IODefinitions.hpp"
namespace oatpp { namespace data {
const std::shared_ptr<const oatpp::async::Error> AsyncIOError::ERROR_BROKEN_PIPE = std::make_shared<AsyncIOError>(IOError::BROKEN_PIPE);
const std::shared_ptr<const oatpp::async::Error> AsyncIOError::ERROR_ZERO_VALUE = std::make_shared<AsyncIOError>(IOError::ZERO_VALUE);
}}

View File

@ -87,6 +87,9 @@ enum IOError : v_io_size {
}; };
class AsyncIOError : public oatpp::async::Error { class AsyncIOError : public oatpp::async::Error {
public:
static const std::shared_ptr<const Error> ERROR_BROKEN_PIPE;
static const std::shared_ptr<const Error> ERROR_ZERO_VALUE;
private: private:
v_io_size m_code; v_io_size m_code;
public: public:
@ -101,7 +104,7 @@ public:
, m_code(code) , m_code(code)
{} {}
v_io_size getCode() { v_io_size getCode() const {
return m_code; return m_code;
} }

View File

@ -266,11 +266,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
} }
Action doRead() { Action doRead() {
return oatpp::data::stream::readSomeDataAsyncInline(this, return oatpp::data::stream::readSomeDataAsyncInline(this, m_fromStream.get(), m_readBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::prepareWrite));
m_fromStream.get(),
m_readBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::prepareWrite));
} }
Action prepareWrite() { Action prepareWrite() {
@ -280,11 +276,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
} }
Action doWrite() { Action doWrite() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_toStream.get(), m_writeBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::act));
m_toStream.get(),
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
} }
Action handleError(const std::shared_ptr<const Error>& error) override { Action handleError(const std::shared_ptr<const Error>& error) override {
@ -302,9 +294,6 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
namespace { namespace {
std::shared_ptr<const AsyncIOError> ERROR_ASYNC_BROKEN_PIPE = std::make_shared<AsyncIOError>(IOError::BROKEN_PIPE);
std::shared_ptr<const AsyncIOError> ERROR_ASYNC_ZERO_VALUE = std::make_shared<AsyncIOError>(IOError::ZERO_VALUE);
oatpp::async::Action asyncActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, data::v_io_size res) { oatpp::async::Action asyncActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, data::v_io_size res) {
switch (res) { switch (res) {
case IOError::WAIT_RETRY: case IOError::WAIT_RETRY:
@ -312,9 +301,9 @@ namespace {
case IOError::RETRY: case IOError::RETRY:
return oatpp::async::Action::TYPE_REPEAT; return oatpp::async::Action::TYPE_REPEAT;
case IOError::BROKEN_PIPE: case IOError::BROKEN_PIPE:
return coroutine->error(ERROR_ASYNC_BROKEN_PIPE); return coroutine->error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
case IOError::ZERO_VALUE: case IOError::ZERO_VALUE:
return coroutine->error(ERROR_ASYNC_ZERO_VALUE); return coroutine->error(oatpp::data::AsyncIOError::ERROR_ZERO_VALUE);
} }
return coroutine->error<AsyncIOError>("Unknown IO Error result", res); return coroutine->error<AsyncIOError>("Unknown IO Error result", res);
} }

View File

@ -152,7 +152,9 @@ RequestHeadersReader::Action RequestHeadersReader::readHeadersAsync(oatpp::async
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry(); return waitRetry();
} else if(res == data::IOError::BROKEN_PIPE){ } else if(res == data::IOError::BROKEN_PIPE){
return error<Error>("Connection Closed."); return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else if(res == data::IOError::ZERO_VALUE){
return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else { } else {
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Error reading connection stream."); return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Error reading connection stream.");
} }

View File

@ -152,7 +152,9 @@ ResponseHeadersReader::Action ResponseHeadersReader::readHeadersAsync(oatpp::asy
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry(); return waitRetry();
} else if(res == data::IOError::BROKEN_PIPE) { } else if(res == data::IOError::BROKEN_PIPE) {
return error<Error>("Connection closed"); return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else if(res == data::IOError::ZERO_VALUE) {
return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else { } else {
return error<Error>("[oatpp::web::protocol::http::incoming::ResponseHeadersReader::readHeadersAsync()]: Error. Error reading connection stream."); return error<Error>("[oatpp::web::protocol::http::incoming::ResponseHeadersReader::readHeadersAsync()]: Error. Error reading connection stream.");
} }

View File

@ -199,17 +199,9 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
Action skipRN() { Action skipRN() {
if(m_done) { if(m_done) {
return oatpp::data::stream::readExactSizeDataAsyncInline(this, return oatpp::data::stream::readExactSizeDataAsyncInline(this, m_fromStream.get(), m_skipData, m_skipSize, finish());
m_fromStream.get(),
m_skipData,
m_skipSize,
finish());
} else { } else {
return oatpp::data::stream::readExactSizeDataAsyncInline(this, return oatpp::data::stream::readExactSizeDataAsyncInline(this, m_fromStream.get(), m_skipData, m_skipSize, yieldTo(&ChunkedDecoder::readLineChar));
m_fromStream.get(),
m_skipData,
m_skipSize,
yieldTo(&ChunkedDecoder::readLineChar));
} }
} }

View File

@ -184,6 +184,13 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(const std
if(error) { if(error) {
if(error->is<oatpp::data::AsyncIOError>()) {
auto aioe = static_cast<const oatpp::data::AsyncIOError*>(error.get());
if(aioe->getCode() == oatpp::data::IOError::BROKEN_PIPE) {
return Action::TYPE_ERROR; // do not report BROKEN_PIPE error
}
}
if(m_currentResponse) { if(m_currentResponse) {
OATPP_LOGE("[oatpp::web::server::HttpProcessor::Coroutine::handleError()]", "Unhandled error. '%s'. Dropping connection", error->what()); OATPP_LOGE("[oatpp::web::server::HttpProcessor::Coroutine::handleError()]", "Unhandled error. '%s'. Dropping connection", error->what());
return Action::TYPE_ERROR; return Action::TYPE_ERROR;

View File

@ -85,6 +85,7 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::web::server::api::ApiControllerTest); OATPP_RUN_TEST(oatpp::test::web::server::api::ApiControllerTest);
OATPP_RUN_TEST(oatpp::test::web::FullTest); OATPP_RUN_TEST(oatpp::test::web::FullTest);
OATPP_RUN_TEST(oatpp::test::web::FullAsyncTest); OATPP_RUN_TEST(oatpp::test::web::FullAsyncTest);
OATPP_RUN_TEST(oatpp::test::web::FullAsyncClientTest); OATPP_RUN_TEST(oatpp::test::web::FullAsyncClientTest);

View File

@ -58,7 +58,7 @@ class TestComponent {
public: public:
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor)([] { OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor)([] {
return std::make_shared<oatpp::async::Executor>(1); return std::make_shared<oatpp::async::Executor>(10);
}()); }());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, virtualInterface)([] { OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, virtualInterface)([] {
@ -206,7 +206,7 @@ void FullAsyncClientTest::onRun() {
ClientCoroutine_getRootAsync::SUCCESS_COUNTER = 0; ClientCoroutine_getRootAsync::SUCCESS_COUNTER = 0;
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER = 0; ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER = 0;
v_int32 iterations = 1; v_int32 iterations = 10000;
for(v_int32 i = 0; i < iterations; i++) { for(v_int32 i = 0; i < iterations; i++) {
executor->execute<ClientCoroutine_getRootAsync>(); executor->execute<ClientCoroutine_getRootAsync>();