diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 47acf0f1..7517a799 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -107,6 +107,8 @@ add_library(oatpp oatpp/core/data/stream/BufferStream.hpp oatpp/core/data/stream/ChunkedBuffer.cpp oatpp/core/data/stream/ChunkedBuffer.hpp + oatpp/core/data/stream/FIFOStream.cpp + oatpp/core/data/stream/FIFOStream.hpp oatpp/core/data/stream/FileStream.cpp oatpp/core/data/stream/FileStream.hpp oatpp/core/data/stream/Stream.cpp diff --git a/src/oatpp/core/data/stream/FIFOStream.cpp b/src/oatpp/core/data/stream/FIFOStream.cpp new file mode 100644 index 00000000..df10ca41 --- /dev/null +++ b/src/oatpp/core/data/stream/FIFOStream.cpp @@ -0,0 +1,118 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Benedikt-Alexander Mokroß + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + + +#include "FIFOStream.hpp" +#include "oatpp/core/utils/Binary.hpp" + +namespace oatpp { namespace data { namespace stream { + +data::stream::DefaultInitializedContext FIFOInputStream::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_FINITE); + +FIFOInputStream::FIFOInputStream(v_buff_size initialSize) + : m_memoryHandle(std::make_shared(initialSize, (char)0)) + , m_fifo(std::make_shared((void*)m_memoryHandle->data(), m_memoryHandle->size(), 0, 0, false)) { + +} + +void FIFOInputStream::reset() { + m_fifo->setBufferPosition(0, 0, false); +} + +v_io_size FIFOInputStream::read(void *data, v_buff_size count, async::Action& action) { + (void) action; + return m_fifo->read(data, count); +} + +void FIFOInputStream::setInputStreamIOMode(IOMode ioMode) { + m_ioMode = ioMode; +} + +IOMode FIFOInputStream::getInputStreamIOMode() { + return m_ioMode; +} + +Context& FIFOInputStream::getInputStreamContext() { + return DEFAULT_CONTEXT; +} + +std::shared_ptr FIFOInputStream::getDataMemoryHandle() { + return m_memoryHandle; +} + +v_io_size FIFOInputStream::write(const void *data, v_buff_size count, async::Action &action) { + (void) action; + reserveBytesUpfront(count); + return m_fifo->write(data, count); +} + +void FIFOInputStream::reserveBytesUpfront(v_buff_size count) { + + v_buff_size capacityNeeded = m_fifo->availableToRead() + count; + + if(capacityNeeded > m_fifo->getBufferSize()) { + + v_buff_size newCapacity = utils::Binary::nextP2(capacityNeeded); + + if(newCapacity < 0 || (m_fifo->getBufferSize() > 0 && newCapacity > m_fifo->getBufferSize())) { + newCapacity = m_fifo->getBufferSize(); + } + + if(newCapacity < capacityNeeded) { + throw std::runtime_error("[oatpp::data::stream::BufferOutputStream::reserveBytesUpfront()]: Error. Unable to allocate requested memory."); + } + + // ToDo: In-Memory-Resize + auto newHandle = std::make_shared(newCapacity, (char)0); + v_io_size oldSize = m_fifo->availableToRead(); + m_fifo->read((void*)newHandle->data(), oldSize); + auto newFifo = std::make_shared((void*)newHandle->data(), newHandle->size(), 0, oldSize, true); + m_memoryHandle = newHandle; + m_fifo = newFifo; + } + +} + +v_io_size FIFOInputStream::readAndWriteToStream(data::stream::OutputStream *stream, + v_buff_size count, + async::Action &action) { + return m_fifo->readAndWriteToStream(stream, count, action); +} + +v_io_size FIFOInputStream::readFromStreamAndWrite(data::stream::InputStream *stream, + v_buff_size count, + async::Action &action) { + reserveBytesUpfront(count); + return m_fifo->readFromStreamAndWrite(stream, count, action); +} + +v_io_size FIFOInputStream::flushToStream(data::stream::OutputStream *stream) { + return m_fifo->flushToStream(stream); +} + +async::CoroutineStarter FIFOInputStream::flushToStreamAsync(const std::shared_ptr &stream) { + return m_fifo->flushToStreamAsync(stream); +} + +}}} \ No newline at end of file diff --git a/src/oatpp/core/data/stream/FIFOStream.hpp b/src/oatpp/core/data/stream/FIFOStream.hpp new file mode 100644 index 00000000..16aa7ea8 --- /dev/null +++ b/src/oatpp/core/data/stream/FIFOStream.hpp @@ -0,0 +1,144 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Benedikt-Alexander Mokroß + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#ifndef oatpp_data_stream_FIFOStream_hpp +#define oatpp_data_stream_FIFOStream_hpp + +#include "Stream.hpp" +#include "oatpp/core/data/buffer/FIFOBuffer.hpp" + +namespace oatpp { namespace data { namespace stream { + + +/** + * FIFOInputStream + */ +class FIFOInputStream : public InputStream, public WriteCallback { + public: + static data::stream::DefaultInitializedContext DEFAULT_CONTEXT; + private: + std::shared_ptr m_memoryHandle; + std::shared_ptr m_fifo; + IOMode m_ioMode; + public: + + /** + * Constructor. + * @param data - buffer. + */ + FIFOInputStream(v_buff_size initialSize = 2048); + + static std::shared_ptr createShared(v_buff_size initialSize = 2048) { + return std::make_shared(2048); + } + + /** + * Same as `reset(nullptr, nullptr, 0);.` + */ + void reset(); + + /** + * Read data from stream.
+ * It is a legal case if return result < count. Caller should handle this! + * *Calls to this method are always NON-BLOCKING* + * @param data - buffer to read data to. + * @param count - size of the buffer. + * @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then + * caller MUST return this action on coroutine iteration. + * @return - actual number of bytes read. 0 - designates end of the buffer. + */ + v_io_size read(void *data, v_buff_size count, async::Action& action) override; + + /** + * Set stream I/O mode. + * @throws + */ + void setInputStreamIOMode(IOMode ioMode) override; + + /** + * Get stream I/O mode. + * @return + */ + IOMode getInputStreamIOMode() override; + + /** + * Get stream context. + * @return + */ + Context& getInputStreamContext() override; + + /** + * Get data memory handle. + * @return - data memory handle. + */ + std::shared_ptr getDataMemoryHandle(); + + /** + * Write operation callback. + * @param data - pointer to data. + * @param count - size of the data in bytes. + * @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then + * caller MUST return this action on coroutine iteration. + * @return - actual number of bytes written. 0 - to indicate end-of-file. + */ + v_io_size write(const void *data, v_buff_size count, async::Action &action) override; + + void reserveBytesUpfront(v_buff_size count); + + /** + * call read and then write bytes read to output stream + * @param stream + * @param count + * @param action + * @return [1..count], IOErrors. + */ + v_io_size readAndWriteToStream(data::stream::OutputStream* stream, v_buff_size count, async::Action& action); + + /** + * call stream.read() and then write bytes read to buffer + * @param stream + * @param count + * @param action + * @return + */ + v_io_size readFromStreamAndWrite(data::stream::InputStream* stream, v_buff_size count, async::Action& action); + + /** + * flush all availableToRead bytes to stream + * @param stream + * @return + */ + v_io_size flushToStream(data::stream::OutputStream* stream); + + /** + * flush all availableToRead bytes to stream in asynchronous manner + * @param stream - &id:data::stream::OutputStream;. + * @return - &id:async::CoroutineStarter;. + */ + async::CoroutineStarter flushToStreamAsync(const std::shared_ptr& stream); +}; + +}}} + +#endif // oatpp_data_stream_FIFOStream_hpp