virtual pipe test

This commit is contained in:
lganzzzo 2018-11-15 02:36:19 +02:00
parent 7664ff6dc8
commit f132bdd33f
10 changed files with 323 additions and 90 deletions

View File

@ -263,6 +263,8 @@ if(OATPP_BUILD_TESTS)
test/parser/json/mapping/DTOMapperPerfTest.hpp
test/parser/json/mapping/DTOMapperTest.cpp
test/parser/json/mapping/DTOMapperTest.hpp
test/network/virtual_/PipeTest.cpp
test/network/virtual_/PipeTest.hpp
)
target_link_libraries(oatppAllTests PRIVATE oatpp)
set_target_properties(oatppAllTests PROPERTIES

View File

@ -27,6 +27,7 @@
namespace oatpp { namespace data{ namespace buffer {
os::io::Library::v_size FIFOBuffer::availableToRead() {
oatpp::concurrency::SpinLock lock(m_atom);
if(!m_canRead) {
return 0;
}
@ -37,6 +38,7 @@ os::io::Library::v_size FIFOBuffer::availableToRead() {
}
os::io::Library::v_size FIFOBuffer::availableToWrite() {
oatpp::concurrency::SpinLock lock(m_atom);
if(m_canRead && m_writePosition == m_readPosition) {
return 0;
}
@ -48,6 +50,8 @@ os::io::Library::v_size FIFOBuffer::availableToWrite() {
os::io::Library::v_size FIFOBuffer::read(void *data, os::io::Library::v_size count) {
oatpp::concurrency::SpinLock lock(m_atom);
if(!m_canRead) {
return 0;
}
@ -97,6 +101,8 @@ os::io::Library::v_size FIFOBuffer::read(void *data, os::io::Library::v_size cou
os::io::Library::v_size FIFOBuffer::write(const void *data, os::io::Library::v_size count) {
oatpp::concurrency::SpinLock lock(m_atom);
if(m_canRead && m_writePosition == m_readPosition) {
return 0;
}

View File

@ -26,6 +26,7 @@
#define oatpp_data_buffer_FIFOBuffer_hpp
#include "./IOBuffer.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp"
#include "oatpp/core/os/io/Library.hpp"
namespace oatpp { namespace data{ namespace buffer {
@ -39,11 +40,13 @@ private:
os::io::Library::v_size m_readPosition;
os::io::Library::v_size m_writePosition;
IOBuffer m_buffer;
oatpp::concurrency::SpinLock::Atom m_atom;
public:
FIFOBuffer()
: m_canRead(false)
, m_readPosition(0)
, m_writePosition(0)
, m_atom(false)
{}
public:

View File

@ -0,0 +1,25 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Interface.hpp"

View File

@ -0,0 +1,34 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_virtual__Interface_hpp
#define oatpp_network_virtual__Interface_hpp
namespace oatpp { namespace network { namespace virtual_ {
}}}
#endif /* oatpp_network_virtual__Interface_hpp */

View File

@ -29,41 +29,31 @@ namespace oatpp { namespace network { namespace virtual_ {
os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size count) {
Pipe& pipe = *m_pipe;
if(!pipe.m_alive) {
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
oatpp::os::io::Library::v_size result;
if(m_nonBlocking) {
oatpp::concurrency::SpinLock spinLock(pipe.m_atom);
if(pipe.m_buffer.availableToRead() > 0) {
auto result = pipe.m_buffer.read(data, count);
pipe.m_writeCondition.notify_one();
return result;
result = pipe.m_buffer.read(data, count);
} else if(pipe.m_alive) {
result = oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY;
} else {
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY;
result = oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
} else {
std::unique_lock<std::mutex> lock(pipe.m_mutex);
while (pipe.m_buffer.availableToRead() == 0 && pipe.m_alive) {
pipe.m_conditionWrite.notify_one();
pipe.m_conditionRead.wait(lock);
}
if (pipe.m_buffer.availableToRead() > 0) {
result = pipe.m_buffer.read(data, count);
} else {
result = oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
}
std::unique_lock<std::mutex> lock(pipe.m_readMutex);
pipe.m_readCondition.wait(lock, [&pipe] {return (pipe.m_buffer.availableToRead() > 0 || !pipe.m_alive);});
pipe.m_conditionWrite.notify_one();
oatpp::concurrency::SpinLock spinLock(pipe.m_atom);
if(!pipe.m_alive) {
lock.unlock();
pipe.m_writeCondition.notify_all();
pipe.m_readCondition.notify_all();
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
if(pipe.m_buffer.availableToRead() == 0) {
return oatpp::data::stream::Errors::ERROR_IO_RETRY;
}
auto result = pipe.m_buffer.read(data, count);
lock.unlock();
pipe.m_writeCondition.notify_one();
return result;
}
@ -71,41 +61,31 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c
os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v_size count) {
Pipe& pipe = *m_pipe;
if(!pipe.m_alive) {
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
oatpp::os::io::Library::v_size result;
if(m_nonBlocking) {
oatpp::concurrency::SpinLock spinLock(pipe.m_atom);
if(pipe.m_buffer.availableToWrite() > 0) {
auto result = pipe.m_buffer.write(data, count);
pipe.m_readCondition.notify_one();
return result;
result = pipe.m_buffer.write(data, count);
} else if(pipe.m_alive) {
result = oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY;
} else {
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY;
result = oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
} else {
std::unique_lock<std::mutex> lock(pipe.m_mutex);
while (pipe.m_buffer.availableToWrite() == 0 && pipe.m_alive) {
pipe.m_conditionRead.notify_one();
pipe.m_conditionWrite.wait(lock);
}
if (pipe.m_alive && pipe.m_buffer.availableToWrite() > 0) {
result = pipe.m_buffer.write(data, count);
} else {
result = oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
}
std::unique_lock<std::mutex> lock(pipe.m_writeMutex);
pipe.m_writeCondition.wait(lock, [&pipe] {return (pipe.m_buffer.availableToWrite() > 0 || !pipe.m_alive);});
pipe.m_conditionRead.notify_one();
oatpp::concurrency::SpinLock spinLock(pipe.m_atom);
if(!pipe.m_alive) {
lock.unlock();
pipe.m_writeCondition.notify_all();
pipe.m_readCondition.notify_all();
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
}
if(pipe.m_buffer.availableToWrite() == 0) {
return oatpp::data::stream::Errors::ERROR_IO_RETRY;
}
auto result = pipe.m_buffer.write(data, count);
lock.unlock();
pipe.m_readCondition.notify_one();
return result;
}

View File

@ -36,28 +36,18 @@
namespace oatpp { namespace network { namespace virtual_ {
class Pipe : public oatpp::base::Controllable {
public:
OBJECT_POOL(Pipe_Pool, Pipe, 32)
SHARED_OBJECT_POOL(Shared_Pipe_Pool, Pipe, 32)
public:
class Reader : public oatpp::base::Controllable, public oatpp::data::stream::InputStream {
public:
OBJECT_POOL(Pipe_Reader_Pool, Pipe, 32)
SHARED_OBJECT_POOL(Shared_Pipe_Reader_Pool, Reader, 32)
class Reader : public oatpp::data::stream::InputStream {
private:
std::shared_ptr<Pipe> m_pipe;
Pipe* m_pipe;
bool m_nonBlocking;
public:
Reader(const std::shared_ptr<Pipe>& pipe, bool nonBlocking = false)
Reader(Pipe* pipe, bool nonBlocking = false)
: m_pipe(pipe)
, m_nonBlocking(nonBlocking)
{}
public:
static std::shared_ptr<Reader> createShared(const std::shared_ptr<Pipe>& pipe, bool nonBlocking = false){
return Shared_Pipe_Reader_Pool::allocateShared(pipe, nonBlocking);
}
void setNonBlocking(bool nonBlocking) {
m_nonBlocking = nonBlocking;
@ -67,23 +57,16 @@ public:
};
class Writer : public oatpp::base::Controllable, public oatpp::data::stream::OutputStream {
public:
OBJECT_POOL(Pipe_Writer_Pool, Pipe, 32)
SHARED_OBJECT_POOL(Shared_Pipe_Writer_Pool, Writer, 32)
class Writer : public oatpp::data::stream::OutputStream {
private:
std::shared_ptr<Pipe> m_pipe;
Pipe* m_pipe;
bool m_nonBlocking;
public:
Writer(const std::shared_ptr<Pipe>& pipe, bool nonBlocking = false)
Writer(Pipe* pipe, bool nonBlocking = false)
: m_pipe(pipe)
, m_nonBlocking(nonBlocking)
{}
public:
static std::shared_ptr<Writer> createShared(const std::shared_ptr<Pipe>& pipe, bool nonBlocking = false){
return Shared_Pipe_Writer_Pool::allocateShared(pipe, nonBlocking);
}
void setNonBlocking(bool nonBlocking) {
m_nonBlocking = nonBlocking;
@ -94,26 +77,31 @@ public:
};
private:
oatpp::data::buffer::FIFOBuffer m_buffer;
bool m_alive;
oatpp::concurrency::SpinLock::Atom m_atom;
std::mutex m_readMutex;
std::condition_variable m_readCondition;
std::mutex m_writeMutex;
std::condition_variable m_writeCondition;
Writer m_writer;
Reader m_reader;
oatpp::data::buffer::FIFOBuffer m_buffer;
std::mutex m_mutex;
std::condition_variable m_conditionRead;
std::condition_variable m_conditionWrite;
public:
Pipe()
: m_alive(true)
, m_atom(false)
, m_writer(this)
, m_reader(this)
{}
std::shared_ptr<Reader> getReader(bool nonBlocking = false) {
return Reader::createShared(getSharedPtr<Pipe>(), nonBlocking);
static std::shared_ptr<Pipe> createShared(){
return std::make_shared<Pipe>();
}
std::shared_ptr<Writer> getWriter(bool nonBlocking = false) {
return Writer::createShared(getSharedPtr<Pipe>(), nonBlocking);
Writer* getWriter() {
return &m_writer;
}
Reader* getReader() {
return &m_reader;
}
};

View File

@ -1,4 +1,6 @@
#include "oatpp/test/network/virtual_/PipeTest.hpp"
#include "oatpp/test/parser/json/mapping/DeserializerTest.hpp"
#include "oatpp/test/parser/json/mapping/DTOMapperPerfTest.hpp"
#include "oatpp/test/parser/json/mapping/DTOMapperTest.hpp"
@ -49,6 +51,7 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::parser::json::mapping::DTOMapperTest);
OATPP_RUN_TEST(oatpp::test::encoding::Base64Test);
OATPP_RUN_TEST(oatpp::test::encoding::UnicodeTest);
OATPP_RUN_TEST(oatpp::test::network::virtual_::PipeTest);
}
}

View File

@ -0,0 +1,152 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "PipeTest.hpp"
#include "oatpp/network/virtual_/Pipe.hpp"
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
#include "oatpp/core/concurrency/Thread.hpp"
#include "oatpp/test/Checker.hpp"
#include <iostream>
namespace oatpp { namespace test { namespace network { namespace virtual_ {
namespace {
typedef oatpp::network::virtual_::Pipe Pipe;
const char* DATA_CHUNK = "<0123456789/abcdefghijklmnopqrstuvwxyz/ABCDEFGHIJKLMNOPQRSTUVWXYZ>";
const os::io::Library::v_size CHUNK_SIZE = std::strlen(DATA_CHUNK);
class WriterTask : public oatpp::concurrency::Runnable {
private:
std::shared_ptr<Pipe> m_pipe;
v_int32 m_chunksToTransfer;
os::io::Library::v_size m_position = 0;
os::io::Library::v_size m_transferedBytes = 0;
public:
WriterTask(const std::shared_ptr<Pipe>& pipe, v_int32 chunksToTransfer)
: m_pipe(pipe)
, m_chunksToTransfer(chunksToTransfer)
{}
void run() override {
while (m_transferedBytes < CHUNK_SIZE * m_chunksToTransfer) {
auto res = m_pipe->getWriter()->write(&DATA_CHUNK[m_position], CHUNK_SIZE - m_position);
if(res > 0) {
m_transferedBytes += res;
m_position += res;
if(m_position == CHUNK_SIZE) {
m_position = 0;
}
}
}
OATPP_LOGD("WriterTask", "sent %d bytes", m_transferedBytes);
}
};
class ReaderTask : public oatpp::concurrency::Runnable {
private:
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_buffer;
std::shared_ptr<Pipe> m_pipe;
v_int32 m_chunksToTransfer;
public:
ReaderTask(const std::shared_ptr<oatpp::data::stream::ChunkedBuffer> &buffer,
const std::shared_ptr<Pipe>& pipe,
v_int32 chunksToTransfer)
: m_buffer(buffer)
, m_pipe(pipe)
, m_chunksToTransfer(chunksToTransfer)
{}
void run() override {
v_char8 readBuffer[256];
while (m_buffer->getSize() < CHUNK_SIZE * m_chunksToTransfer) {
auto res = m_pipe->getReader()->read(readBuffer, 256);
if(res > 0) {
m_buffer->write(readBuffer, res);
}
}
OATPP_LOGD("ReaderTask", "sent %d bytes", m_buffer->getSize());
}
};
void runTransfer(const std::shared_ptr<Pipe>& pipe, v_int32 chunksToTransfer, bool writeNonBlock, bool readerNonBlock) {
OATPP_LOGD("transfer", "writer-nb: %d, reader-nb: %d", writeNonBlock, readerNonBlock);
auto buffer = oatpp::data::stream::ChunkedBuffer::createShared();
{
oatpp::test::PerformanceChecker timer("timer");
auto writerThread = oatpp::concurrency::Thread::createShared(std::make_shared<WriterTask>(pipe, chunksToTransfer));
auto readerThread = oatpp::concurrency::Thread::createShared(std::make_shared<ReaderTask>(buffer, pipe, chunksToTransfer));
writerThread->join();
readerThread->join();
}
OATPP_ASSERT(buffer->getSize() == chunksToTransfer * CHUNK_SIZE);
auto ruleBuffer = oatpp::data::stream::ChunkedBuffer::createShared();
for(v_int32 i = 0; i < chunksToTransfer; i ++) {
ruleBuffer->write(DATA_CHUNK, CHUNK_SIZE);
}
auto str1 = buffer->toString();
auto str2 = buffer->toString();
OATPP_ASSERT(str1 == str2);
}
}
bool PipeTest::onRun() {
auto pipe = Pipe::createShared();
v_int32 chunkCount = oatpp::data::buffer::IOBuffer::BUFFER_SIZE * 10 / CHUNK_SIZE;
runTransfer(pipe, chunkCount, false, false);
runTransfer(pipe, chunkCount, true, false);
runTransfer(pipe, chunkCount, false, true);
runTransfer(pipe, chunkCount, true, true);
return true;
}
}}}}

View File

@ -0,0 +1,40 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_test_network_virtual__PipeTest_hpp
#define oatpp_test_network_virtual__PipeTest_hpp
#include "oatpp/test/UnitTest.hpp"
namespace oatpp { namespace test { namespace network { namespace virtual_ {
class PipeTest : public UnitTest {
public:
PipeTest():UnitTest("TEST[network::virtual_::PipeTest]"){}
bool onRun() override;
};
}}}}
#endif /* oatpp_test_network_virtual__PipeTest_hpp */