Added write-callback enabled buffered InputStream using data::buffer::FIFOBuffer

This commit is contained in:
Benedikt-Alexander Mokroß 2021-08-17 14:12:49 +02:00
parent e774517623
commit 8b57320e71
3 changed files with 264 additions and 0 deletions

View File

@ -107,6 +107,8 @@ add_library(oatpp
oatpp/core/data/stream/BufferStream.hpp oatpp/core/data/stream/BufferStream.hpp
oatpp/core/data/stream/ChunkedBuffer.cpp oatpp/core/data/stream/ChunkedBuffer.cpp
oatpp/core/data/stream/ChunkedBuffer.hpp oatpp/core/data/stream/ChunkedBuffer.hpp
oatpp/core/data/stream/FIFOStream.cpp
oatpp/core/data/stream/FIFOStream.hpp
oatpp/core/data/stream/FileStream.cpp oatpp/core/data/stream/FileStream.cpp
oatpp/core/data/stream/FileStream.hpp oatpp/core/data/stream/FileStream.hpp
oatpp/core/data/stream/Stream.cpp oatpp/core/data/stream/Stream.cpp

View File

@ -0,0 +1,118 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Benedikt-Alexander Mokroß <github@bamkrs.de>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "FIFOStream.hpp"
#include "oatpp/core/utils/Binary.hpp"
namespace oatpp { namespace data { namespace stream {
data::stream::DefaultInitializedContext FIFOInputStream::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_FINITE);
FIFOInputStream::FIFOInputStream(v_buff_size initialSize)
: m_memoryHandle(std::make_shared<std::string>(initialSize, (char)0))
, m_fifo(std::make_shared<data::buffer::FIFOBuffer>((void*)m_memoryHandle->data(), m_memoryHandle->size(), 0, 0, false)) {
}
void FIFOInputStream::reset() {
m_fifo->setBufferPosition(0, 0, false);
}
v_io_size FIFOInputStream::read(void *data, v_buff_size count, async::Action& action) {
(void) action;
return m_fifo->read(data, count);
}
void FIFOInputStream::setInputStreamIOMode(IOMode ioMode) {
m_ioMode = ioMode;
}
IOMode FIFOInputStream::getInputStreamIOMode() {
return m_ioMode;
}
Context& FIFOInputStream::getInputStreamContext() {
return DEFAULT_CONTEXT;
}
std::shared_ptr<std::string> FIFOInputStream::getDataMemoryHandle() {
return m_memoryHandle;
}
v_io_size FIFOInputStream::write(const void *data, v_buff_size count, async::Action &action) {
(void) action;
reserveBytesUpfront(count);
return m_fifo->write(data, count);
}
void FIFOInputStream::reserveBytesUpfront(v_buff_size count) {
v_buff_size capacityNeeded = m_fifo->availableToRead() + count;
if(capacityNeeded > m_fifo->getBufferSize()) {
v_buff_size newCapacity = utils::Binary::nextP2(capacityNeeded);
if(newCapacity < 0 || (m_fifo->getBufferSize() > 0 && newCapacity > m_fifo->getBufferSize())) {
newCapacity = m_fifo->getBufferSize();
}
if(newCapacity < capacityNeeded) {
throw std::runtime_error("[oatpp::data::stream::BufferOutputStream::reserveBytesUpfront()]: Error. Unable to allocate requested memory.");
}
// ToDo: In-Memory-Resize
auto newHandle = std::make_shared<std::string>(newCapacity, (char)0);
v_io_size oldSize = m_fifo->availableToRead();
m_fifo->read((void*)newHandle->data(), oldSize);
auto newFifo = std::make_shared<data::buffer::FIFOBuffer>((void*)newHandle->data(), newHandle->size(), 0, oldSize, true);
m_memoryHandle = newHandle;
m_fifo = newFifo;
}
}
v_io_size FIFOInputStream::readAndWriteToStream(data::stream::OutputStream *stream,
v_buff_size count,
async::Action &action) {
return m_fifo->readAndWriteToStream(stream, count, action);
}
v_io_size FIFOInputStream::readFromStreamAndWrite(data::stream::InputStream *stream,
v_buff_size count,
async::Action &action) {
reserveBytesUpfront(count);
return m_fifo->readFromStreamAndWrite(stream, count, action);
}
v_io_size FIFOInputStream::flushToStream(data::stream::OutputStream *stream) {
return m_fifo->flushToStream(stream);
}
async::CoroutineStarter FIFOInputStream::flushToStreamAsync(const std::shared_ptr<data::stream::OutputStream> &stream) {
return m_fifo->flushToStreamAsync(stream);
}
}}}

View File

@ -0,0 +1,144 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Benedikt-Alexander Mokroß <github@bamkrs.de>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_data_stream_FIFOStream_hpp
#define oatpp_data_stream_FIFOStream_hpp
#include "Stream.hpp"
#include "oatpp/core/data/buffer/FIFOBuffer.hpp"
namespace oatpp { namespace data { namespace stream {
/**
* FIFOInputStream
*/
class FIFOInputStream : public InputStream, public WriteCallback {
public:
static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
private:
std::shared_ptr<std::string> m_memoryHandle;
std::shared_ptr<data::buffer::FIFOBuffer> m_fifo;
IOMode m_ioMode;
public:
/**
* Constructor.
* @param data - buffer.
*/
FIFOInputStream(v_buff_size initialSize = 2048);
static std::shared_ptr<FIFOInputStream> createShared(v_buff_size initialSize = 2048) {
return std::make_shared<FIFOInputStream>(2048);
}
/**
* Same as `reset(nullptr, nullptr, 0);.`
*/
void reset();
/**
* Read data from stream. <br>
* It is a legal case if return result < count. Caller should handle this!
* *Calls to this method are always NON-BLOCKING*
* @param data - buffer to read data to.
* @param count - size of the buffer.
* @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then
* caller MUST return this action on coroutine iteration.
* @return - actual number of bytes read. 0 - designates end of the buffer.
*/
v_io_size read(void *data, v_buff_size count, async::Action& action) override;
/**
* Set stream I/O mode.
* @throws
*/
void setInputStreamIOMode(IOMode ioMode) override;
/**
* Get stream I/O mode.
* @return
*/
IOMode getInputStreamIOMode() override;
/**
* Get stream context.
* @return
*/
Context& getInputStreamContext() override;
/**
* Get data memory handle.
* @return - data memory handle.
*/
std::shared_ptr<std::string> getDataMemoryHandle();
/**
* Write operation callback.
* @param data - pointer to data.
* @param count - size of the data in bytes.
* @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then
* caller MUST return this action on coroutine iteration.
* @return - actual number of bytes written. 0 - to indicate end-of-file.
*/
v_io_size write(const void *data, v_buff_size count, async::Action &action) override;
void reserveBytesUpfront(v_buff_size count);
/**
* call read and then write bytes read to output stream
* @param stream
* @param count
* @param action
* @return [1..count], IOErrors.
*/
v_io_size readAndWriteToStream(data::stream::OutputStream* stream, v_buff_size count, async::Action& action);
/**
* call stream.read() and then write bytes read to buffer
* @param stream
* @param count
* @param action
* @return
*/
v_io_size readFromStreamAndWrite(data::stream::InputStream* stream, v_buff_size count, async::Action& action);
/**
* flush all availableToRead bytes to stream
* @param stream
* @return
*/
v_io_size flushToStream(data::stream::OutputStream* stream);
/**
* flush all availableToRead bytes to stream in asynchronous manner
* @param stream - &id:data::stream::OutputStream;.
* @return - &id:async::CoroutineStarter;.
*/
async::CoroutineStarter flushToStreamAsync(const std::shared_ptr<data::stream::OutputStream>& stream);
};
}}}
#endif // oatpp_data_stream_FIFOStream_hpp