Merge pull request #33 from oatpp/connection_upgrade_mechanism

Connection upgrade mechanism
This commit is contained in:
Leonid Stryzhevskyi 2019-01-19 19:03:10 +02:00 committed by GitHub
commit 3c1e3b2f5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 332 additions and 146 deletions

View File

@ -131,6 +131,9 @@ private:
{
AbstractCoroutine* savedCP = _CP;
_CP = _CP->m_parent;
_FP = nullptr;
/* Please note that savedCP->m_parentReturnAction should not be "REPEAT nor WAIT_RETRY" */
/* as funtion pointer (FP) is invalidated */
Action a = takeAction(savedCP->m_parentReturnAction);
savedCP->m_parentReturnAction.m_coroutine = nullptr;
savedCP->free();

View File

@ -261,7 +261,7 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
}
Action writeCurrData() {
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
}
};

View File

@ -149,9 +149,14 @@ public:
os::io::Library::v_size pos,
os::io::Library::v_size count);
oatpp::String getSubstring(os::io::Library::v_size pos,
os::io::Library::v_size count);
/**
* return substring of the data written to stream; NOT NULL
*/
oatpp::String getSubstring(os::io::Library::v_size pos, os::io::Library::v_size count);
/**
* return data written to stream as oatpp::String; NOT NULL
*/
oatpp::String toString() {
return getSubstring(0, m_size);
}

View File

@ -220,10 +220,10 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
}
Action doWrite() {
return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(),
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_toStream.get(),
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
}
Action handleError(const oatpp::async::Error& error) override {
@ -241,10 +241,10 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
}
oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction) {
oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction) {
auto res = stream->write(data, size);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
@ -252,11 +252,36 @@ oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* str
return oatpp::async::Action::_REPEAT;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
return oatpp::async::Action::_ABORT;
} else if( res < 0) {
} else if(res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
} else if(res < size) {
data = &((p_char8) data)[res];
size = size - res;
}
data = &((p_char8) data)[res];
size = size - res;
if(res < size && res > 0) {
return oatpp::async::Action::_REPEAT;
}
return nextAction;
}
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction) {
auto res = stream->write(data, size);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
return oatpp::async::Action::_REPEAT;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
return oatpp::async::Action::_ABORT;
} else if(res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
} else if(res == 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
}
data = &((p_char8) data)[res];
size = size - res;
if(res < size) {
return oatpp::async::Action::_REPEAT;
}
return nextAction;
@ -273,11 +298,9 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* s
return oatpp::async::Action::_REPEAT;
} else if( res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
} else if(res < bytesLeftToRead) {
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
return nextAction;
}
// res == 0 is not an error here
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
return nextAction;
}
@ -295,15 +318,41 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre
return oatpp::async::Action::_ABORT;
} else if( res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
} else if(res < bytesLeftToRead) {
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
} else if(res == 0) { // Connection is probably closed or eof is reached
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
}
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
if(res < bytesLeftToRead) {
return oatpp::async::Action::_REPEAT;
}
bytesLeftToRead -= res;
return nextAction;
}
oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, os::io::Library::v_size size) {
char* buffer = (char*) data;
oatpp::os::io::Library::v_size progress = 0;
while (progress < size) {
auto res = stream->read(&buffer[progress], size - progress);
if(res > 0) {
progress += res;
} else { // if res == 0 then probably stream handles read() error incorrectly. return.
if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
continue;
}
return progress;
}
}
return progress;
}
oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) {
const char* buffer = (char*)data;

View File

@ -162,10 +162,15 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
* Async write data withot starting new Coroutine.
* Should be called from a separate Coroutine method
*/
oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
void*& data,
@ -177,6 +182,13 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre
os::io::Library::v_size& bytesLeftToRead,
const oatpp::async::Action& nextAction);
/**
* Read exact amount of bytes to stream
* returns exact amount of bytes was read.
* return result can be < size only in case of some disaster like connection reset by peer
*/
oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, os::io::Library::v_size size);
/**
* Write exact amount of bytes to stream.
* returns exact amount of bytes was written.

View File

@ -29,6 +29,9 @@
namespace oatpp { namespace test { namespace base {
/**
* Test command line arguments parsing.
*/
class CommandLineArgumentsTest : public UnitTest{
public:

View File

@ -29,6 +29,9 @@
namespace oatpp { namespace test { namespace memory {
/**
* Test memory pool allocations
*/
class MemoryPoolTest : public UnitTest{
public:

View File

@ -99,6 +99,7 @@ const Status Status::CODE_511(511, "Network Authentication Required");
const char* const Header::Value::CONNECTION_CLOSE = "close";
const char* const Header::Value::CONNECTION_KEEP_ALIVE = "keep-alive";
const char* const Header::Value::CONNECTION_UPGRADE = "Upgrade";
const char* const Header::Value::SERVER = "oatpp/" OATPP_VERSION;
const char* const Header::Value::USER_AGENT = "oatpp/" OATPP_VERSION;
@ -119,6 +120,7 @@ const char* const Header::RANGE = "Range";
const char* const Header::HOST = "Host";
const char* const Header::USER_AGENT = "User-Agent";
const char* const Header::SERVER = "Server";
const char* const Header::UPGRADE = "Upgrade";
const char* const Range::UNIT_BYTES = "bytes";
const char* const ContentRange::UNIT_BYTES = "bytes";

View File

@ -154,6 +154,7 @@ public:
public:
static const char* const CONNECTION_CLOSE;
static const char* const CONNECTION_KEEP_ALIVE;
static const char* const CONNECTION_UPGRADE;
static const char* const SERVER;
static const char* const USER_AGENT;
@ -174,6 +175,7 @@ public:
static const char* const HOST; // "Host"
static const char* const USER_AGENT; // "User-Agent"
static const char* const SERVER; // "Server"
static const char* const UPGRADE; // "Upgrade"
};
class Range {

View File

@ -35,6 +35,17 @@ class Request : public oatpp::base::Controllable {
public:
OBJECT_POOL(Incoming_Request_Pool, Request, 32)
SHARED_OBJECT_POOL(Shared_Incoming_Request_Pool, Request, 32)
private:
http::RequestStartingLine m_startingLine;
url::mapping::Pattern::MatchMap m_pathVariables;
http::Protocol::Headers m_headers;
std::shared_ptr<oatpp::data::stream::InputStream> m_bodyStream;
/**
* Request should be preconfigured with default BodyDecoder.
* Custom BodyDecoder can be set on demand
*/
std::shared_ptr<const http::incoming::BodyDecoder> m_bodyDecoder;
public:
/*
Request(const std::shared_ptr<const http::incoming::BodyDecoder>& pBodyDecoder)
@ -42,16 +53,16 @@ public:
{}
*/
Request(const http::RequestStartingLine& pStartingLine,
const url::mapping::Pattern::MatchMap& pPathVariables,
const http::Protocol::Headers& pHeaders,
const std::shared_ptr<oatpp::data::stream::InputStream>& pBodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& pBodyDecoder)
: startingLine(pStartingLine)
, pathVariables(pPathVariables)
, headers(pHeaders)
, bodyStream(pBodyStream)
, bodyDecoder(pBodyDecoder)
Request(const http::RequestStartingLine& startingLine,
const url::mapping::Pattern::MatchMap& pathVariables,
const http::Protocol::Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& bodyDecoder)
: m_startingLine(startingLine)
, m_pathVariables(pathVariables)
, m_headers(headers)
, m_bodyStream(bodyStream)
, m_bodyDecoder(bodyDecoder)
{}
public:
@ -63,50 +74,59 @@ public:
return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream, bodyDecoder);
}
http::RequestStartingLine startingLine;
url::mapping::Pattern::MatchMap pathVariables;
http::Protocol::Headers headers;
std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
const http::RequestStartingLine& getStartingLine() const {
return m_startingLine;
}
/**
* Request should be preconfigured with default BodyDecoder.
* Custom BodyDecoder can be set on demand
*/
std::shared_ptr<const http::incoming::BodyDecoder> bodyDecoder;
const url::mapping::Pattern::MatchMap& getPathVariables() const {
return m_pathVariables;
}
const http::Protocol::Headers& getHeaders() const {
return m_headers;
}
std::shared_ptr<oatpp::data::stream::InputStream> getBodyStream() const {
return m_bodyStream;
}
std::shared_ptr<const http::incoming::BodyDecoder> getBodyDecoder() const {
return m_bodyDecoder;
}
oatpp::String getHeader(const oatpp::String& headerName) const{
auto it = headers.find(headerName);
if(it != headers.end()) {
auto it = m_headers.find(headerName);
if(it != m_headers.end()) {
return it->second.toString();
}
return nullptr;
}
oatpp::String getPathVariable(const oatpp::data::share::StringKeyLabel& name) const {
return pathVariables.getVariable(name);
return m_pathVariables.getVariable(name);
}
oatpp::String getPathTail() const {
return pathVariables.getTail();
return m_pathVariables.getTail();
}
void streamBody(const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
bodyDecoder->decode(headers, bodyStream, toStream);
m_bodyDecoder->decode(m_headers, m_bodyStream, toStream);
}
oatpp::String readBodyToString() const {
return bodyDecoder->decodeToString(headers, bodyStream);
return m_bodyDecoder->decodeToString(m_headers, m_bodyStream);
}
template<class Type>
typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return objectMapper->readFromString<Type>(bodyDecoder->decodeToString(headers, bodyStream));
return objectMapper->readFromString<Type>(m_bodyDecoder->decodeToString(m_headers, m_bodyStream));
}
template<class Type>
void readBodyToDto(oatpp::data::mapping::type::PolymorphicWrapper<Type>& objectWrapper,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
objectWrapper = objectMapper->readFromString<Type>(bodyDecoder->decodeToString(headers, bodyStream));
objectWrapper = objectMapper->readFromString<Type>(m_bodyDecoder->decodeToString(m_headers, m_bodyStream));
}
// Async
@ -114,20 +134,20 @@ public:
oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream);
return m_bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, m_headers, m_bodyStream, toStream);
}
template<typename ParentCoroutineType>
oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const {
return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream);
return m_bodyDecoder->decodeToStringAsync(parentCoroutine, callback, m_headers, m_bodyStream);
}
template<class DtoType, typename ParentCoroutineType>
oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&),
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return bodyDecoder->decodeToDtoAsync<DtoType>(parentCoroutine, callback, headers, bodyStream, objectMapper);
return m_bodyDecoder->decodeToDtoAsync<DtoType>(parentCoroutine, callback, m_headers, m_bodyStream, objectMapper);
}
};

View File

@ -100,7 +100,7 @@ RequestHeadersReader::Action RequestHeadersReader::readHeadersAsync(oatpp::async
const std::shared_ptr<oatpp::data::stream::IOStream>& connection)
{
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, Result> {
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, const Result&> {
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
p_char8 m_buffer;

View File

@ -34,17 +34,32 @@ class Response : public oatpp::base::Controllable {
public:
OBJECT_POOL(Incoming_Response_Pool, Response, 32)
SHARED_OBJECT_POOL(Shared_Incoming_Response_Pool, Response, 32)
private:
v_int32 m_statusCode;
oatpp::String m_statusDescription;
http::Protocol::Headers m_headers;
std::shared_ptr<oatpp::data::stream::InputStream> m_bodyStream;
/**
* Response should be preconfigured with default BodyDecoder.
* Entity that created response object is responsible for providing correct BodyDecoder.
* Custom BodyDecoder can be set on demand
*/
std::shared_ptr<const http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
public:
Response(v_int32 pStatusCode,
const oatpp::String& pStatusDescription,
const http::Protocol::Headers& pHeaders,
const std::shared_ptr<oatpp::data::stream::InputStream>& pBodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& pBodyDecoder)
: statusCode(pStatusCode)
, statusDescription(pStatusDescription)
, headers(pHeaders)
, bodyStream(pBodyStream)
, bodyDecoder(pBodyDecoder)
Response(v_int32 statusCode,
const oatpp::String& statusDescription,
const http::Protocol::Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& bodyDecoder)
: m_statusCode(statusCode)
, m_statusDescription(statusDescription)
, m_headers(headers)
, m_bodyStream(bodyStream)
, m_bodyDecoder(bodyDecoder)
{}
public:
@ -56,29 +71,37 @@ public:
return Shared_Incoming_Response_Pool::allocateShared(statusCode, statusDescription, headers, bodyStream, bodyDecoder);
}
const v_int32 statusCode;
const oatpp::String statusDescription;
const http::Protocol::Headers headers;
const std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
v_int32 getStatusCode() const {
return m_statusCode;
}
/**
* Response should be preconfigured with default BodyDecoder.
* Entity that created response object is responsible for providing correct BodyDecoder.
* Custom BodyDecoder can be set on demand
*/
std::shared_ptr<const http::incoming::BodyDecoder> bodyDecoder;
oatpp::String getStatusDescription() const {
return m_statusDescription;
}
const http::Protocol::Headers& getHeaders() const {
return m_headers;
}
std::shared_ptr<oatpp::data::stream::InputStream> getBodyStream() const {
return m_bodyStream;
}
std::shared_ptr<const http::incoming::BodyDecoder> getBodyDecoder() const {
return m_bodyDecoder;
}
void streamBody(const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
bodyDecoder->decode(headers, bodyStream, toStream);
m_bodyDecoder->decode(m_headers, m_bodyStream, toStream);
}
oatpp::String readBodyToString() const {
return bodyDecoder->decodeToString(headers, bodyStream);
return m_bodyDecoder->decodeToString(m_headers, m_bodyStream);
}
template<class Type>
typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return bodyDecoder->decodeToDto<Type>(headers, bodyStream, objectMapper);
return m_bodyDecoder->decodeToDto<Type>(m_headers, m_bodyStream, objectMapper);
}
// Async
@ -86,20 +109,20 @@ public:
oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream);
return m_bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, m_headers, m_bodyStream, toStream);
}
template<typename ParentCoroutineType>
oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const {
return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream);
return m_bodyDecoder->decodeToStringAsync(parentCoroutine, callback, m_headers, m_bodyStream);
}
template<class DtoType, typename ParentCoroutineType>
oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&),
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return bodyDecoder->decodeToDtoAsync<DtoType>(parentCoroutine, callback, headers, bodyStream, objectMapper);
return m_bodyDecoder->decodeToDtoAsync<DtoType>(parentCoroutine, callback, m_headers, m_bodyStream, objectMapper);
}
};

View File

@ -100,7 +100,7 @@ ResponseHeadersReader::Action ResponseHeadersReader::readHeadersAsync(oatpp::asy
const std::shared_ptr<oatpp::data::stream::IOStream>& connection)
{
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, Result> {
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, const Result&> {
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
p_char8 m_buffer;

View File

@ -74,7 +74,7 @@ public:
{}
Action act() override {
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish());
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish());
}
};

View File

@ -145,7 +145,7 @@ public:
}
Action writeCurrData() {
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
}
};

View File

@ -31,33 +31,46 @@ bool CommunicationUtils::headerEqualsCI_FAST(const oatpp::data::share::MemoryLab
return size == headerValue.getSize() && oatpp::base::StrBuffer::equalsCI_FAST(headerValue.getData(), value, size);
}
bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response){
v_int32 CommunicationUtils::considerConnectionState(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response){
auto outState = response->getHeaders().find(Header::CONNECTION);
if(outState != response->getHeaders().end() && headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_UPGRADE)) {
return CONNECTION_STATE_UPGRADE;
}
if(request) {
/* Set keep-alive to value specified in the client's request, if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
auto it = request->headers.find(Header::CONNECTION);
if(it != request->headers.end() && headerEqualsCI_FAST(it->second, Header::Value::CONNECTION_KEEP_ALIVE)) {
if(response->putHeaderIfNotExists(Header::CONNECTION, it->second)){
return true;
auto it = request->getHeaders().find(Header::CONNECTION);
if(it != request->getHeaders().end() && headerEqualsCI_FAST(it->second, Header::Value::CONNECTION_KEEP_ALIVE)) {
if(outState != response->getHeaders().end()) {
if(headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_KEEP_ALIVE)) {
return CONNECTION_STATE_KEEP_ALIVE;
} else {
return CONNECTION_STATE_CLOSE;
}
} else {
auto outKeepAlive = response->getHeaders().find(Header::CONNECTION);
return (outKeepAlive != response->getHeaders().end() && headerEqualsCI_FAST(outKeepAlive->second, Header::Value::CONNECTION_KEEP_ALIVE));
response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE);
return CONNECTION_STATE_KEEP_ALIVE;
}
}
/* If protocol == HTTP/1.1 */
/* Set HTTP/1.1 default Connection header value (Keep-Alive), if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
auto& protocol = request->startingLine.protocol;
auto& protocol = request->getStartingLine().protocol;
if(protocol.getData() != nullptr && headerEqualsCI_FAST(protocol, "HTTP/1.1")) {
if(!response->putHeaderIfNotExists(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE)) {
auto outKeepAlive = response->getHeaders().find(Header::CONNECTION);
return (outKeepAlive != response->getHeaders().end() && headerEqualsCI_FAST(outKeepAlive->second, Header::Value::CONNECTION_KEEP_ALIVE));
if(outState != response->getHeaders().end()) {
if(headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_KEEP_ALIVE)) {
return CONNECTION_STATE_KEEP_ALIVE;
} else {
return CONNECTION_STATE_CLOSE;
}
} else {
response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE);
return CONNECTION_STATE_KEEP_ALIVE;
}
return true;
}
}
@ -65,12 +78,18 @@ bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptr<proto
/* If protocol != HTTP/1.1 */
/* Set default Connection header value (Close), if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
if(!response->putHeaderIfNotExists(Header::CONNECTION, Header::Value::CONNECTION_CLOSE)) {
auto outKeepAlive = response->getHeaders().find(Header::CONNECTION);
return (outKeepAlive != response->getHeaders().end() && headerEqualsCI_FAST(outKeepAlive->second, Header::Value::CONNECTION_KEEP_ALIVE));
if(outState != response->getHeaders().end()) {
if(headerEqualsCI_FAST(outState->second, Header::Value::CONNECTION_KEEP_ALIVE)) {
return CONNECTION_STATE_KEEP_ALIVE;
} else {
return CONNECTION_STATE_CLOSE;
}
} else {
response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_CLOSE);
return CONNECTION_STATE_CLOSE;
}
return false;
return CONNECTION_STATE_CLOSE;
}

View File

@ -31,6 +31,10 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
class CommunicationUtils {
public:
static constexpr v_int32 CONNECTION_STATE_CLOSE = 0;
static constexpr v_int32 CONNECTION_STATE_KEEP_ALIVE = 1;
static constexpr v_int32 CONNECTION_STATE_UPGRADE = 2;
private:
static bool headerEqualsCI_FAST(const oatpp::data::share::MemoryLabel& headerValue, const char* value);
public:
@ -38,11 +42,10 @@ public:
/**
* Consider keep connection alive taking into account request headers, response headers and protocol version.
* Corresponding header will be set to response if not existed before
* return true - keep-alive
* return false - close
* return one of (CONNECTION_STATE_CLOSE, CONNECTION_STATE_KEEP_ALIVE, CONNECTION_STATE_UPGRADE)
*/
static bool considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response);
static v_int32 considerConnectionState(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response);
};

View File

@ -64,11 +64,11 @@ public:
return Shared_Outgoing_Request_Pool::allocateShared(method, path, headers, body);
}
oatpp::data::share::StringKeyLabel getMethod() {
const oatpp::data::share::StringKeyLabel& getMethod() const {
return m_method;
}
oatpp::data::share::StringKeyLabel getPath() {
const oatpp::data::share::StringKeyLabel& getPath() const {
return m_path;
}

View File

@ -27,6 +27,7 @@
#include "oatpp/web/protocol/http/outgoing/Body.hpp"
#include "oatpp/web/protocol/http/Http.hpp"
#include "oatpp/network/server/ConnectionHandler.hpp"
#include "oatpp/core/async/Coroutine.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
@ -41,6 +42,7 @@ private:
Status m_status;
Headers m_headers;
std::shared_ptr<Body> m_body;
std::shared_ptr<oatpp::network::server::ConnectionHandler> m_connectionUpgradeHandler;
public:
Response(const Status& status,
const std::shared_ptr<Body>& body)
@ -54,7 +56,7 @@ public:
return Shared_Outgoing_Response_Pool::allocateShared(status, body);
}
Status getStatus() {
const Status& getStatus() const {
return m_status;
}
@ -75,6 +77,14 @@ public:
return false;
}
void setConnectionUpgradeHandler(const std::shared_ptr<oatpp::network::server::ConnectionHandler>& handler) {
m_connectionUpgradeHandler = handler;
}
std::shared_ptr<oatpp::network::server::ConnectionHandler> getConnectionUpgradeHandler() {
return m_connectionUpgradeHandler;
}
void send(const std::shared_ptr<data::stream::OutputStream>& stream);
oatpp::async::Action sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine,

View File

@ -42,14 +42,14 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::d
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer);
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer);
m_executor.execute<HttpProcessor::Coroutine>(m_router.get(),
m_bodyDecoder,
m_errorHandler,
&m_requestInterceptors,
connection,
ioBuffer,
outStream,
inStream);
m_executor->execute<HttpProcessor::Coroutine>(m_router.get(),
m_bodyDecoder,
m_errorHandler,
&m_requestInterceptors,
connection,
ioBuffer,
outStream,
inStream);
}

View File

@ -44,26 +44,41 @@ class AsyncHttpConnectionHandler : public base::Controllable, public network::se
private:
typedef oatpp::web::protocol::http::incoming::BodyDecoder BodyDecoder;
private:
oatpp::async::Executor m_executor;
std::shared_ptr<oatpp::async::Executor> m_executor;
private:
std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors m_requestInterceptors;
std::shared_ptr<const BodyDecoder> m_bodyDecoder; // TODO make bodyDecoder configurable here
public:
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router,
v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT)
: m_executor(threadCount)
: m_executor(std::make_shared<oatpp::async::Executor>(threadCount))
, m_router(router)
, m_errorHandler(handler::DefaultErrorHandler::createShared())
, m_bodyDecoder(std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>())
{
m_executor.detach(); // TODO consider making it configurable
m_executor->detach();
}
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router,
const std::shared_ptr<oatpp::async::Executor>& executor)
: m_executor(executor)
, m_router(router)
, m_errorHandler(handler::DefaultErrorHandler::createShared())
, m_bodyDecoder(std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>())
{}
public:
static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router){
return std::make_shared<AsyncHttpConnectionHandler>(router);
static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router,
v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT){
return std::make_shared<AsyncHttpConnectionHandler>(router, threadCount);
}
static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router,
const std::shared_ptr<oatpp::async::Executor>& executor){
return std::make_shared<AsyncHttpConnectionHandler>(router, executor);
}
void setErrorHandler(const std::shared_ptr<handler::ErrorHandler>& errorHandler){
@ -80,7 +95,7 @@ public:
void handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) override;
void stop() {
m_executor.stop();
m_executor->stop();
}
};

View File

@ -25,6 +25,7 @@
#include "./HttpConnectionHandler.hpp"
#include "oatpp/web/protocol/http/outgoing/ChunkedBufferBody.hpp"
#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp"
#include "oatpp/web/protocol/http/incoming/Request.hpp"
#include "oatpp/web/protocol/http/Http.hpp"
@ -38,10 +39,11 @@ void HttpConnectionHandler::Task::run(){
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize);
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize);
bool keepAlive = true;
v_int32 connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
std::shared_ptr<oatpp::web::protocol::http::outgoing::Response> response;
do {
auto response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, connectionState);
if(response) {
outStream->setBufferPosition(0, 0);
@ -51,7 +53,16 @@ void HttpConnectionHandler::Task::run(){
return;
}
} while(keepAlive);
} while(connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE);
if(connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_UPGRADE) {
auto handler = response->getConnectionUpgradeHandler();
if(handler) {
handler->handleConnection(m_connection);
} else {
OATPP_LOGD("[oatpp::web::server::HttpConnectionHandler::Task::run()]", "Warning. ConnectionUpgradeHandler not set!");
}
}
}

View File

@ -24,12 +24,8 @@
#include "HttpProcessor.hpp"
#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp"
namespace oatpp { namespace web { namespace server {
const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE";
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::processRequest(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
@ -39,26 +35,26 @@ HttpProcessor::processRequest(HttpRouter* router,
void* buffer,
v_int32 bufferSize,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
bool& keepAlive) {
v_int32& connectionState) {
RequestHeadersReader headersReader(buffer, bufferSize, 4096);
oatpp::web::protocol::http::HttpError::Info error;
auto headersReadResult = headersReader.readHeaders(connection, error);
if(error.status.code != 0) {
keepAlive = false;
connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
return errorHandler->handleError(error.status, "Invalid request headers");
}
if(error.ioStatus <= 0) {
keepAlive = false;
connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
return nullptr; // connection is in invalid state. should be dropped
}
auto route = router->getRoute(headersReadResult.startingLine.method, headersReadResult.startingLine.path);
if(!route) {
keepAlive = false;
connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
}
@ -94,7 +90,7 @@ HttpProcessor::processRequest(HttpRouter* router,
response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(request, response);
connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(request, response);
return response;
}
@ -152,7 +148,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std:
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
m_keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(m_currentRequest, m_currentResponse);
m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse);
m_outStream->setBufferPosition(0, 0);
return m_currentResponse->sendAsync(this,
m_outStream->flushAsync(
@ -163,9 +159,20 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() {
if(m_keepAlive) {
if(m_connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE) {
return yieldTo(&HttpProcessor::Coroutine::act);
}
if(m_connectionState == oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_UPGRADE) {
auto handler = m_currentResponse->getConnectionUpgradeHandler();
if(handler) {
handler->handleConnection(m_connection);
} else {
OATPP_LOGD("[oatpp::web::server::HttpProcessor::Coroutine::onRequestDone()]", "Warning. ConnectionUpgradeHandler not set!");
}
}
return abort();
}

View File

@ -34,6 +34,7 @@
#include "oatpp/web/protocol/http/incoming/Request.hpp"
#include "oatpp/web/protocol/http/outgoing/Response.hpp"
#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
#include "oatpp/core/async/Processor.hpp"
@ -41,8 +42,6 @@
namespace oatpp { namespace web { namespace server {
class HttpProcessor {
public:
static const char* RETURN_KEEP_ALIVE;
public:
typedef oatpp::collection::LinkedList<std::shared_ptr<oatpp::web::server::handler::RequestInterceptor>> RequestInterceptors;
typedef oatpp::web::protocol::http::incoming::RequestHeadersReader RequestHeadersReader;
@ -76,7 +75,7 @@ public:
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_outStream;
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> m_inStream;
bool m_keepAlive;
v_int32 m_connectionState;
private:
oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute;
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;
@ -99,7 +98,7 @@ public:
, m_ioBuffer(ioBuffer)
, m_outStream(outStream)
, m_inStream(inStream)
, m_keepAlive(true)
, m_connectionState(oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE)
{}
Action act() override;
@ -126,7 +125,7 @@ public:
void* buffer,
v_int32 bufferSize,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
bool& keepAlive);
v_int32& connectionState);
};