Refactor stream interfaces. Rename stream inline read/write data.

This commit is contained in:
lganzzzo 2020-01-03 11:01:55 +02:00
parent eb8121a8bf
commit 2cfef00127
15 changed files with 329 additions and 287 deletions

View File

@ -380,8 +380,8 @@ async::CoroutineStarter FIFOBuffer::flushToStreamAsync(const std::shared_ptr<dat
FIFOBuffer* m_fifo;
std::shared_ptr<data::stream::OutputStream> m_stream;
private:
data::stream::AsyncInlineWriteData m_data1;
data::stream::AsyncInlineWriteData m_data2;
data::stream::InlineWriteData m_data1;
data::stream::InlineWriteData m_data2;
public:
FlushCoroutine(FIFOBuffer* fifo, const std::shared_ptr<data::stream::OutputStream>& stream)

View File

@ -134,7 +134,7 @@ oatpp::async::CoroutineStarter BufferOutputStream::flushToStreamAsync(const std:
private:
std::shared_ptr<BufferOutputStream> m_this;
std::shared_ptr<oatpp::data::stream::OutputStream> m_stream;
AsyncInlineWriteData m_inlineData;
InlineWriteData m_inlineData;
public:
WriteDataCoroutine(const std::shared_ptr<BufferOutputStream>& _this,

View File

@ -248,7 +248,7 @@ oatpp::async::CoroutineStarter ChunkedBuffer::flushToStreamAsync(const std::shar
ChunkEntry* m_currEntry;
data::v_io_size m_bytesLeft;
Action m_nextAction;
data::stream::AsyncInlineWriteData m_currData;
data::stream::InlineWriteData m_currData;
bool m_needInit;
public:

View File

@ -27,6 +27,213 @@
namespace oatpp { namespace data{ namespace stream {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// WriteCallback
data::v_io_size WriteCallback::write(InlineWriteData& inlineData, async::Action& action) {
auto res = write(inlineData.currBufferPtr, inlineData.bytesLeft, action);
if(res > 0) {
inlineData.inc(res);
}
return res;
}
data::v_io_size WriteCallback::writeSimple(const void *data, v_buff_size count) {
async::Action action;
auto res = write(data, count, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeSimple()]: Error. writeSimple is called on a stream in Async mode.");
}
return res;
}
data::v_io_size WriteCallback::writeExactSizeDataSimple(InlineWriteData& inlineData) {
auto initialCount = inlineData.bytesLeft;
while(inlineData.bytesLeft > 0) {
async::Action action;
auto res = write(inlineData, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeExactSizeDataSimple()]: Error. writeExactSizeDataSimple() is called on a stream in Async mode.");
}
if(res == data::IOError::BROKEN_PIPE || res == data::IOError::ZERO_VALUE) {
break;
}
}
return initialCount - inlineData.bytesLeft;
}
data::v_io_size WriteCallback::writeExactSizeDataSimple(const void *data, v_buff_size count) {
InlineWriteData inlineData(data, count);
return writeExactSizeDataSimple(inlineData);
}
async::Action WriteCallback::writeExactSizeDataAsyncInline(InlineWriteData& inlineData, async::Action&& nextAction) {
if(inlineData.bytesLeft > 0) {
async::Action action;
auto res = write(inlineData, action);
if (!action.isNone()) {
return action;
}
if (res > 0) {
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
} else {
switch (res) {
case IOError::BROKEN_PIPE:
return new AsyncIOError(data::IOError::BROKEN_PIPE);
case IOError::ZERO_VALUE:
break;
case IOError::RETRY_READ:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
case IOError::RETRY_WRITE:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
default:
return new async::Error(
"[oatpp::data::stream::writeExactSizeDataAsyncInline()]: Error. Unknown IO result.");
}
}
}
return std::forward<async::Action>(nextAction);
}
async::CoroutineStarter WriteCallback::writeExactSizeDataAsync(const void* data, v_buff_size size) {
class WriteDataCoroutine : public oatpp::async::Coroutine<WriteDataCoroutine> {
private:
WriteCallback* m_this;
InlineWriteData m_inlineData;
public:
WriteDataCoroutine(WriteCallback* _this,
const void* data, v_buff_size size)
: m_this(_this)
, m_inlineData(data, size)
{}
Action act() {
return m_this->writeExactSizeDataAsyncInline(m_inlineData, finish());
}
};
return WriteDataCoroutine::start(this, data, size);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ReadCallback
data::v_io_size ReadCallback::read(InlineReadData& inlineData, async::Action& action) {
auto res = read(inlineData.currBufferPtr, inlineData.bytesLeft, action);
if(res > 0) {
inlineData.inc(res);
}
return res;
}
data::v_io_size ReadCallback::readExactSizeDataSimple(InlineReadData& inlineData) {
auto initialCount = inlineData.bytesLeft;
while(inlineData.bytesLeft > 0) {
async::Action action;
auto res = read(inlineData, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::ReadCallback::readExactSizeDataSimple()]: Error. readExactSizeDataSimple() is called on a stream in Async mode.");
}
if(res == data::IOError::BROKEN_PIPE || res == data::IOError::ZERO_VALUE) {
break;
}
}
return initialCount - inlineData.bytesLeft;
}
data::v_io_size ReadCallback::readExactSizeDataSimple(void *data, v_buff_size count) {
InlineReadData inlineData(data, count);
return readExactSizeDataSimple(inlineData);
}
async::Action ReadCallback::readExactSizeDataAsyncInline(InlineReadData& inlineData, async::Action&& nextAction) {
if(inlineData.bytesLeft > 0) {
async::Action action;
auto res = read(inlineData, action);
if (!action.isNone()) {
return action;
}
if (res > 0) {
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
} else {
switch (res) {
case IOError::BROKEN_PIPE:
return new AsyncIOError(data::IOError::BROKEN_PIPE);
case IOError::ZERO_VALUE:
break;
case IOError::RETRY_READ:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
case IOError::RETRY_WRITE:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
default:
return new async::Error(
"[oatpp::data::stream::readExactSizeDataAsyncInline()]: Error. Unknown IO result.");
}
}
}
return std::forward<async::Action>(nextAction);
}
async::Action ReadCallback::readSomeDataAsyncInline(InlineReadData& inlineData, async::Action&& nextAction) {
if(inlineData.bytesLeft > 0) {
async::Action action;
auto res = read(inlineData.currBufferPtr, inlineData.bytesLeft, action);
if(!action.isNone()) {
return action;
}
if(res < 0) {
switch (res) {
case IOError::BROKEN_PIPE:
return new AsyncIOError(data::IOError::BROKEN_PIPE);
// case IOError::ZERO_VALUE:
// break;
case IOError::RETRY_READ:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
case IOError::RETRY_WRITE:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
default:
return new async::Error(
"[oatpp::data::stream::readSomeDataAsyncInline()]: Error. Unknown IO result.");
}
}
}
return std::forward<async::Action>(nextAction);
}
data::v_io_size ReadCallback::readSimple(void *data, v_buff_size count) {
async::Action action;
auto res = read(data, count, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::ReadCallback::readSimple()]: Error. readSimple is called on a stream in Async mode.");
}
return res;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Context
@ -156,57 +363,57 @@ data::v_io_size ConsistentOutputStream::writeAsString(bool value) {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AsyncInlineWriteData
// InlineWriteData
AsyncInlineWriteData::AsyncInlineWriteData()
InlineWriteData::InlineWriteData()
: currBufferPtr(nullptr)
, bytesLeft(0)
{}
AsyncInlineWriteData::AsyncInlineWriteData(const void* data, v_buff_size size)
InlineWriteData::InlineWriteData(const void* data, v_buff_size size)
: currBufferPtr(data)
, bytesLeft(size)
{}
void AsyncInlineWriteData::set(const void* data, v_buff_size size) {
void InlineWriteData::set(const void* data, v_buff_size size) {
currBufferPtr = data;
bytesLeft = size;
}
void AsyncInlineWriteData::inc(v_buff_size amount) {
void InlineWriteData::inc(v_buff_size amount) {
currBufferPtr = &((p_char8) currBufferPtr)[amount];
bytesLeft -= amount;
}
void AsyncInlineWriteData::setEof() {
void InlineWriteData::setEof() {
currBufferPtr = &((p_char8) currBufferPtr)[bytesLeft];
bytesLeft = 0;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AsyncInlineReadData
// InlineReadData
AsyncInlineReadData::AsyncInlineReadData()
InlineReadData::InlineReadData()
: currBufferPtr(nullptr)
, bytesLeft(0)
{}
AsyncInlineReadData::AsyncInlineReadData(void* data, v_buff_size size)
InlineReadData::InlineReadData(void* data, v_buff_size size)
: currBufferPtr(data)
, bytesLeft(size)
{}
void AsyncInlineReadData::set(void* data, v_buff_size size) {
void InlineReadData::set(void* data, v_buff_size size) {
currBufferPtr = data;
bytesLeft = size;
}
void AsyncInlineReadData::inc(v_buff_size amount) {
void InlineReadData::inc(v_buff_size amount) {
currBufferPtr = &((p_char8) currBufferPtr)[amount];
bytesLeft -= amount;
}
void AsyncInlineReadData::setEof() {
void InlineReadData::setEof() {
currBufferPtr = &((p_char8) currBufferPtr)[bytesLeft];
bytesLeft = 0;
}
@ -383,8 +590,8 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<ReadCallback>
v_buff_size m_desiredReadCount;
AsyncInlineReadData m_inlineReadData;
AsyncInlineWriteData m_inlineWriteData;
InlineReadData m_inlineReadData;
InlineWriteData m_inlineWriteData;
public:

View File

@ -193,7 +193,7 @@ enum IOMode : v_int32 {
/**
* Convenience structure for stream Async-Inline write operations.
*/
struct AsyncInlineWriteData {
struct InlineWriteData {
/**
* Pointer to current position in the buffer.
@ -208,14 +208,14 @@ struct AsyncInlineWriteData {
/**
* Default constructor.
*/
AsyncInlineWriteData();
InlineWriteData();
/**
* Constructor.
* @param data
* @param size
*/
AsyncInlineWriteData(const void* data, v_buff_size size);
InlineWriteData(const void* data, v_buff_size size);
/**
* Set `currBufferPtr` and `bytesLeft` values. <br>
@ -259,101 +259,17 @@ public:
*/
virtual data::v_io_size write(const void *data, v_buff_size count, async::Action& action) = 0;
data::v_io_size write(AsyncInlineWriteData& inlineData, async::Action& action) {
auto res = write(inlineData.currBufferPtr, inlineData.bytesLeft, action);
if(res > 0) {
inlineData.inc(res);
}
return res;
}
data::v_io_size write(InlineWriteData& inlineData, async::Action& action);
data::v_io_size writeSimple(const void *data, v_buff_size count) {
async::Action action;
auto res = write(data, count, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeSimple()]: Error. writeSimple is called on a stream in Async mode.");
}
return res;
}
data::v_io_size writeSimple(const void *data, v_buff_size count);
data::v_io_size writeExactSizeDataSimple(AsyncInlineWriteData& inlineData) {
auto initialCount = inlineData.bytesLeft;
while(inlineData.bytesLeft > 0) {
async::Action action;
auto res = write(inlineData, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeExactSizeDataSimple()]: Error. writeExactSizeDataSimple() is called on a stream in Async mode.");
}
if(res == data::IOError::BROKEN_PIPE || res == data::IOError::ZERO_VALUE) {
break;
}
}
return initialCount - inlineData.bytesLeft;
}
data::v_io_size writeExactSizeDataSimple(InlineWriteData& inlineData);
data::v_io_size writeExactSizeDataSimple(const void *data, v_buff_size count) {
AsyncInlineWriteData inlineData(data, count);
return writeExactSizeDataSimple(inlineData);
}
data::v_io_size writeExactSizeDataSimple(const void *data, v_buff_size count);
async::Action writeExactSizeDataAsyncInline(AsyncInlineWriteData& inlineData, async::Action&& nextAction) {
async::Action writeExactSizeDataAsyncInline(InlineWriteData& inlineData, async::Action&& nextAction);
if(inlineData.bytesLeft > 0) {
async::Action action;
auto res = write(inlineData, action);
if (!action.isNone()) {
return action;
}
if (res > 0) {
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
} else {
switch (res) {
case IOError::BROKEN_PIPE:
return new AsyncIOError(data::IOError::BROKEN_PIPE);
case IOError::ZERO_VALUE:
break;
case IOError::RETRY_READ:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
case IOError::RETRY_WRITE:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
default:
return new async::Error(
"[oatpp::data::stream::writeExactSizeDataAsyncInline()]: Error. Unknown IO result.");
}
}
}
return std::forward<async::Action>(nextAction);
}
async::CoroutineStarter writeExactSizeDataAsync(const void* data, v_buff_size size) {
class WriteDataCoroutine : public oatpp::async::Coroutine<WriteDataCoroutine> {
private:
WriteCallback* m_this;
AsyncInlineWriteData m_inlineData;
public:
WriteDataCoroutine(WriteCallback* _this,
const void* data, v_buff_size size)
: m_this(_this)
, m_inlineData(data, size)
{}
Action act() {
return m_this->writeExactSizeDataAsyncInline(m_inlineData, finish());
}
};
return WriteDataCoroutine::start(this, data, size);
}
async::CoroutineStarter writeExactSizeDataAsync(const void* data, v_buff_size size);
/**
* Same as `write((p_char8)data, std::strlen(data));`.
@ -418,7 +334,7 @@ public:
/**
* Convenience structure for stream Async-Inline read operations.
*/
struct AsyncInlineReadData {
struct InlineReadData {
/**
* Pointer to current position in the buffer.
@ -433,14 +349,14 @@ struct AsyncInlineReadData {
/**
* Default constructor.
*/
AsyncInlineReadData();
InlineReadData();
/**
* Constructor.
* @param data
* @param size
*/
AsyncInlineReadData(void* data, v_buff_size size);
InlineReadData(void* data, v_buff_size size);
/**
* Set `currBufferPtr` and `bytesLeft` values. <br>
@ -484,110 +400,17 @@ public:
*/
virtual data::v_io_size read(void *buffer, v_buff_size count, async::Action& action) = 0;
data::v_io_size read(AsyncInlineReadData& inlineData, async::Action& action) {
auto res = read(inlineData.currBufferPtr, inlineData.bytesLeft, action);
if(res > 0) {
inlineData.inc(res);
}
return res;
}
data::v_io_size read(InlineReadData& inlineData, async::Action& action);
data::v_io_size readExactSizeDataSimple(AsyncInlineReadData& inlineData) {
auto initialCount = inlineData.bytesLeft;
while(inlineData.bytesLeft > 0) {
async::Action action;
auto res = read(inlineData, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::ReadCallback::readExactSizeDataSimple()]: Error. readExactSizeDataSimple() is called on a stream in Async mode.");
}
if(res == data::IOError::BROKEN_PIPE || res == data::IOError::ZERO_VALUE) {
break;
}
}
return initialCount - inlineData.bytesLeft;
}
data::v_io_size readExactSizeDataSimple(InlineReadData& inlineData);
data::v_io_size readExactSizeDataSimple(void *data, v_buff_size count) {
AsyncInlineReadData inlineData(data, count);
return readExactSizeDataSimple(inlineData);
}
data::v_io_size readExactSizeDataSimple(void *data, v_buff_size count);
async::Action readExactSizeDataAsyncInline(AsyncInlineReadData& inlineData, async::Action&& nextAction) {
async::Action readExactSizeDataAsyncInline(InlineReadData& inlineData, async::Action&& nextAction);
if(inlineData.bytesLeft > 0) {
async::Action readSomeDataAsyncInline(InlineReadData& inlineData, async::Action&& nextAction);
async::Action action;
auto res = read(inlineData, action);
if (!action.isNone()) {
return action;
}
if (res > 0) {
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
} else {
switch (res) {
case IOError::BROKEN_PIPE:
return new AsyncIOError(data::IOError::BROKEN_PIPE);
case IOError::ZERO_VALUE:
break;
case IOError::RETRY_READ:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
case IOError::RETRY_WRITE:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
default:
return new async::Error(
"[oatpp::data::stream::readExactSizeDataAsyncInline()]: Error. Unknown IO result.");
}
}
}
return std::forward<async::Action>(nextAction);
}
async::Action readSomeDataAsyncInline(AsyncInlineReadData& inlineData, async::Action&& nextAction) {
if(inlineData.bytesLeft > 0) {
async::Action action;
auto res = read(inlineData.currBufferPtr, inlineData.bytesLeft, action);
if(!action.isNone()) {
return action;
}
if(res < 0) {
switch (res) {
case IOError::BROKEN_PIPE:
return new AsyncIOError(data::IOError::BROKEN_PIPE);
// case IOError::ZERO_VALUE:
// break;
case IOError::RETRY_READ:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
case IOError::RETRY_WRITE:
return async::Action::createActionByType(async::Action::TYPE_REPEAT);
default:
return new async::Error(
"[oatpp::data::stream::readSomeDataAsyncInline()]: Error. Unknown IO result.");
}
}
}
return std::forward<async::Action>(nextAction);
}
data::v_io_size readSimple(void *data, v_buff_size count) {
async::Action action;
auto res = read(data, count, action);
if(!action.isNone()) {
throw std::runtime_error("[oatpp::data::stream::ReadCallback::readSimple()]: Error. readSimple is called on a stream in Async mode.");
}
return res;
}
data::v_io_size readSimple(void *data, v_buff_size count);
};
@ -640,60 +463,6 @@ public:
};
class CompoundIOStream : public oatpp::base::Countable, public IOStream {
public:
OBJECT_POOL(CompoundIOStream_Pool, CompoundIOStream, 32);
SHARED_OBJECT_POOL(Shared_CompoundIOStream_Pool, CompoundIOStream, 32);
private:
std::shared_ptr<OutputStream> m_outputStream;
std::shared_ptr<InputStream> m_inputStream;
public:
CompoundIOStream(const std::shared_ptr<OutputStream>& outputStream,
const std::shared_ptr<InputStream>& inputStream)
: m_outputStream(outputStream)
, m_inputStream(inputStream)
{}
public:
static std::shared_ptr<CompoundIOStream> createShared(const std::shared_ptr<OutputStream>& outputStream,
const std::shared_ptr<InputStream>& inputStream){
return Shared_CompoundIOStream_Pool::allocateShared(outputStream, inputStream);
}
data::v_io_size write(const void *data, v_buff_size count, async::Action& action) override {
return m_outputStream->write(data, count, action);
}
data::v_io_size read(void *data, v_buff_size count, async::Action& action) override {
return m_inputStream->read(data, count, action);
}
void setOutputStreamIOMode(IOMode ioMode) override {
m_outputStream->setOutputStreamIOMode(ioMode);
}
IOMode getOutputStreamIOMode() override {
return m_outputStream->getOutputStreamIOMode();
}
Context& getOutputStreamContext() override {
return m_outputStream->getOutputStreamContext();
}
void setInputStreamIOMode(IOMode ioMode) override {
m_inputStream->setInputStreamIOMode(ioMode);
}
IOMode getInputStreamIOMode() override {
return m_inputStream->getInputStreamIOMode();
}
Context& getInputStreamContext() override {
return m_inputStream->getInputStreamContext();
}
};
/**
* Streams that guarantee data to be written in exact amount as specified in call to &l:OutputStream::write (); should extend this class.
*/

View File

@ -158,7 +158,7 @@ void Reader::readAll() {
if(res > 0) {
data::stream::AsyncInlineWriteData inlineData(buffer.getData(), res);
data::stream::InlineWriteData inlineData(buffer.getData(), res);
while(inlineData.bytesLeft > 0 && !m_parser.finished()) {
async::Action action;
m_parser.parseNext(inlineData, action);
@ -207,7 +207,7 @@ AsyncReader::AsyncReader(const std::shared_ptr<Multipart>& multipart)
data::v_io_size AsyncReader::write(const void *data, v_buff_size count, async::Action& action) {
data::stream::AsyncInlineWriteData inlineData(data, count);
data::stream::InlineWriteData inlineData(data, count);
while(inlineData.bytesLeft > 0 && !m_parser.finished() && action.isNone()) {
m_parser.parseNext(inlineData, action);
}

View File

@ -131,7 +131,7 @@ void StatefulParser::parseHeaders(Headers& headers) {
}
StatefulParser::ListenerCall StatefulParser::parseNext_Boundary(data::stream::AsyncInlineWriteData& inlineData) {
StatefulParser::ListenerCall StatefulParser::parseNext_Boundary(data::stream::InlineWriteData& inlineData) {
ListenerCall result;
p_char8 data = (p_char8)inlineData.currBufferPtr;
@ -190,7 +190,7 @@ StatefulParser::ListenerCall StatefulParser::parseNext_Boundary(data::stream::As
}
void StatefulParser::parseNext_AfterBoundary(data::stream::AsyncInlineWriteData& inlineData) {
void StatefulParser::parseNext_AfterBoundary(data::stream::InlineWriteData& inlineData) {
p_char8 data = (p_char8) inlineData.currBufferPtr;
auto size = inlineData.bytesLeft;
@ -232,7 +232,7 @@ void StatefulParser::parseNext_AfterBoundary(data::stream::AsyncInlineWriteData&
}
StatefulParser::ListenerCall StatefulParser::parseNext_Headers(data::stream::AsyncInlineWriteData& inlineData) {
StatefulParser::ListenerCall StatefulParser::parseNext_Headers(data::stream::InlineWriteData& inlineData) {
ListenerCall result;
@ -276,7 +276,7 @@ StatefulParser::ListenerCall StatefulParser::parseNext_Headers(data::stream::Asy
}
StatefulParser::ListenerCall StatefulParser::parseNext_Data(data::stream::AsyncInlineWriteData& inlineData) {
StatefulParser::ListenerCall StatefulParser::parseNext_Data(data::stream::InlineWriteData& inlineData) {
ListenerCall result;
@ -309,7 +309,7 @@ StatefulParser::ListenerCall StatefulParser::parseNext_Data(data::stream::AsyncI
}
void StatefulParser::parseNext(data::stream::AsyncInlineWriteData& inlineData, async::Action& action) {
void StatefulParser::parseNext(data::stream::InlineWriteData& inlineData, async::Action& action) {
while(inlineData.bytesLeft > 0) {

View File

@ -191,10 +191,10 @@ private:
private:
ListenerCall parseNext_Boundary(data::stream::AsyncInlineWriteData& inlineData);
void parseNext_AfterBoundary(data::stream::AsyncInlineWriteData& inlineData);
ListenerCall parseNext_Headers(data::stream::AsyncInlineWriteData& inlineData);
ListenerCall parseNext_Data(data::stream::AsyncInlineWriteData& inlineData);
ListenerCall parseNext_Boundary(data::stream::InlineWriteData& inlineData);
void parseNext_AfterBoundary(data::stream::InlineWriteData& inlineData);
ListenerCall parseNext_Headers(data::stream::InlineWriteData& inlineData);
ListenerCall parseNext_Data(data::stream::InlineWriteData& inlineData);
public:
@ -213,7 +213,7 @@ public:
* @param inlineData - inline data.
* @param action - Async Action in case Async Listener was provided in constructor.
*/
void parseNext(data::stream::AsyncInlineWriteData& inlineData, async::Action& action);
void parseNext(data::stream::InlineWriteData& inlineData, async::Action& action);
/**
* Check if parser done parsing data.

View File

@ -78,7 +78,7 @@ public:
private:
std::shared_ptr<BufferBody> m_body;
std::shared_ptr<OutputStream> m_stream;
oatpp::data::stream::AsyncInlineWriteData m_inlineData;
oatpp::data::stream::InlineWriteData m_inlineData;
public:
/**

View File

@ -96,9 +96,9 @@ oatpp::async::CoroutineStarter ChunkedBody::writeToStreamAsync(const std::shared
// private:
// oatpp::String m_chunkSizeHex;
// private:
// data::stream::AsyncInlineReadData m_inlineReadData;
// data::stream::AsyncInlineWriteData m_inlineWriteData;
// data::stream::AsyncInlineWriteData m_chunkSizeWriteData;
// data::stream::InlineReadData m_inlineReadData;
// data::stream::InlineWriteData m_inlineWriteData;
// data::stream::InlineWriteData m_chunkSizeWriteData;
// public:
//
// WriteCoroutine(const std::shared_ptr<ChunkedBody>& body,

View File

@ -153,12 +153,12 @@ data::v_io_size MultipartBody::AsyncMultipartReadCallback::read(void *buffer, v_
// class ReadCoroutine : public oatpp::async::Coroutine<ReadCoroutine> {
// private:
// AsyncMultipartReadCallback* m_this;
// oatpp::data::stream::AsyncInlineReadData* m_inlineData;
// oatpp::data::stream::InlineReadData* m_inlineData;
// data::v_io_size m_readResult;
// data::v_io_size m_bodyReadPosition0;
// public:
//
// ReadCoroutine(AsyncMultipartReadCallback* _this, oatpp::data::stream::AsyncInlineReadData* inlineData)
// ReadCoroutine(AsyncMultipartReadCallback* _this, oatpp::data::stream::InlineReadData* inlineData)
// : m_this(_this)
// , m_inlineData(inlineData)
// {}

View File

@ -100,7 +100,7 @@ private:
v_int32 m_state;
data::stream::BufferInputStream m_readStream;
data::v_io_size m_wantToRead;
data::stream::AsyncInlineReadData m_inlineData;
data::stream::InlineReadData m_inlineData;
data::buffer::IOBuffer m_buffer;
public:

View File

@ -108,4 +108,27 @@ data::stream::Context& ChunkedDecodingStream::getInputStreamContext() {
return m_baseStream->getInputStreamContext();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ChunkedEncodingStream
ChunkedEncodingStream::ChunkedEncodingStream(const base::ObjectHandle<data::stream::InputStream>& baseStream)
: m_baseStream(baseStream)
{}
data::v_io_size ChunkedEncodingStream::read(void *buffer, v_buff_size count, async::Action& action) {
}
void ChunkedEncodingStream::setInputStreamIOMode(data::stream::IOMode ioMode) {
m_baseStream->setInputStreamIOMode(ioMode);
}
data::stream::IOMode ChunkedEncodingStream::getInputStreamIOMode() {
return m_baseStream->getInputStreamIOMode();
}
data::stream::Context& ChunkedEncodingStream::getInputStreamContext() {
return m_baseStream->getInputStreamContext();
}
}}}}}

View File

@ -80,6 +80,49 @@ public:
};
/**
* Stream to chunked-encode data.
*/
class ChunkedEncodingStream : public data::stream::InputStream {
private:
base::ObjectHandle<data::stream::InputStream> m_baseStream;
public:
/**
* Constructor.
* @param baseStream - chunked encoded stream.
*/
ChunkedEncodingStream(const base::ObjectHandle<data::stream::InputStream>& baseStream);
/**
* Read encoded bytes.
* @param buffer
* @param count
* @param action
* @return
*/
data::v_io_size read(void *buffer, v_buff_size count, async::Action& action) override;
/**
* Set stream I/O mode.
* @throws
*/
void setInputStreamIOMode(data::stream::IOMode ioMode) override;
/**
* Get stream I/O mode.
* @return
*/
data::stream::IOMode getInputStreamIOMode() override;
/**
* Get stream context.
* @return - &l:Context;.
*/
data::stream::Context& getInputStreamContext() override;
};
}}}}}
#endif // oatpp_web_protocol_http_stream_ChunkedStream_hpp

View File

@ -67,7 +67,7 @@ namespace {
std::unique_ptr<v_char8> buffer(new v_char8[step]);
data::v_io_size size;
while((size = stream.readSimple(buffer.get(), step)) != 0) {
oatpp::data::stream::AsyncInlineWriteData inlineData(buffer.get(), size);
oatpp::data::stream::InlineWriteData inlineData(buffer.get(), size);
while(inlineData.bytesLeft > 0 && !parser.finished()) {
oatpp::async::Action action;
parser.parseNext(inlineData, action);