Merge pull request #219 from oatpp/correct_connection_invalidation

SimpleTCPConnectionProvider. Correct invalidateConnection() method.
This commit is contained in:
Leonid Stryzhevskyi 2020-04-11 15:50:05 +03:00 committed by GitHub
commit a5140ca799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 87 additions and 21 deletions

View File

@ -104,6 +104,10 @@ bool ConnectionPool::ConnectionWrapper::isValid() {
return m_connection && m_recycleConnection;
}
const std::shared_ptr<data::stream::IOStream>& ConnectionPool::ConnectionWrapper::getUnderlyingConnection() {
return m_connection;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionPool
@ -310,6 +314,13 @@ void ConnectionPool::close() {
}
void ConnectionPool::invalidateConnection(const std::shared_ptr<IOStream>& connection) {
auto wrapper = std::static_pointer_cast<ConnectionPool::ConnectionWrapper>(connection);
wrapper->invalidate();
auto c = wrapper->getUnderlyingConnection();
m_connectionProvider->invalidateConnection(c);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ServerConnectionPool
@ -358,8 +369,7 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
}
void ServerConnectionPool::invalidateConnection(const std::shared_ptr<IOStream>& connection) {
auto wrapper = std::static_pointer_cast<ConnectionPool::ConnectionWrapper>(connection);
wrapper->invalidate();
m_pool.invalidateConnection(connection);
}
void ServerConnectionPool::close() {
@ -414,8 +424,7 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
}
void ClientConnectionPool::invalidateConnection(const std::shared_ptr<IOStream>& connection) {
auto wrapper = std::static_pointer_cast<ConnectionPool::ConnectionWrapper>(connection);
wrapper->invalidate();
m_pool.invalidateConnection(connection);
}
void ClientConnectionPool::close() {

View File

@ -80,10 +80,13 @@ public:
* Will acquire connection from the pool on initialization and will return connection to the pool on destruction.
*/
class ConnectionWrapper : public oatpp::data::stream::IOStream {
friend ConnectionPool;
private:
std::shared_ptr<IOStream> m_connection;
std::shared_ptr<Pool> m_pool;
bool m_recycleConnection;
private:
void invalidate();
public:
ConnectionWrapper(const std::shared_ptr<IOStream>& connection, const std::shared_ptr<Pool>& pool);
@ -101,17 +104,18 @@ public:
oatpp::data::stream::Context& getOutputStreamContext() override;
oatpp::data::stream::Context& getInputStreamContext() override;
/**
* Mark that this connection cannot be reused in the pool any more.
*/
void invalidate();
/**
* Check if connection is still valid.
* @return
*/
bool isValid();
/**
* Get underlying connection.
* @return
*/
const std::shared_ptr<IOStream>& getUnderlyingConnection();
};
private:
@ -197,6 +201,12 @@ public:
*/
void close();
/**
* Invalidate connection that was previously obtain by a call to `getConnection()` or `getConnectionAsync()`.
* @param connection - **MUST** be instance of `&l:ConnectionPool::ConnectionWrapper;` or its subclass.
*/
void invalidateConnection(const std::shared_ptr<IOStream>& connection);
};
/**

View File

@ -266,4 +266,29 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
}
void SimpleTCPConnectionProvider::invalidateConnection(const std::shared_ptr<IOStream>& connection) {
/************************************************
* WARNING!!!
*
* shutdown(handle, SHUT_RDWR) <--- DO!
* close(handle); <--- DO NOT!
*
* DO NOT CLOSE file handle here -
* USE shutdown instead.
* Using close prevent FDs popping out of epoll,
* and they'll be stuck there forever.
************************************************/
auto c = std::static_pointer_cast<network::Connection>(connection);
v_io_handle handle = c->getHandle();
#if defined(WIN32) || defined(_WIN32)
shutdown(handle, SD_BOTH);
#else
shutdown(handle, SHUT_RDWR);
#endif
}
}}}

View File

@ -78,13 +78,12 @@ public:
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::stream::IOStream>&> getConnectionAsync() override;
/**
* Does nothing.
* Call shutdown read and write on an underlying file descriptor.
* `connection` **MUST** be an object previously obtained from **THIS** connection provider.
* @param connection
*/
void invalidateConnection(const std::shared_ptr<IOStream>& connection) override {
(void)connection;
// DO Nothing.
}
void invalidateConnection(const std::shared_ptr<IOStream>& connection) override;
/**
* Get host name.
* @return - host name.

View File

@ -334,4 +334,29 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
}
void SimpleTCPConnectionProvider::invalidateConnection(const std::shared_ptr<IOStream>& connection) {
/************************************************
* WARNING!!!
*
* shutdown(handle, SHUT_RDWR) <--- DO!
* close(handle); <--- DO NOT!
*
* DO NOT CLOSE file handle here -
* USE shutdown instead.
* Using close prevent FDs popping out of epoll,
* and they'll be stuck there forever.
************************************************/
auto c = std::static_pointer_cast<network::Connection>(connection);
v_io_handle handle = c->getHandle();
#if defined(WIN32) || defined(_WIN32)
shutdown(handle, SD_BOTH);
#else
shutdown(handle, SHUT_RDWR);
#endif
}
}}}

View File

@ -142,13 +142,11 @@ public:
}
/**
* Does nothing.
* Call shutdown read and write on an underlying file descriptor.
* `connection` **MUST** be an object previously obtained from **THIS** connection provider.
* @param connection
*/
void invalidateConnection(const std::shared_ptr<IOStream>& connection) override {
(void)connection;
// DO Nothing.
}
void invalidateConnection(const std::shared_ptr<IOStream>& connection) override;
/**
* Get port.

View File

@ -140,7 +140,7 @@ public:
return waitFor(std::chrono::milliseconds(100)).next(yieldTo(&ClientCoroutine::useConnection));
}
if(m_invalidate) {
m_connection->invalidate();
m_pool->invalidateConnection(m_connection);
}
return finish();
}
@ -151,7 +151,7 @@ void clientMethod(std::shared_ptr<ConnectionPool> pool, bool invalidate) {
auto connection = pool->getConnection();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if(invalidate) {
connection->invalidate();
pool->invalidateConnection(connection);
}
}