Fix windows ci build (#178)

* log request headers reader

* Tests. Pipeline run test on real port

* Log transfer pipe in

* Log tranfer write

* disable other tests

* log connection

* More logs for connection

* Check connection stream mode before returning IOAction.

* Fix connection stream mode check on Windows.

* Tests. Fix PipelineTest.

* Tests. Remove logging, and uncomment test.
This commit is contained in:
Leonid Stryzhevskyi 2020-01-07 18:33:31 +02:00 committed by GitHub
parent 9f793a3771
commit 642ce18656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 56 additions and 141 deletions

View File

@ -42,6 +42,7 @@ data::v_io_size WriteCallback::writeSimple(const void *data, v_buff_size count)
async::Action action;
auto res = write(data, count, action);
if(!action.isNone()) {
OATPP_LOGD("[oatpp::data::stream::WriteCallback::writeSimple()]", "Error. writeSimple is called on a stream in Async mode.");
throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeSimple()]: Error. writeSimple is called on a stream in Async mode.");
}
return res;

View File

@ -46,11 +46,29 @@ oatpp::data::stream::DefaultInitializedContext Connection::DEFAULT_CONTEXT(data:
Connection::Connection(data::v_io_handle handle)
: m_handle(handle)
{
#if defined(WIN32) || defined(_WIN32)
// in Windows, there is no reliable method to get if a socket is blocking or not.
// Eevery socket is created blocking in Windows so we assume this state and pray.
m_mode = data::stream::BLOCKING;
// in Windows, there is no reliable method to get if a socket is blocking or not.
// Eevery socket is created blocking in Windows so we assume this state and pray.
m_mode = data::stream::BLOCKING;
#else
auto flags = fcntl(m_handle, F_GETFL);
if (flags < 0) {
throw std::runtime_error("[oatpp::network::Connection::Connection()]: Error. Can't get socket flags.");
}
if((flags & O_NONBLOCK) > 0) {
m_mode = data::stream::IOMode::ASYNCHRONOUS;
} else {
m_mode = data::stream::IOMode::BLOCKING;
}
#endif
}
Connection::~Connection(){
@ -68,7 +86,9 @@ data::v_io_size Connection::write(const void *buff, v_buff_size count, async::Ac
auto e = WSAGetLastError();
if(e == WSAEWOULDBLOCK){
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
if(m_mode == data::stream::ASYNCHRONOUS) {
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
}
return data::IOError::RETRY_WRITE; // For async io. In case socket is non-blocking
} else if(e == WSAEINTR) {
return data::IOError::RETRY_WRITE;
@ -94,7 +114,9 @@ data::v_io_size Connection::write(const void *buff, v_buff_size count, async::Ac
if(result <= 0) {
auto e = errno;
if(e == EAGAIN || e == EWOULDBLOCK){
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
if(m_mode == data::stream::ASYNCHRONOUS) {
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
}
return data::IOError::RETRY_WRITE; // For async io. In case socket is non-blocking
} else if(e == EINTR) {
return data::IOError::RETRY_WRITE;
@ -121,7 +143,9 @@ data::v_io_size Connection::read(void *buff, v_buff_size count, async::Action& a
auto e = WSAGetLastError();
if(e == WSAEWOULDBLOCK){
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
if(m_mode == data::stream::ASYNCHRONOUS) {
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
}
return data::IOError::RETRY_READ; // For async io. In case socket is non-blocking
} else if(e == WSAEINTR) {
return data::IOError::RETRY_READ;
@ -133,7 +157,6 @@ data::v_io_size Connection::read(void *buff, v_buff_size count, async::Action& a
}
return result;
#else
errno = 0;
@ -143,7 +166,9 @@ data::v_io_size Connection::read(void *buff, v_buff_size count, async::Action& a
if(result <= 0) {
auto e = errno;
if(e == EAGAIN || e == EWOULDBLOCK){
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
if(m_mode == data::stream::ASYNCHRONOUS) {
action = oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
}
return data::IOError::RETRY_READ; // For async io. In case socket is non-blocking
} else if(e == EINTR) {
return data::IOError::RETRY_READ;
@ -210,33 +235,12 @@ void Connection::setStreamIOMode(oatpp::data::stream::IOMode ioMode) {
}
#endif
#if defined(WIN32) || defined(_WIN32)
oatpp::data::stream::IOMode Connection::getStreamIOMode() {
return m_mode;
}
#else
oatpp::data::stream::IOMode Connection::getStreamIOMode() {
auto flags = fcntl(m_handle, F_GETFL);
if (flags < 0) {
throw std::runtime_error("[oatpp::network::Connection::getStreamIOMode()]: Error. Can't get socket flags.");
}
if((flags & O_NONBLOCK) > 0) {
return oatpp::data::stream::IOMode::ASYNCHRONOUS;
}
return oatpp::data::stream::IOMode::BLOCKING;
}
#endif
void Connection::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
setStreamIOMode(ioMode);
}
oatpp::data::stream::IOMode Connection::getOutputStreamIOMode() {
return getStreamIOMode();
return m_mode;
}
oatpp::data::stream::Context& Connection::getOutputStreamContext() {
@ -248,7 +252,7 @@ void Connection::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
}
oatpp::data::stream::IOMode Connection::getInputStreamIOMode() {
return getStreamIOMode();
return m_mode;
}
oatpp::data::stream::Context& Connection::getInputStreamContext() {

View File

@ -38,12 +38,9 @@ private:
static oatpp::data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
private:
data::v_io_handle m_handle;
#if defined(WIN32) || defined(_WIN32)
oatpp::data::stream::IOMode m_mode;
#endif
data::stream::IOMode m_mode;
private:
void setStreamIOMode(oatpp::data::stream::IOMode ioMode);
oatpp::data::stream::IOMode getStreamIOMode();
public:
/**
* Constructor.

View File

@ -110,18 +110,18 @@ void runTests() {
*/
{
// oatpp::test::web::PipelineTest test_virtual(0, 3000);
// test_virtual.run();
oatpp::test::web::PipelineTest test_virtual(0, 3000);
test_virtual.run();
// oatpp::test::web::PipelineTest test_port(8000, 3000);
// test_port.run();
oatpp::test::web::PipelineTest test_port(8000, 3000);
test_port.run();
}
{
// oatpp::test::web::PipelineAsyncTest test_virtual(0, 3000);
// test_virtual.run();
oatpp::test::web::PipelineAsyncTest test_virtual(0, 3000);
test_virtual.run();
oatpp::test::web::PipelineAsyncTest test_port(8000, 3000);
test_port.run();

View File

@ -139,6 +139,7 @@ void PipelineAsyncTest::onRun() {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
auto connection = clientConnectionProvider->getConnection();
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
std::thread pipeInThread([this, connection] {
@ -158,60 +159,15 @@ void PipelineAsyncTest::onRun() {
std::thread pipeOutThread([this, connection] {
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
oatpp::data::stream::ChunkedBuffer pipelineStream;
for (v_int32 i = 0; i < m_pipelineSize; i++) {
pipelineStream << SAMPLE_OUT;
}
oatpp::String sample = SAMPLE_OUT;
oatpp::data::stream::ChunkedBuffer receiveStream;
oatpp::data::buffer::IOBuffer ioBuffer;
v_int32 retries = 0;
oatpp::data::v_io_size readResult;
while(true) {
async::Action action; // In this particular case, the action is just ignored.
readResult = connection->read(ioBuffer.getData(), ioBuffer.getSize(), action);
OATPP_LOGD("AAA", "readResult=%d", readResult);
if(readResult > 0) {
retries = 0;
receiveStream.writeSimple(ioBuffer.getData(), readResult);
} else {
retries ++;
if(retries == 50) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
auto res = oatpp::data::stream::transfer(connection.get(), &receiveStream, sample->getSize() * m_pipelineSize, ioBuffer.getData(), ioBuffer.getSize());
auto result = receiveStream.toString();
auto wantedResult = pipelineStream.toString();
if(result != wantedResult) {
//
// if(result->getSize() == wantedResult->getSize()) {
// for(v_int32 i = 0; i < result->getSize(); i++) {
// if(result->getData()[i] != wantedResult->getData()[i]) {
// OATPP_LOGD(TAG, "result='%s'", &result->getData()[i]);
// OATPP_LOGD(TAG, "wanted='%s'", &wantedResult->getData()[i]);
// OATPP_LOGD(TAG, "diff-pos=%d", i);
// break;
// }
// }
// }
OATPP_LOGD(TAG, "result-size=%d, wanted-size=%d", result->getSize(), wantedResult->getSize());
OATPP_LOGD(TAG, "last readResult=%d", readResult);
}
OATPP_ASSERT(result->getSize() == wantedResult->getSize());
OATPP_ASSERT(result->getSize() == sample->getSize() * m_pipelineSize);
//OATPP_ASSERT(result == wantedResult); // headers may come in different order on different OSs
});

View File

@ -134,6 +134,7 @@ void PipelineTest::onRun() {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
auto connection = clientConnectionProvider->getConnection();
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
std::thread pipeInThread([this, connection] {
@ -143,71 +144,28 @@ void PipelineTest::onRun() {
pipelineStream << SAMPLE_IN;
}
oatpp::data::stream::BufferInputStream inputStream(pipelineStream.toString());
auto dataToSend = pipelineStream.toString();
OATPP_LOGD(TAG, "Sending %d bytes", dataToSend->getSize());
oatpp::data::stream::BufferInputStream inputStream(dataToSend);
oatpp::data::buffer::IOBuffer ioBuffer;
oatpp::data::stream::transfer(&inputStream, connection.get(), 0, ioBuffer.getData(), ioBuffer.getSize());
auto res = oatpp::data::stream::transfer(&inputStream, connection.get(), 0, ioBuffer.getData(), ioBuffer.getSize());
});
std::thread pipeOutThread([this, connection] {
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
oatpp::data::stream::ChunkedBuffer pipelineStream;
for (v_int32 i = 0; i < m_pipelineSize; i++) {
pipelineStream << SAMPLE_OUT;
}
oatpp::String sample = SAMPLE_OUT;
oatpp::data::stream::ChunkedBuffer receiveStream;
oatpp::data::buffer::IOBuffer ioBuffer;
v_int32 retries = 0;
oatpp::data::v_io_size readResult;
while(true) {
async::Action action; // In this particular case, the action is just ignored.
readResult = connection->read(ioBuffer.getData(), ioBuffer.getSize(), action);
OATPP_LOGD("AAA", "readResult=%d", readResult);
if(readResult > 0) {
retries = 0;
receiveStream.writeSimple(ioBuffer.getData(), readResult);
} else {
retries ++;
if(retries == 50) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
auto res = oatpp::data::stream::transfer(connection.get(), &receiveStream, sample->getSize() * m_pipelineSize, ioBuffer.getData(), ioBuffer.getSize());
auto result = receiveStream.toString();
auto wantedResult = pipelineStream.toString();
if(result != wantedResult) {
// if(result->getSize() == wantedResult->getSize()) {
// for(v_int32 i = 0; i < result->getSize(); i++) {
// if(result->getData()[i] != wantedResult->getData()[i]) {
// OATPP_LOGD(TAG, "result0='%s'", result->getData());
// OATPP_LOGD(TAG, "result='%s'", &result->getData()[i]);
// OATPP_LOGD(TAG, "wanted='%s'", &wantedResult->getData()[i]);
// OATPP_LOGD(TAG, "diff-pos=%d", i);
// break;
// }
// }
// }
OATPP_LOGD(TAG, "result-size=%d, wanted-size=%d", result->getSize(), wantedResult->getSize());
OATPP_LOGD(TAG, "last readResult=%d", readResult);
}
OATPP_ASSERT(result->getSize() == wantedResult->getSize());
OATPP_ASSERT(result->getSize() == sample->getSize() * m_pipelineSize);
//OATPP_ASSERT(result == wantedResult); // headers may come in different order on different OSs
});

View File

@ -65,7 +65,6 @@ public:
std::atomic<bool> available;
ENDPOINT("GET", "/", root) {
OATPP_LOGD("AAA", "Root Called!");
return createResponse(Status::CODE_200, "Hello World!!!");
}