diff --git a/core/async/Coroutine.hpp b/core/async/Coroutine.hpp index 4600a73a..9caecd6a 100644 --- a/core/async/Coroutine.hpp +++ b/core/async/Coroutine.hpp @@ -131,6 +131,9 @@ private: { AbstractCoroutine* savedCP = _CP; _CP = _CP->m_parent; + _FP = nullptr; + /* Please note that savedCP->m_parentReturnAction should not be "REPEAT nor WAIT_RETRY" */ + /* as funtion pointer (FP) is invalidated */ Action a = takeAction(savedCP->m_parentReturnAction); savedCP->m_parentReturnAction.m_coroutine = nullptr; savedCP->free(); diff --git a/core/data/stream/ChunkedBuffer.cpp b/core/data/stream/ChunkedBuffer.cpp index ae34f7e3..ccb9144a 100644 --- a/core/data/stream/ChunkedBuffer.cpp +++ b/core/data/stream/ChunkedBuffer.cpp @@ -261,7 +261,7 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor } Action writeCurrData() { - return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); + return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); } }; diff --git a/core/data/stream/ChunkedBuffer.hpp b/core/data/stream/ChunkedBuffer.hpp index 0d8db1bd..84ed3a7e 100644 --- a/core/data/stream/ChunkedBuffer.hpp +++ b/core/data/stream/ChunkedBuffer.hpp @@ -149,9 +149,14 @@ public: os::io::Library::v_size pos, os::io::Library::v_size count); - oatpp::String getSubstring(os::io::Library::v_size pos, - os::io::Library::v_size count); + /** + * return substring of the data written to stream; NOT NULL + */ + oatpp::String getSubstring(os::io::Library::v_size pos, os::io::Library::v_size count); + /** + * return data written to stream as oatpp::String; NOT NULL + */ oatpp::String toString() { return getSubstring(0, m_size); } diff --git a/core/data/stream/Stream.cpp b/core/data/stream/Stream.cpp index df276a64..39901d27 100644 --- a/core/data/stream/Stream.cpp +++ b/core/data/stream/Stream.cpp @@ -220,10 +220,10 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout } Action doWrite() { - return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(), - m_writeBufferPtr, - m_bytesLeft, - yieldTo(&TransferCoroutine::act)); + return oatpp::data::stream::writeExactSizeDataAsyncInline(m_toStream.get(), + m_writeBufferPtr, + m_bytesLeft, + yieldTo(&TransferCoroutine::act)); } Action handleError(const oatpp::async::Error& error) override { @@ -241,10 +241,10 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout } -oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream, - const void*& data, - os::io::Library::v_size& size, - const oatpp::async::Action& nextAction) { +oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream, + const void*& data, + os::io::Library::v_size& size, + const oatpp::async::Action& nextAction) { auto res = stream->write(data, size); if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { return oatpp::async::Action::_WAIT_RETRY; @@ -252,11 +252,36 @@ oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* str return oatpp::async::Action::_REPEAT; } else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) { return oatpp::async::Action::_ABORT; - } else if( res < 0) { + } else if(res < 0) { return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA)); - } else if(res < size) { - data = &((p_char8) data)[res]; - size = size - res; + } + data = &((p_char8) data)[res]; + size = size - res; + if(res < size && res > 0) { + return oatpp::async::Action::_REPEAT; + } + return nextAction; +} + +oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream, + const void*& data, + os::io::Library::v_size& size, + const oatpp::async::Action& nextAction) { + auto res = stream->write(data, size); + if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + return oatpp::async::Action::_WAIT_RETRY; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { + return oatpp::async::Action::_REPEAT; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) { + return oatpp::async::Action::_ABORT; + } else if(res < 0) { + return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA)); + } else if(res == 0) { + return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA)); + } + data = &((p_char8) data)[res]; + size = size - res; + if(res < size) { return oatpp::async::Action::_REPEAT; } return nextAction; @@ -273,11 +298,9 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* s return oatpp::async::Action::_REPEAT; } else if( res < 0) { return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); - } else if(res < bytesLeftToRead) { - data = &((p_char8) data)[res]; - bytesLeftToRead -= res; - return nextAction; - } + } + // res == 0 is not an error here + data = &((p_char8) data)[res]; bytesLeftToRead -= res; return nextAction; } @@ -295,15 +318,41 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre return oatpp::async::Action::_ABORT; } else if( res < 0) { return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); - } else if(res < bytesLeftToRead) { - data = &((p_char8) data)[res]; - bytesLeftToRead -= res; + } else if(res == 0) { // Connection is probably closed or eof is reached + return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); + } + data = &((p_char8) data)[res]; + bytesLeftToRead -= res; + if(res < bytesLeftToRead) { return oatpp::async::Action::_REPEAT; } - bytesLeftToRead -= res; return nextAction; } +oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, os::io::Library::v_size size) { + + char* buffer = (char*) data; + oatpp::os::io::Library::v_size progress = 0; + + while (progress < size) { + + auto res = stream->read(&buffer[progress], size - progress); + + if(res > 0) { + progress += res; + } else { // if res == 0 then probably stream handles read() error incorrectly. return. + if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + continue; + } + return progress; + } + + } + + return progress; + +} + oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) { const char* buffer = (char*)data; diff --git a/core/data/stream/Stream.hpp b/core/data/stream/Stream.hpp index 90ee694e..35a284ca 100644 --- a/core/data/stream/Stream.hpp +++ b/core/data/stream/Stream.hpp @@ -162,10 +162,15 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout * Async write data withot starting new Coroutine. * Should be called from a separate Coroutine method */ -oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream, - const void*& data, - os::io::Library::v_size& size, - const oatpp::async::Action& nextAction); +oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream, + const void*& data, + os::io::Library::v_size& size, + const oatpp::async::Action& nextAction); + +oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream, + const void*& data, + os::io::Library::v_size& size, + const oatpp::async::Action& nextAction); oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream, void*& data, @@ -177,6 +182,13 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre os::io::Library::v_size& bytesLeftToRead, const oatpp::async::Action& nextAction); +/** + * Read exact amount of bytes to stream + * returns exact amount of bytes was read. + * return result can be < size only in case of some disaster like connection reset by peer + */ +oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, os::io::Library::v_size size); + /** * Write exact amount of bytes to stream. * returns exact amount of bytes was written. diff --git a/test/core/base/CommandLineArgumentsTest.hpp b/test/core/base/CommandLineArgumentsTest.hpp index 3babf76a..041888f2 100644 --- a/test/core/base/CommandLineArgumentsTest.hpp +++ b/test/core/base/CommandLineArgumentsTest.hpp @@ -29,6 +29,9 @@ namespace oatpp { namespace test { namespace base { +/** + * Test command line arguments parsing. + */ class CommandLineArgumentsTest : public UnitTest{ public: diff --git a/test/core/base/memory/MemoryPoolTest.hpp b/test/core/base/memory/MemoryPoolTest.hpp index 37b8339b..a2323dfe 100644 --- a/test/core/base/memory/MemoryPoolTest.hpp +++ b/test/core/base/memory/MemoryPoolTest.hpp @@ -29,6 +29,9 @@ namespace oatpp { namespace test { namespace memory { +/** + * Test memory pool allocations + */ class MemoryPoolTest : public UnitTest{ public: diff --git a/web/protocol/http/Http.cpp b/web/protocol/http/Http.cpp index a0763af8..f33f7804 100644 --- a/web/protocol/http/Http.cpp +++ b/web/protocol/http/Http.cpp @@ -99,6 +99,7 @@ const Status Status::CODE_511(511, "Network Authentication Required"); const char* const Header::Value::CONNECTION_CLOSE = "close"; const char* const Header::Value::CONNECTION_KEEP_ALIVE = "keep-alive"; +const char* const Header::Value::CONNECTION_UPGRADE = "Upgrade"; const char* const Header::Value::SERVER = "oatpp/" OATPP_VERSION; const char* const Header::Value::USER_AGENT = "oatpp/" OATPP_VERSION; @@ -119,6 +120,7 @@ const char* const Header::RANGE = "Range"; const char* const Header::HOST = "Host"; const char* const Header::USER_AGENT = "User-Agent"; const char* const Header::SERVER = "Server"; +const char* const Header::UPGRADE = "Upgrade"; const char* const Range::UNIT_BYTES = "bytes"; const char* const ContentRange::UNIT_BYTES = "bytes"; diff --git a/web/protocol/http/Http.hpp b/web/protocol/http/Http.hpp index dd7004c0..4e087962 100644 --- a/web/protocol/http/Http.hpp +++ b/web/protocol/http/Http.hpp @@ -154,6 +154,7 @@ public: public: static const char* const CONNECTION_CLOSE; static const char* const CONNECTION_KEEP_ALIVE; + static const char* const CONNECTION_UPGRADE; static const char* const SERVER; static const char* const USER_AGENT; @@ -174,6 +175,7 @@ public: static const char* const HOST; // "Host" static const char* const USER_AGENT; // "User-Agent" static const char* const SERVER; // "Server" + static const char* const UPGRADE; // "Upgrade" }; class Range { diff --git a/web/protocol/http/incoming/Request.hpp b/web/protocol/http/incoming/Request.hpp index aeaf86f0..fdedb986 100644 --- a/web/protocol/http/incoming/Request.hpp +++ b/web/protocol/http/incoming/Request.hpp @@ -35,6 +35,17 @@ class Request : public oatpp::base::Controllable { public: OBJECT_POOL(Incoming_Request_Pool, Request, 32) SHARED_OBJECT_POOL(Shared_Incoming_Request_Pool, Request, 32) +private: + http::RequestStartingLine m_startingLine; + url::mapping::Pattern::MatchMap m_pathVariables; + http::Protocol::Headers m_headers; + std::shared_ptr m_bodyStream; + + /** + * Request should be preconfigured with default BodyDecoder. + * Custom BodyDecoder can be set on demand + */ + std::shared_ptr m_bodyDecoder; public: /* Request(const std::shared_ptr& pBodyDecoder) @@ -42,16 +53,16 @@ public: {} */ - Request(const http::RequestStartingLine& pStartingLine, - const url::mapping::Pattern::MatchMap& pPathVariables, - const http::Protocol::Headers& pHeaders, - const std::shared_ptr& pBodyStream, - const std::shared_ptr& pBodyDecoder) - : startingLine(pStartingLine) - , pathVariables(pPathVariables) - , headers(pHeaders) - , bodyStream(pBodyStream) - , bodyDecoder(pBodyDecoder) + Request(const http::RequestStartingLine& startingLine, + const url::mapping::Pattern::MatchMap& pathVariables, + const http::Protocol::Headers& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& bodyDecoder) + : m_startingLine(startingLine) + , m_pathVariables(pathVariables) + , m_headers(headers) + , m_bodyStream(bodyStream) + , m_bodyDecoder(bodyDecoder) {} public: @@ -63,50 +74,59 @@ public: return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream, bodyDecoder); } - http::RequestStartingLine startingLine; - url::mapping::Pattern::MatchMap pathVariables; - http::Protocol::Headers headers; - std::shared_ptr bodyStream; + const http::RequestStartingLine& getStartingLine() const { + return m_startingLine; + } - /** - * Request should be preconfigured with default BodyDecoder. - * Custom BodyDecoder can be set on demand - */ - std::shared_ptr bodyDecoder; + const url::mapping::Pattern::MatchMap& getPathVariables() const { + return m_pathVariables; + } + + const http::Protocol::Headers& getHeaders() const { + return m_headers; + } + + std::shared_ptr getBodyStream() const { + return m_bodyStream; + } + + std::shared_ptr getBodyDecoder() const { + return m_bodyDecoder; + } oatpp::String getHeader(const oatpp::String& headerName) const{ - auto it = headers.find(headerName); - if(it != headers.end()) { + auto it = m_headers.find(headerName); + if(it != m_headers.end()) { return it->second.toString(); } return nullptr; } oatpp::String getPathVariable(const oatpp::data::share::StringKeyLabel& name) const { - return pathVariables.getVariable(name); + return m_pathVariables.getVariable(name); } oatpp::String getPathTail() const { - return pathVariables.getTail(); + return m_pathVariables.getTail(); } void streamBody(const std::shared_ptr& toStream) const { - bodyDecoder->decode(headers, bodyStream, toStream); + m_bodyDecoder->decode(m_headers, m_bodyStream, toStream); } oatpp::String readBodyToString() const { - return bodyDecoder->decodeToString(headers, bodyStream); + return m_bodyDecoder->decodeToString(m_headers, m_bodyStream); } template typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr& objectMapper) const { - return objectMapper->readFromString(bodyDecoder->decodeToString(headers, bodyStream)); + return objectMapper->readFromString(m_bodyDecoder->decodeToString(m_headers, m_bodyStream)); } template void readBodyToDto(oatpp::data::mapping::type::PolymorphicWrapper& objectWrapper, const std::shared_ptr& objectMapper) const { - objectWrapper = objectMapper->readFromString(bodyDecoder->decodeToString(headers, bodyStream)); + objectWrapper = objectMapper->readFromString(m_bodyDecoder->decodeToString(m_headers, m_bodyStream)); } // Async @@ -114,20 +134,20 @@ public: oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const oatpp::async::Action& actionOnReturn, const std::shared_ptr& toStream) const { - return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream); + return m_bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, m_headers, m_bodyStream, toStream); } template oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const { - return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream); + return m_bodyDecoder->decodeToStringAsync(parentCoroutine, callback, m_headers, m_bodyStream); } template oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&), const std::shared_ptr& objectMapper) const { - return bodyDecoder->decodeToDtoAsync(parentCoroutine, callback, headers, bodyStream, objectMapper); + return m_bodyDecoder->decodeToDtoAsync(parentCoroutine, callback, m_headers, m_bodyStream, objectMapper); } }; diff --git a/web/protocol/http/incoming/RequestHeadersReader.cpp b/web/protocol/http/incoming/RequestHeadersReader.cpp index 190e54ba..731f68ce 100644 --- a/web/protocol/http/incoming/RequestHeadersReader.cpp +++ b/web/protocol/http/incoming/RequestHeadersReader.cpp @@ -100,7 +100,7 @@ RequestHeadersReader::Action RequestHeadersReader::readHeadersAsync(oatpp::async const std::shared_ptr& connection) { - class ReaderCoroutine : public oatpp::async::CoroutineWithResult { + class ReaderCoroutine : public oatpp::async::CoroutineWithResult { private: std::shared_ptr m_connection; p_char8 m_buffer; diff --git a/web/protocol/http/incoming/Response.hpp b/web/protocol/http/incoming/Response.hpp index 7f2486d7..8c6aaed3 100644 --- a/web/protocol/http/incoming/Response.hpp +++ b/web/protocol/http/incoming/Response.hpp @@ -34,17 +34,32 @@ class Response : public oatpp::base::Controllable { public: OBJECT_POOL(Incoming_Response_Pool, Response, 32) SHARED_OBJECT_POOL(Shared_Incoming_Response_Pool, Response, 32) +private: + v_int32 m_statusCode; + oatpp::String m_statusDescription; + http::Protocol::Headers m_headers; + std::shared_ptr m_bodyStream; + + /** + * Response should be preconfigured with default BodyDecoder. + * Entity that created response object is responsible for providing correct BodyDecoder. + * Custom BodyDecoder can be set on demand + */ + std::shared_ptr m_bodyDecoder; + + std::shared_ptr m_connection; + public: - Response(v_int32 pStatusCode, - const oatpp::String& pStatusDescription, - const http::Protocol::Headers& pHeaders, - const std::shared_ptr& pBodyStream, - const std::shared_ptr& pBodyDecoder) - : statusCode(pStatusCode) - , statusDescription(pStatusDescription) - , headers(pHeaders) - , bodyStream(pBodyStream) - , bodyDecoder(pBodyDecoder) + Response(v_int32 statusCode, + const oatpp::String& statusDescription, + const http::Protocol::Headers& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& bodyDecoder) + : m_statusCode(statusCode) + , m_statusDescription(statusDescription) + , m_headers(headers) + , m_bodyStream(bodyStream) + , m_bodyDecoder(bodyDecoder) {} public: @@ -56,29 +71,37 @@ public: return Shared_Incoming_Response_Pool::allocateShared(statusCode, statusDescription, headers, bodyStream, bodyDecoder); } - const v_int32 statusCode; - const oatpp::String statusDescription; - const http::Protocol::Headers headers; - const std::shared_ptr bodyStream; + v_int32 getStatusCode() const { + return m_statusCode; + } - /** - * Response should be preconfigured with default BodyDecoder. - * Entity that created response object is responsible for providing correct BodyDecoder. - * Custom BodyDecoder can be set on demand - */ - std::shared_ptr bodyDecoder; + oatpp::String getStatusDescription() const { + return m_statusDescription; + } + const http::Protocol::Headers& getHeaders() const { + return m_headers; + } + + std::shared_ptr getBodyStream() const { + return m_bodyStream; + } + + std::shared_ptr getBodyDecoder() const { + return m_bodyDecoder; + } + void streamBody(const std::shared_ptr& toStream) const { - bodyDecoder->decode(headers, bodyStream, toStream); + m_bodyDecoder->decode(m_headers, m_bodyStream, toStream); } oatpp::String readBodyToString() const { - return bodyDecoder->decodeToString(headers, bodyStream); + return m_bodyDecoder->decodeToString(m_headers, m_bodyStream); } template typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr& objectMapper) const { - return bodyDecoder->decodeToDto(headers, bodyStream, objectMapper); + return m_bodyDecoder->decodeToDto(m_headers, m_bodyStream, objectMapper); } // Async @@ -86,20 +109,20 @@ public: oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const oatpp::async::Action& actionOnReturn, const std::shared_ptr& toStream) const { - return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream); + return m_bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, m_headers, m_bodyStream, toStream); } template oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const { - return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream); + return m_bodyDecoder->decodeToStringAsync(parentCoroutine, callback, m_headers, m_bodyStream); } template oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&), const std::shared_ptr& objectMapper) const { - return bodyDecoder->decodeToDtoAsync(parentCoroutine, callback, headers, bodyStream, objectMapper); + return m_bodyDecoder->decodeToDtoAsync(parentCoroutine, callback, m_headers, m_bodyStream, objectMapper); } }; diff --git a/web/protocol/http/incoming/ResponseHeadersReader.cpp b/web/protocol/http/incoming/ResponseHeadersReader.cpp index 4cf2fb43..e962746f 100644 --- a/web/protocol/http/incoming/ResponseHeadersReader.cpp +++ b/web/protocol/http/incoming/ResponseHeadersReader.cpp @@ -100,7 +100,7 @@ ResponseHeadersReader::Action ResponseHeadersReader::readHeadersAsync(oatpp::asy const std::shared_ptr& connection) { - class ReaderCoroutine : public oatpp::async::CoroutineWithResult { + class ReaderCoroutine : public oatpp::async::CoroutineWithResult { private: std::shared_ptr m_connection; p_char8 m_buffer; diff --git a/web/protocol/http/outgoing/BufferBody.hpp b/web/protocol/http/outgoing/BufferBody.hpp index ebda334a..3d0000f5 100644 --- a/web/protocol/http/outgoing/BufferBody.hpp +++ b/web/protocol/http/outgoing/BufferBody.hpp @@ -74,7 +74,7 @@ public: {} Action act() override { - return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish()); + return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish()); } }; diff --git a/web/protocol/http/outgoing/ChunkedBufferBody.hpp b/web/protocol/http/outgoing/ChunkedBufferBody.hpp index bdad5844..458d1004 100644 --- a/web/protocol/http/outgoing/ChunkedBufferBody.hpp +++ b/web/protocol/http/outgoing/ChunkedBufferBody.hpp @@ -145,7 +145,7 @@ public: } Action writeCurrData() { - return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); + return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); } }; diff --git a/web/protocol/http/outgoing/CommunicationUtils.cpp b/web/protocol/http/outgoing/CommunicationUtils.cpp index 4b21d6c8..696852e6 100644 --- a/web/protocol/http/outgoing/CommunicationUtils.cpp +++ b/web/protocol/http/outgoing/CommunicationUtils.cpp @@ -31,33 +31,46 @@ bool CommunicationUtils::headerEqualsCI_FAST(const oatpp::data::share::MemoryLab return size == headerValue.getSize() && oatpp::base::StrBuffer::equalsCI_FAST(headerValue.getData(), value, size); } -bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptr& request, - const std::shared_ptr& response){ +v_int32 CommunicationUtils::considerConnectionState(const std::shared_ptr& request, + const std::shared_ptr& response){ + + auto outState = response->getHeaders().find(Header::CONNECTION); + if(outState != response->getHeaders().end() && headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_UPGRADE)) { + return CONNECTION_STATE_UPGRADE; + } if(request) { - /* Set keep-alive to value specified in the client's request, if no Connection header present in response. */ /* Set keep-alive to value specified in response otherwise */ - auto it = request->headers.find(Header::CONNECTION); - if(it != request->headers.end() && headerEqualsCI_FAST(it->second, Header::Value::CONNECTION_KEEP_ALIVE)) { - if(response->putHeaderIfNotExists(Header::CONNECTION, it->second)){ - return true; + auto it = request->getHeaders().find(Header::CONNECTION); + if(it != request->getHeaders().end() && headerEqualsCI_FAST(it->second, Header::Value::CONNECTION_KEEP_ALIVE)) { + if(outState != response->getHeaders().end()) { + if(headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_KEEP_ALIVE)) { + return CONNECTION_STATE_KEEP_ALIVE; + } else { + return CONNECTION_STATE_CLOSE; + } } else { - auto outKeepAlive = response->getHeaders().find(Header::CONNECTION); - return (outKeepAlive != response->getHeaders().end() && headerEqualsCI_FAST(outKeepAlive->second, Header::Value::CONNECTION_KEEP_ALIVE)); + response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE); + return CONNECTION_STATE_KEEP_ALIVE; } } /* If protocol == HTTP/1.1 */ /* Set HTTP/1.1 default Connection header value (Keep-Alive), if no Connection header present in response. */ /* Set keep-alive to value specified in response otherwise */ - auto& protocol = request->startingLine.protocol; + auto& protocol = request->getStartingLine().protocol; if(protocol.getData() != nullptr && headerEqualsCI_FAST(protocol, "HTTP/1.1")) { - if(!response->putHeaderIfNotExists(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE)) { - auto outKeepAlive = response->getHeaders().find(Header::CONNECTION); - return (outKeepAlive != response->getHeaders().end() && headerEqualsCI_FAST(outKeepAlive->second, Header::Value::CONNECTION_KEEP_ALIVE)); + if(outState != response->getHeaders().end()) { + if(headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_KEEP_ALIVE)) { + return CONNECTION_STATE_KEEP_ALIVE; + } else { + return CONNECTION_STATE_CLOSE; + } + } else { + response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE); + return CONNECTION_STATE_KEEP_ALIVE; } - return true; } } @@ -65,12 +78,18 @@ bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptrputHeaderIfNotExists(Header::CONNECTION, Header::Value::CONNECTION_CLOSE)) { - auto outKeepAlive = response->getHeaders().find(Header::CONNECTION); - return (outKeepAlive != response->getHeaders().end() && headerEqualsCI_FAST(outKeepAlive->second, Header::Value::CONNECTION_KEEP_ALIVE)); + if(outState != response->getHeaders().end()) { + if(headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_KEEP_ALIVE)) { + return CONNECTION_STATE_KEEP_ALIVE; + } else { + return CONNECTION_STATE_CLOSE; + } + } else { + response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_CLOSE); + return CONNECTION_STATE_CLOSE; } - return false; + return CONNECTION_STATE_CLOSE; } diff --git a/web/protocol/http/outgoing/CommunicationUtils.hpp b/web/protocol/http/outgoing/CommunicationUtils.hpp index 9fdc5331..324a1f55 100644 --- a/web/protocol/http/outgoing/CommunicationUtils.hpp +++ b/web/protocol/http/outgoing/CommunicationUtils.hpp @@ -31,6 +31,10 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing { class CommunicationUtils { +public: + static constexpr v_int32 CONNECTION_STATE_CLOSE = 0; + static constexpr v_int32 CONNECTION_STATE_KEEP_ALIVE = 1; + static constexpr v_int32 CONNECTION_STATE_UPGRADE = 2; private: static bool headerEqualsCI_FAST(const oatpp::data::share::MemoryLabel& headerValue, const char* value); public: @@ -38,11 +42,10 @@ public: /** * Consider keep connection alive taking into account request headers, response headers and protocol version. * Corresponding header will be set to response if not existed before - * return true - keep-alive - * return false - close + * return one of (CONNECTION_STATE_CLOSE, CONNECTION_STATE_KEEP_ALIVE, CONNECTION_STATE_UPGRADE) */ - static bool considerConnectionKeepAlive(const std::shared_ptr& request, - const std::shared_ptr& response); + static v_int32 considerConnectionState(const std::shared_ptr& request, + const std::shared_ptr& response); }; diff --git a/web/protocol/http/outgoing/Request.hpp b/web/protocol/http/outgoing/Request.hpp index 244a085a..8862fd93 100644 --- a/web/protocol/http/outgoing/Request.hpp +++ b/web/protocol/http/outgoing/Request.hpp @@ -64,11 +64,11 @@ public: return Shared_Outgoing_Request_Pool::allocateShared(method, path, headers, body); } - oatpp::data::share::StringKeyLabel getMethod() { + const oatpp::data::share::StringKeyLabel& getMethod() const { return m_method; } - oatpp::data::share::StringKeyLabel getPath() { + const oatpp::data::share::StringKeyLabel& getPath() const { return m_path; } diff --git a/web/protocol/http/outgoing/Response.hpp b/web/protocol/http/outgoing/Response.hpp index 002fbc11..5219b123 100644 --- a/web/protocol/http/outgoing/Response.hpp +++ b/web/protocol/http/outgoing/Response.hpp @@ -27,6 +27,7 @@ #include "oatpp/web/protocol/http/outgoing/Body.hpp" #include "oatpp/web/protocol/http/Http.hpp" +#include "oatpp/network/server/ConnectionHandler.hpp" #include "oatpp/core/async/Coroutine.hpp" namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing { @@ -41,6 +42,7 @@ private: Status m_status; Headers m_headers; std::shared_ptr m_body; + std::shared_ptr m_connectionUpgradeHandler; public: Response(const Status& status, const std::shared_ptr& body) @@ -54,7 +56,7 @@ public: return Shared_Outgoing_Response_Pool::allocateShared(status, body); } - Status getStatus() { + const Status& getStatus() const { return m_status; } @@ -75,6 +77,14 @@ public: return false; } + void setConnectionUpgradeHandler(const std::shared_ptr& handler) { + m_connectionUpgradeHandler = handler; + } + + std::shared_ptr getConnectionUpgradeHandler() { + return m_connectionUpgradeHandler; + } + void send(const std::shared_ptr& stream); oatpp::async::Action sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine, diff --git a/web/server/AsyncHttpConnectionHandler.cpp b/web/server/AsyncHttpConnectionHandler.cpp index a3363857..8d144ed1 100644 --- a/web/server/AsyncHttpConnectionHandler.cpp +++ b/web/server/AsyncHttpConnectionHandler.cpp @@ -42,14 +42,14 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr(m_router.get(), - m_bodyDecoder, - m_errorHandler, - &m_requestInterceptors, - connection, - ioBuffer, - outStream, - inStream); + m_executor->execute(m_router.get(), + m_bodyDecoder, + m_errorHandler, + &m_requestInterceptors, + connection, + ioBuffer, + outStream, + inStream); } diff --git a/web/server/AsyncHttpConnectionHandler.hpp b/web/server/AsyncHttpConnectionHandler.hpp index d712f8a6..a1e92241 100644 --- a/web/server/AsyncHttpConnectionHandler.hpp +++ b/web/server/AsyncHttpConnectionHandler.hpp @@ -44,26 +44,41 @@ class AsyncHttpConnectionHandler : public base::Controllable, public network::se private: typedef oatpp::web::protocol::http::incoming::BodyDecoder BodyDecoder; private: - oatpp::async::Executor m_executor; + std::shared_ptr m_executor; private: std::shared_ptr m_router; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors m_requestInterceptors; std::shared_ptr m_bodyDecoder; // TODO make bodyDecoder configurable here public: + AsyncHttpConnectionHandler(const std::shared_ptr& router, v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT) - : m_executor(threadCount) + : m_executor(std::make_shared(threadCount)) , m_router(router) , m_errorHandler(handler::DefaultErrorHandler::createShared()) , m_bodyDecoder(std::make_shared()) { - m_executor.detach(); // TODO consider making it configurable + m_executor->detach(); } + + AsyncHttpConnectionHandler(const std::shared_ptr& router, + const std::shared_ptr& executor) + : m_executor(executor) + , m_router(router) + , m_errorHandler(handler::DefaultErrorHandler::createShared()) + , m_bodyDecoder(std::make_shared()) + {} public: - static std::shared_ptr createShared(const std::shared_ptr& router){ - return std::make_shared(router); + static std::shared_ptr createShared(const std::shared_ptr& router, + v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT){ + return std::make_shared(router, threadCount); + } + + static std::shared_ptr createShared(const std::shared_ptr& router, + const std::shared_ptr& executor){ + return std::make_shared(router, executor); } void setErrorHandler(const std::shared_ptr& errorHandler){ @@ -80,7 +95,7 @@ public: void handleConnection(const std::shared_ptr& connection) override; void stop() { - m_executor.stop(); + m_executor->stop(); } }; diff --git a/web/server/HttpConnectionHandler.cpp b/web/server/HttpConnectionHandler.cpp index b3a689b4..7a5755c8 100644 --- a/web/server/HttpConnectionHandler.cpp +++ b/web/server/HttpConnectionHandler.cpp @@ -25,6 +25,7 @@ #include "./HttpConnectionHandler.hpp" #include "oatpp/web/protocol/http/outgoing/ChunkedBufferBody.hpp" +#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp" #include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/Http.hpp" @@ -38,10 +39,11 @@ void HttpConnectionHandler::Task::run(){ auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize); auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize); - bool keepAlive = true; + v_int32 connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE; + std::shared_ptr response; do { - auto response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); + response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, connectionState); if(response) { outStream->setBufferPosition(0, 0); @@ -51,7 +53,16 @@ void HttpConnectionHandler::Task::run(){ return; } - } while(keepAlive); + } while(connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE); + + if(connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_UPGRADE) { + auto handler = response->getConnectionUpgradeHandler(); + if(handler) { + handler->handleConnection(m_connection); + } else { + OATPP_LOGD("[oatpp::web::server::HttpConnectionHandler::Task::run()]", "Warning. ConnectionUpgradeHandler not set!"); + } + } } diff --git a/web/server/HttpProcessor.cpp b/web/server/HttpProcessor.cpp index 09e60854..5ec0253b 100644 --- a/web/server/HttpProcessor.cpp +++ b/web/server/HttpProcessor.cpp @@ -24,11 +24,7 @@ #include "HttpProcessor.hpp" -#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp" - namespace oatpp { namespace web { namespace server { - -const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE"; std::shared_ptr HttpProcessor::processRequest(HttpRouter* router, @@ -39,26 +35,26 @@ HttpProcessor::processRequest(HttpRouter* router, void* buffer, v_int32 bufferSize, const std::shared_ptr& inStream, - bool& keepAlive) { + v_int32& connectionState) { RequestHeadersReader headersReader(buffer, bufferSize, 4096); oatpp::web::protocol::http::HttpError::Info error; auto headersReadResult = headersReader.readHeaders(connection, error); if(error.status.code != 0) { - keepAlive = false; + connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE; return errorHandler->handleError(error.status, "Invalid request headers"); } if(error.ioStatus <= 0) { - keepAlive = false; + connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE; return nullptr; // connection is in invalid state. should be dropped } auto route = router->getRoute(headersReadResult.startingLine.method, headersReadResult.startingLine.path); if(!route) { - keepAlive = false; + connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE; return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping"); } @@ -94,7 +90,7 @@ HttpProcessor::processRequest(HttpRouter* router, response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); - keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(request, response); + connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(request, response); return response; } @@ -152,7 +148,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std: HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); - m_keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(m_currentRequest, m_currentResponse); + m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse); m_outStream->setBufferPosition(0, 0); return m_currentResponse->sendAsync(this, m_outStream->flushAsync( @@ -163,9 +159,20 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { } HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() { - if(m_keepAlive) { + + if(m_connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE) { return yieldTo(&HttpProcessor::Coroutine::act); } + + if(m_connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_UPGRADE) { + auto handler = m_currentResponse->getConnectionUpgradeHandler(); + if(handler) { + handler->handleConnection(m_connection); + } else { + OATPP_LOGD("[oatpp::web::server::HttpProcessor::Coroutine::onRequestDone()]", "Warning. ConnectionUpgradeHandler not set!"); + } + } + return abort(); } diff --git a/web/server/HttpProcessor.hpp b/web/server/HttpProcessor.hpp index 3f39fdf9..eebf67ae 100644 --- a/web/server/HttpProcessor.hpp +++ b/web/server/HttpProcessor.hpp @@ -34,6 +34,7 @@ #include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/outgoing/Response.hpp" +#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp" #include "oatpp/core/data/stream/StreamBufferedProxy.hpp" #include "oatpp/core/async/Processor.hpp" @@ -41,8 +42,6 @@ namespace oatpp { namespace web { namespace server { class HttpProcessor { -public: - static const char* RETURN_KEEP_ALIVE; public: typedef oatpp::collection::LinkedList> RequestInterceptors; typedef oatpp::web::protocol::http::incoming::RequestHeadersReader RequestHeadersReader; @@ -76,7 +75,7 @@ public: std::shared_ptr m_ioBuffer; std::shared_ptr m_outStream; std::shared_ptr m_inStream; - bool m_keepAlive; + v_int32 m_connectionState; private: oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute; std::shared_ptr m_currentRequest; @@ -99,7 +98,7 @@ public: , m_ioBuffer(ioBuffer) , m_outStream(outStream) , m_inStream(inStream) - , m_keepAlive(true) + , m_connectionState(oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE) {} Action act() override; @@ -126,7 +125,7 @@ public: void* buffer, v_int32 bufferSize, const std::shared_ptr& inStream, - bool& keepAlive); + v_int32& connectionState); };