web::client::HttpRequestExecutor: #443. Automatically invalidate closed connection.

This commit is contained in:
lganzzzo 2021-08-04 21:00:01 +03:00
parent c8ff271d06
commit aa90bed836
2 changed files with 90 additions and 24 deletions

View File

@ -36,6 +36,42 @@
namespace oatpp { namespace web { namespace client { namespace oatpp { namespace web { namespace client {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// HttpRequestExecutor::HttpConnectionHandle
HttpRequestExecutor::HttpConnectionHandle::HttpConnectionHandle(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<oatpp::data::stream::IOStream>& stream)
: m_connectionProvider(connectionProvider)
, m_connection(stream)
, m_valid(true)
, m_invalidateOnDestroy(false)
{}
HttpRequestExecutor::HttpConnectionHandle::~HttpConnectionHandle() {
if(m_invalidateOnDestroy) {
invalidate();
}
}
std::shared_ptr<oatpp::data::stream::IOStream> HttpRequestExecutor::HttpConnectionHandle::getConnection() {
return m_connection;
}
void HttpRequestExecutor::HttpConnectionHandle::invalidate() {
if(m_valid) {
m_connectionProvider->invalidate(m_connection);
m_valid = false;
}
}
void HttpRequestExecutor::HttpConnectionHandle::setInvalidateOnDestroy(bool invalidateOnDestroy) {
m_invalidateOnDestroy = invalidateOnDestroy;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// HttpRequestExecutor
HttpRequestExecutor::HttpRequestExecutor(const std::shared_ptr<ClientConnectionProvider>& connectionProvider, HttpRequestExecutor::HttpRequestExecutor(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<RetryPolicy>& retryPolicy, const std::shared_ptr<RetryPolicy>& retryPolicy,
const std::shared_ptr<const BodyDecoder>& bodyDecoder) const std::shared_ptr<const BodyDecoder>& bodyDecoder)
@ -58,7 +94,7 @@ std::shared_ptr<HttpRequestExecutor::ConnectionHandle> HttpRequestExecutor::getC
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT, throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT,
"[oatpp::web::client::HttpRequestExecutor::getConnection()]: ConnectionProvider failed to provide Connection"); "[oatpp::web::client::HttpRequestExecutor::getConnection()]: ConnectionProvider failed to provide Connection");
} }
return std::make_shared<HttpConnectionHandle>(connection); return std::make_shared<HttpConnectionHandle>(m_connectionProvider, connection);
} }
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<HttpRequestExecutor::ConnectionHandle>&> oatpp::async::CoroutineStarterForResult<const std::shared_ptr<HttpRequestExecutor::ConnectionHandle>&>
@ -78,7 +114,7 @@ HttpRequestExecutor::getConnectionAsync() {
} }
Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) { Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
return _return(std::make_shared<HttpConnectionHandle>(connection)); return _return(std::make_shared<HttpConnectionHandle>(m_connectionProvider, connection));
} }
}; };
@ -90,8 +126,8 @@ HttpRequestExecutor::getConnectionAsync() {
void HttpRequestExecutor::invalidateConnection(const std::shared_ptr<ConnectionHandle>& connectionHandle) { void HttpRequestExecutor::invalidateConnection(const std::shared_ptr<ConnectionHandle>& connectionHandle) {
if(connectionHandle) { if(connectionHandle) {
auto connection = static_cast<HttpConnectionHandle*>(connectionHandle.get())->connection; auto handle = static_cast<HttpConnectionHandle*>(connectionHandle.get());
m_connectionProvider->invalidate(connection); handle->invalidate();
} }
} }
@ -104,8 +140,9 @@ HttpRequestExecutor::executeOnce(const String& method,
const std::shared_ptr<ConnectionHandle>& connectionHandle) { const std::shared_ptr<ConnectionHandle>& connectionHandle) {
std::shared_ptr<oatpp::data::stream::IOStream> connection; std::shared_ptr<oatpp::data::stream::IOStream> connection;
if(connectionHandle) { std::shared_ptr<HttpConnectionHandle> httpCH = std::static_pointer_cast<HttpConnectionHandle>(connectionHandle);
connection = static_cast<HttpConnectionHandle*>(connectionHandle.get())->connection; if(httpCH) {
connection = httpCH->getConnection();
} }
if(!connection){ if(!connection){
@ -131,21 +168,20 @@ HttpRequestExecutor::executeOnce(const String& method,
const auto& result = headerReader.readHeaders(connection, error); const auto& result = headerReader.readHeaders(connection, error);
if(error.status.code != 0) { if(error.status.code != 0) {
invalidateConnection(connectionHandle); httpCH->invalidate();
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE, throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE,
"[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to parse response. Invalid response headers"); "[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to parse response. Invalid response headers");
} }
if(error.ioStatus < 0) { if(error.ioStatus < 0) {
invalidateConnection(connectionHandle); httpCH->invalidate();
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE, throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE,
"[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to read response."); "[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to read response.");
} }
auto con_hdr = result.headers.getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI>("Connection"); auto connectionHeader = result.headers.getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI>(Header::CONNECTION);
if (con_hdr == "close") if (connectionHeader == "close") {
{ httpCH->setInvalidateOnDestroy(true);
invalidateConnection(connectionHandle);
} }
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection,
@ -179,7 +215,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
Headers m_headers; Headers m_headers;
std::shared_ptr<Body> m_body; std::shared_ptr<Body> m_body;
std::shared_ptr<const BodyDecoder> m_bodyDecoder; std::shared_ptr<const BodyDecoder> m_bodyDecoder;
std::shared_ptr<ConnectionHandle> m_connectionHandle; std::shared_ptr<HttpConnectionHandle> m_connectionHandle;
oatpp::data::share::MemoryLabel m_buffer; oatpp::data::share::MemoryLabel m_buffer;
ResponseHeadersReader m_headersReader; ResponseHeadersReader m_headersReader;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_upstream; std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_upstream;
@ -193,7 +229,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
const Headers& headers, const Headers& headers,
const std::shared_ptr<Body>& body, const std::shared_ptr<Body>& body,
const std::shared_ptr<const BodyDecoder>& bodyDecoder, const std::shared_ptr<const BodyDecoder>& bodyDecoder,
const std::shared_ptr<ConnectionHandle>& connectionHandle) const std::shared_ptr<HttpConnectionHandle>& connectionHandle)
: m_this(_this) : m_this(_this)
, m_method(method) , m_method(method)
, m_path(path) , m_path(path)
@ -208,7 +244,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
Action act() override { Action act() override {
if(m_connectionHandle) { if(m_connectionHandle) {
m_connection = static_cast<HttpConnectionHandle*>(m_connectionHandle.get())->connection; m_connection = m_connectionHandle->getConnection();
} }
if(!m_connection) { if(!m_connection) {
@ -232,7 +268,12 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
} }
Action onHeadersParsed(const ResponseHeadersReader::Result& result) { Action onHeadersParsed(const ResponseHeadersReader::Result& result) {
auto connectionHeader = result.headers.getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI>(Header::CONNECTION);
if (connectionHeader == "close") {
m_connectionHandle->setInvalidateOnDestroy(true);
}
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection,
m_buffer, m_buffer,
result.bufferPosStart, result.bufferPosStart,
@ -248,7 +289,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
Action handleError(oatpp::async::Error* error) override { Action handleError(oatpp::async::Error* error) override {
if(m_connectionHandle) { if(m_connectionHandle) {
m_this->invalidateConnection(m_connectionHandle); m_connectionHandle->invalidate();
} }
return error; return error;
@ -256,8 +297,9 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
} }
}; };
return ExecutorCoroutine::startForResult(this, method, path, headers, body, m_bodyDecoder, connectionHandle); auto httpCH = std::static_pointer_cast<HttpConnectionHandle>(connectionHandle);
return ExecutorCoroutine::startForResult(this, method, path, headers, body, m_bodyDecoder, httpCH);
} }

View File

@ -51,20 +51,44 @@ public:
* For more details see &id:oatpp::web::client::RequestExecutor::ConnectionHandle;. * For more details see &id:oatpp::web::client::RequestExecutor::ConnectionHandle;.
*/ */
class HttpConnectionHandle : public ConnectionHandle { class HttpConnectionHandle : public ConnectionHandle {
private:
/* provider which created this connection */
std::shared_ptr<ClientConnectionProvider> m_connectionProvider;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
bool m_valid;
bool m_invalidateOnDestroy;
public: public:
/** /**
* Constructor. * Constructor.
* @param connectionProvider - connection provider which created this connection.
* @param stream - &id:oatpp::data::stream::IOStream;. * @param stream - &id:oatpp::data::stream::IOStream;.
*/ */
HttpConnectionHandle(const std::shared_ptr<oatpp::data::stream::IOStream>& stream) HttpConnectionHandle(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
: connection(stream) const std::shared_ptr<oatpp::data::stream::IOStream>& stream);
{}
/** /**
* Connection. * Destructor.
*/ */
std::shared_ptr<oatpp::data::stream::IOStream> connection; ~HttpConnectionHandle() override;
/**
* Get connection I/O stream.
* @return
*/
std::shared_ptr<oatpp::data::stream::IOStream> getConnection();
/**
* Invalidate this connection.
*/
void invalidate();
/**
* Set if connection should be invalidated on destroy of ConnectionHandle.
* @param invalidateOnDestroy
*/
void setInvalidateOnDestroy(bool invalidateOnDestroy);
}; };
public: public: