Merge pull request #278 from oatpp/optimize_buffer_output_stream

oatpp::data::stream::BufferOutputStream. Optimize growth.
This commit is contained in:
Leonid Stryzhevskyi 2020-07-19 03:46:12 +03:00 committed by GitHub
commit 4098c443ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 137 additions and 43 deletions

View File

@ -118,6 +118,8 @@ add_library(oatpp
oatpp/core/parser/Caret.hpp
oatpp/core/parser/ParsingError.cpp
oatpp/core/parser/ParsingError.hpp
oatpp/core/utils/Binary.cpp
oatpp/core/utils/Binary.hpp
oatpp/core/utils/ConversionUtils.cpp
oatpp/core/utils/ConversionUtils.hpp
oatpp/core/utils/Random.cpp

View File

@ -24,6 +24,8 @@
#include "BufferStream.hpp"
#include "oatpp/core/utils/Binary.hpp"
namespace oatpp { namespace data{ namespace stream {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -31,11 +33,11 @@ namespace oatpp { namespace data{ namespace stream {
data::stream::DefaultInitializedContext BufferOutputStream::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_INFINITE);
BufferOutputStream::BufferOutputStream(v_buff_size initialCapacity, v_buff_size growBytes)
BufferOutputStream::BufferOutputStream(v_buff_size initialCapacity)
: m_data(new v_char8[initialCapacity])
, m_capacity(initialCapacity)
, m_position(0)
, m_growBytes(growBytes)
, m_maxCapacity(-1)
, m_ioMode(IOMode::ASYNCHRONOUS)
{}
@ -70,20 +72,20 @@ Context& BufferOutputStream::getOutputStreamContext() {
void BufferOutputStream::reserveBytesUpfront(v_buff_size count) {
if(m_position + count > m_capacity) {
v_buff_size capacityNeeded = m_position + count;
if(m_growBytes <= 0) {
throw std::runtime_error("[oatpp::data::stream::BufferOutputStream::reserveBytesUpfront()]: Error. Buffer was not allowed to grow.");
if(capacityNeeded > m_capacity) {
v_buff_size newCapacity = utils::Binary::nextP2(capacityNeeded);
if(newCapacity < 0 || (m_maxCapacity > 0 && newCapacity > m_maxCapacity)) {
newCapacity = m_maxCapacity;
}
v_buff_size extraNeeded = m_position + count - m_capacity;
v_buff_size extraChunks = extraNeeded / m_growBytes;
if(extraChunks * m_growBytes < extraNeeded) {
extraChunks ++;
if(newCapacity < capacityNeeded) {
throw std::runtime_error("[oatpp::data::stream::BufferOutputStream::reserveBytesUpfront()]: Error. Unable to allocate requested memory.");
}
v_buff_size newCapacity = m_capacity + extraChunks * m_growBytes;
p_char8 newData = new v_char8[newCapacity];
std::memcpy(newData, m_data, m_position);

View File

@ -39,7 +39,7 @@ private:
p_char8 m_data;
v_buff_size m_capacity;
v_buff_size m_position;
v_buff_size m_growBytes;
v_buff_size m_maxCapacity;
IOMode m_ioMode;
public:
@ -47,7 +47,7 @@ public:
* Constructor.
* @param growBytes
*/
BufferOutputStream(v_buff_size initialCapacity = 2048, v_buff_size growBytes = 2048);
BufferOutputStream(v_buff_size initialCapacity = 2048);
/**
* Virtual destructor.

View File

@ -0,0 +1,41 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Binary.hpp"
namespace oatpp { namespace utils {
v_int64 Binary::nextP2(v_int64 v) {
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
v++;
return v;
}
}}

View File

@ -0,0 +1,50 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_utils_Binary_hpp
#define oatpp_utils_Binary_hpp
#include "oatpp/core/base/Environment.hpp"
namespace oatpp { namespace utils {
/**
* Collection of methods for binary operations and arithmetics.
*/
class Binary {
public:
/**
* Calculate the next power of 2. <br>
* Example: <br>
* `nextP2(127) = 128`, `nextP2(1025) = 2048`.
* @return
*/
static v_int64 nextP2(v_int64 v);
};
}}
#endif // oatpp_utils_Binary_hpp

View File

@ -60,7 +60,7 @@ v_int32 EncoderChunked::iterate(data::buffer::InlineReadData& dataIn, data::buff
if(m_writeChunkHeader) {
async::Action action;
data::stream::BufferOutputStream stream(16, 16);
data::stream::BufferOutputStream stream(16);
if(!m_firstChunk) {
stream.write("\r\n", 2, action);
}
@ -88,7 +88,7 @@ v_int32 EncoderChunked::iterate(data::buffer::InlineReadData& dataIn, data::buff
if(m_writeChunkHeader){
async::Action action;
data::stream::BufferOutputStream stream(16, 16);
data::stream::BufferOutputStream stream(16);
if(!m_firstChunk) {
stream.write("\r\n", 2, action);
}
@ -115,7 +115,7 @@ v_int32 EncoderChunked::iterate(data::buffer::InlineReadData& dataIn, data::buff
// DecoderChunked
DecoderChunked::DecoderChunked()
: m_chunkHeaderBuffer(16, 16)
: m_chunkHeaderBuffer(16)
, m_currentChunkSize(-1)
, m_firstChunk(true)
, m_finished(false)

View File

@ -92,7 +92,7 @@ void Request::send(data::stream::OutputStream* stream){
m_headers.put_LockFree(Header::CONTENT_LENGTH, "0");
}
oatpp::data::stream::BufferOutputStream buffer(2048, 2048);
oatpp::data::stream::BufferOutputStream buffer(2048);
buffer.writeSimple(m_method.getData(), m_method.getSize());
buffer.writeSimple(" /", 2);

View File

@ -70,8 +70,8 @@ HttpProcessor::ProcessingResources::ProcessingResources(const std::shared_ptr<Co
const std::shared_ptr<oatpp::data::stream::IOStream>& pConnection)
: components(pComponents)
, connection(pConnection)
, headersInBuffer(components->config->headersInBufferInitial, components->config->headersInBufferGrow)
, headersOutBuffer(components->config->headersOutBufferInitial, components->config->headersOutBufferGrow)
, headersInBuffer(components->config->headersInBufferInitial)
, headersOutBuffer(components->config->headersOutBufferInitial)
, headersReader(&headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
, inStream(data::stream::InputStreamBufferedProxy::createShared(connection, base::StrBuffer::createShared(data::buffer::IOBuffer::BUFFER_SIZE)))
{}
@ -210,9 +210,9 @@ HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& component
const std::shared_ptr<oatpp::data::stream::IOStream>& connection)
: m_components(components)
, m_connection(connection)
, m_headersInBuffer(components->config->headersInBufferInitial, components->config->headersInBufferGrow)
, m_headersInBuffer(components->config->headersInBufferInitial)
, m_headersReader(&m_headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
, m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial, components->config->headersOutBufferGrow))
, m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial))
, m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection, base::StrBuffer::createShared(data::buffer::IOBuffer::BUFFER_SIZE)))
, m_connectionState(oatpp::web::protocol::http::utils::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE)
{}

View File

@ -62,21 +62,11 @@ public:
*/
v_buff_size headersInBufferInitial = 2048;
/**
* Buffer used to read headers in request. Size of the chunk to grow.
*/
v_buff_size headersInBufferGrow = 2048;
/**
* Buffer used to write headers in response. Initial size of the buffer.
*/
v_buff_size headersOutBufferInitial = 2048;
/**
* Buffer used to write headers in response. Size of the chunk to grow.
*/
v_buff_size headersOutBufferGrow = 2048;
/**
* Size of the chunk used for iterative-read of headers.
*/

View File

@ -70,6 +70,18 @@
namespace {
v_int64 calcNextP2(v_int64 v) {
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
v++;
return v;
}
void runTests() {
oatpp::base::Environment::printCompilationConfig();

View File

@ -26,6 +26,7 @@
#include "oatpp/core/data/stream/BufferStream.hpp"
#include "oatpp/core/utils/ConversionUtils.hpp"
#include "oatpp/core/utils/Binary.hpp"
namespace oatpp { namespace test { namespace core { namespace data { namespace stream {
@ -128,23 +129,19 @@ void BufferStreamTest::onRun() {
text = text + sample;
}
for(v_int32 incStep = 1; incStep <= 1024; incStep ++) {
BufferOutputStream stream(0);
BufferOutputStream stream(0, incStep);
for(v_int32 i = 0; i < 1024; i++ ) {
stream << sample;
for(v_int32 i = 0; i < 1024; i++ ) {
stream << sample;
OATPP_ASSERT(stream.getCapacity() >= stream.getCurrentPosition());
}
OATPP_ASSERT(text == stream.toString());
OATPP_ASSERT(stream.getCapacity() < 1024 * (10 + 1));
OATPP_ASSERT(stream.getCapacity() >= stream.getCurrentPosition());
}
OATPP_ASSERT(text == stream.toString());
OATPP_ASSERT(stream.getCapacity() == oatpp::utils::Binary::nextP2(1024 * (10)));
}
}