diff --git a/core/base/Config.hpp b/core/base/Config.hpp index b7a115c9..99ce140d 100644 --- a/core/base/Config.hpp +++ b/core/base/Config.hpp @@ -37,6 +37,32 @@ */ //#define OATPP_DISABLE_ENV_OBJECT_COUNTERS +/** + * Define this to disable memory-pool allocations. + * This will make oatpp::base::memory::MemoryPool, method obtain and free call new and delete directly + */ +//#define OATPP_DISABLE_POOL_ALLOCATIONS + +/** + * Predefined value for function oatpp::concurrency::Thread::getHardwareConcurrency(); + */ +//#define OATPP_THREAD_HARDWARE_CONCURRENCY 4 + +/** + * Number of shards of ThreadDistributedMemoryPool (Default pool for many oatpp objects) + * Higher number reduces threads racing for resources on each shard. + */ +#ifndef OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT + #define OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT 10 +#endif + +/** + * AsyncHttpConnectionHandler default number of threads + */ +#ifndef OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT + #define OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT 2 +#endif + /** * DISABLE logs level V */ diff --git a/core/base/StrBuffer.cpp b/core/base/StrBuffer.cpp index 19ec4ee0..b9ad2f0f 100644 --- a/core/base/StrBuffer.cpp +++ b/core/base/StrBuffer.cpp @@ -45,7 +45,12 @@ void StrBuffer::setAndCopy(const void* data, const void* originData, v_int32 siz std::shared_ptr StrBuffer::allocShared(const void* data, v_int32 size, bool copyAsOwnData) { if(copyAsOwnData) { memory::AllocationExtras extras(size + 1); - const auto& ptr = memory::allocateSharedWithExtras(extras); + std::shared_ptr ptr; + if(size > getSmStringSize()) { + ptr = memory::allocateSharedWithExtras(extras); + } else { + ptr = memory::customPoolAllocateSharedWithExtras(extras, getSmallStringPool()); + } ptr->setAndCopy(extras.extraPtr, data, size); return ptr; } diff --git a/core/base/StrBuffer.hpp b/core/base/StrBuffer.hpp index f5375ba8..4951d4fc 100644 --- a/core/base/StrBuffer.hpp +++ b/core/base/StrBuffer.hpp @@ -32,9 +32,27 @@ namespace oatpp { namespace base { -class StrBuffer : public oatpp::base::Controllable { -public: - OBJECT_POOL_THREAD_LOCAL(StrBuffer_Pool, StrBuffer, 32) +class StrBuffer : public oatpp::base::Controllable { +private: + + static constexpr v_int32 SM_STRING_POOL_ENTRY_SIZE = 256; + + static oatpp::base::memory::ThreadDistributedMemoryPool* getSmallStringPool() { + static oatpp::base::memory::ThreadDistributedMemoryPool pool("Small_String_Pool", SM_STRING_POOL_ENTRY_SIZE, 16); + return &pool; + } + + static v_int32 getSmStringBaseSize() { + memory::AllocationExtras extras(0); + auto ptr = memory::customPoolAllocateSharedWithExtras(extras, getSmallStringPool()); + return extras.baseSize; + } + + static v_int32 getSmStringSize() { + static v_int32 size = SM_STRING_POOL_ENTRY_SIZE - getSmStringBaseSize(); + return size; + } + private: p_char8 m_data; v_int32 m_size; diff --git a/core/base/memory/Allocator.hpp b/core/base/memory/Allocator.hpp index 19e24c3c..fb5a8bc6 100644 --- a/core/base/memory/Allocator.hpp +++ b/core/base/memory/Allocator.hpp @@ -160,6 +160,39 @@ public: } }; + +template +class CustomPoolSharedObjectAllocator { +public: + typedef T value_type; +public: + AllocationExtras& m_info; + P* m_pool; +public: + + CustomPoolSharedObjectAllocator(AllocationExtras& info, P* pool) + : m_info(info) + , m_pool(pool) + {}; + + template + CustomPoolSharedObjectAllocator(const CustomPoolSharedObjectAllocator& other) + : m_info(other.m_info) + , m_pool(other.m_pool) + {}; + + T* allocate(std::size_t n) { + void* mem = m_pool->obtain(); + m_info.baseSize = sizeof(T); + m_info.extraPtr = &((p_char8) mem)[sizeof(T)]; + return static_cast(mem); + } + + void deallocate(T* ptr, size_t n) { + oatpp::base::memory::MemoryPool::free(ptr); + } + +}; template inline bool operator == (const SharedObjectAllocator&, const SharedObjectAllocator&) { @@ -178,6 +211,13 @@ static std::shared_ptr allocateSharedWithExtras(AllocationExtras& extras, Arg return std::allocate_shared(allocator, args...); } +template +static std::shared_ptr customPoolAllocateSharedWithExtras(AllocationExtras& extras, P* pool, Args... args){ + typedef CustomPoolSharedObjectAllocator _Allocator; + _Allocator allocator(extras, pool); + return std::allocate_shared(allocator, args...); +} + }}} #endif /* oatpp_base_memory_Allocator_hpp */ diff --git a/core/base/memory/MemoryPool.cpp b/core/base/memory/MemoryPool.cpp index 12da04d0..a73ee582 100644 --- a/core/base/memory/MemoryPool.cpp +++ b/core/base/memory/MemoryPool.cpp @@ -24,15 +24,110 @@ #include "MemoryPool.hpp" #include "oatpp/core/utils/ConversionUtils.hpp" +#include "oatpp/core/concurrency/Thread.hpp" namespace oatpp { namespace base { namespace memory { +void MemoryPool::allocChunk() { + v_int32 entryBlockSize = sizeof(EntryHeader) + m_entrySize; + v_int32 chunkMemSize = entryBlockSize * m_chunkSize; + p_char8 mem = new v_char8[chunkMemSize]; + m_chunks.push_back(mem); + for(v_int32 i = 0; i < m_chunkSize; i++){ + EntryHeader* entry = new (mem + i * entryBlockSize) EntryHeader(this, m_id, m_rootEntry); + m_rootEntry = entry; + } +} + +void* MemoryPool::obtain() { +#ifdef OATPP_DISABLE_POOL_ALLOCATIONS + return new v_char8[m_entrySize]; +#else + oatpp::concurrency::SpinLock lock(m_atom); + if(m_rootEntry != nullptr) { + auto entry = m_rootEntry; + m_rootEntry = m_rootEntry->next; + ++ m_objectsCount; + return ((p_char8) entry) + sizeof(EntryHeader); + } else { + allocChunk(); + if(m_rootEntry == nullptr) { + throw std::runtime_error("[oatpp::base::memory::MemoryPool:obtain()]: Unable to allocate entry"); + } + auto entry = m_rootEntry; + m_rootEntry = m_rootEntry->next; + ++ m_objectsCount; + return ((p_char8) entry) + sizeof(EntryHeader); + } +#endif +} + +void* MemoryPool::obtainLockFree() { +#ifdef OATPP_DISABLE_POOL_ALLOCATIONS + return new v_char8[m_entrySize]; +#else + if(m_rootEntry != nullptr) { + auto entry = m_rootEntry; + m_rootEntry = m_rootEntry->next; + ++ m_objectsCount; + return ((p_char8) entry) + sizeof(EntryHeader); + } else { + allocChunk(); + if(m_rootEntry == nullptr) { + throw std::runtime_error("[oatpp::base::memory::MemoryPool:obtainLockFree()]: Unable to allocate entry"); + } + auto entry = m_rootEntry; + m_rootEntry = m_rootEntry->next; + ++ m_objectsCount; + return ((p_char8) entry) + sizeof(EntryHeader); + } +#endif +} + +void MemoryPool::freeByEntryHeader(EntryHeader* entry) { + if(entry->poolId == m_id) { + oatpp::concurrency::SpinLock lock(m_atom); + entry->next = m_rootEntry; + m_rootEntry = entry; + -- m_objectsCount; + } else { + throw std::runtime_error("oatpp::base::memory::MemoryPool: Invalid EntryHeader"); + } +} + +void MemoryPool::free(void* entry) { +#ifdef OATPP_DISABLE_POOL_ALLOCATIONS + delete [] ((p_char8) entry); +#else + EntryHeader* header = (EntryHeader*)(((p_char8) entry) - sizeof (EntryHeader)); + header->pool->freeByEntryHeader(header); +#endif +} + +std::string MemoryPool::getName(){ + return m_name; +} + +v_int32 MemoryPool::getEntrySize(){ + return m_entrySize; +} + +v_int64 MemoryPool::getSize(){ + return m_chunks.size() * m_chunkSize; +} + +v_int32 MemoryPool::getObjectsCount(){ + return m_objectsCount; +} + + + oatpp::concurrency::SpinLock::Atom MemoryPool::POOLS_ATOM(false); std::unordered_map MemoryPool::POOLS; std::atomic MemoryPool::poolIdCounter(0); -ThreadDistributedMemoryPool::ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize) - : m_shardsCount(10) +ThreadDistributedMemoryPool::ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize, v_int32 shardsCount) + : m_shardsCount(shardsCount) , m_shards(new MemoryPool*[m_shardsCount]) { for(v_int32 i = 0; i < m_shardsCount; i++){ diff --git a/core/base/memory/MemoryPool.hpp b/core/base/memory/MemoryPool.hpp index c4c67f01..07e765ca 100644 --- a/core/base/memory/MemoryPool.hpp +++ b/core/base/memory/MemoryPool.hpp @@ -34,8 +34,6 @@ #include //#define OATPP_DISABLE_POOL_ALLOCATIONS -//#ifndef OATPP_MEMORY_POOL_SHARDING - namespace oatpp { namespace base { namespace memory { class MemoryPool { @@ -62,18 +60,7 @@ private: }; private: - - void allocChunk() { - v_int32 entryBlockSize = sizeof(EntryHeader) + m_entrySize; - v_int32 chunkMemSize = entryBlockSize * m_chunkSize; - p_char8 mem = new v_char8[chunkMemSize]; - m_chunks.push_back(mem); - for(v_int32 i = 0; i < m_chunkSize; i++){ - EntryHeader* entry = new (mem + i * entryBlockSize) EntryHeader(this, m_id, m_rootEntry); - m_rootEntry = entry; - } - } - + void allocChunk(); private: std::string m_name; v_int32 m_entrySize; @@ -110,64 +97,16 @@ public: POOLS.erase(m_id); } - void* obtain() { -#ifdef OATPP_DISABLE_POOL_ALLOCATIONS - return new v_char8[m_entrySize]; -#else - oatpp::concurrency::SpinLock lock(m_atom); - if(m_rootEntry != nullptr) { - auto entry = m_rootEntry; - m_rootEntry = m_rootEntry->next; - ++ m_objectsCount; - return ((p_char8) entry) + sizeof(EntryHeader); - } else { - allocChunk(); - if(m_rootEntry == nullptr) { - throw std::runtime_error("oatpp::base::memory::MemoryPool: Unable to allocate entry"); - } - auto entry = m_rootEntry; - m_rootEntry = m_rootEntry->next; - ++ m_objectsCount; - return ((p_char8) entry) + sizeof(EntryHeader); - } -#endif - } + void* obtain(); + void* obtainLockFree(); - void freeByEntryHeader(EntryHeader* entry) { - if(entry->poolId == m_id) { - oatpp::concurrency::SpinLock lock(m_atom); - entry->next = m_rootEntry; - m_rootEntry = entry; - -- m_objectsCount; - } else { - throw std::runtime_error("oatpp::base::memory::MemoryPool: Invalid EntryHeader"); - } - } + void freeByEntryHeader(EntryHeader* entry); + static void free(void* entry); - static void free(void* entry) { -#ifdef OATPP_DISABLE_POOL_ALLOCATIONS - delete [] ((p_char8) entry); -#else - EntryHeader* header = (EntryHeader*)(((p_char8) entry) - sizeof (EntryHeader)); - header->pool->freeByEntryHeader(header); -#endif - } - - std::string getName(){ - return m_name; - } - - v_int32 getEntrySize(){ - return m_entrySize; - } - - v_int64 getSize(){ - return m_chunks.size() * m_chunkSize; - } - - v_int32 getObjectsCount(){ - return m_objectsCount; - } + std::string getName(); + v_int32 getEntrySize(); + v_int64 getSize(); + v_int32 getObjectsCount(); }; @@ -176,7 +115,8 @@ private: v_int32 m_shardsCount; MemoryPool** m_shards; public: - ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize); + ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize, + v_int32 shardsCount = OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT); virtual ~ThreadDistributedMemoryPool(); void* obtain(); }; diff --git a/core/concurrency/Thread.cpp b/core/concurrency/Thread.cpp index 156e8f0e..47f622c5 100644 --- a/core/concurrency/Thread.cpp +++ b/core/concurrency/Thread.cpp @@ -24,3 +24,55 @@ #include "Thread.hpp" +#if defined(_GNU_SOURCE) + #include +#endif + +namespace oatpp { namespace concurrency { + +v_int32 Thread::setThreadAffinityToOneCpu(std::thread::native_handle_type nativeHandle, v_int32 cpuIndex) { + return setThreadAffinityToCpuRange(nativeHandle, cpuIndex, cpuIndex); +} + +v_int32 Thread::setThreadAffinityToCpuRange(std::thread::native_handle_type nativeHandle, v_int32 fromCpu, v_int32 toCpu) { +#if defined(_GNU_SOURCE) + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + + for(v_int32 i = fromCpu; i <= toCpu; i++) { + CPU_SET(i, &cpuset); + } + + v_int32 result = pthread_setaffinity_np(nativeHandle, sizeof(cpu_set_t), &cpuset); + + if (result != 0) { + OATPP_LOGD("[oatpp::concurrency::Thread::assignThreadToCpu(...)]", "error code - %d", result); + } + + return result; +#else + return -1; +#endif +} + +v_int32 Thread::calcHardwareConcurrency() { +#if !defined(OATPP_THREAD_HARDWARE_CONCURRENCY) + v_int32 concurrency = std::thread::hardware_concurrency(); + if(concurrency == 0) { + OATPP_LOGD("[oatpp::concurrency:Thread::calcHardwareConcurrency()]", "Warning - failed to get hardware_concurrency. Setting hardware_concurrency=1"); + concurrency = 1; + } + return concurrency; +#else + return OATPP_THREAD_HARDWARE_CONCURRENCY; +#endif +} + +v_int32 Thread::getHardwareConcurrency() { + static v_int32 concurrency = calcHardwareConcurrency(); + return concurrency; +} + +}} + diff --git a/core/concurrency/Thread.hpp b/core/concurrency/Thread.hpp index df1e0366..5ab5b433 100644 --- a/core/concurrency/Thread.hpp +++ b/core/concurrency/Thread.hpp @@ -28,12 +28,9 @@ #include "./Runnable.hpp" #include "oatpp/core/base/memory/ObjectPool.hpp" - #include "oatpp/core/base/Controllable.hpp" - #include -#include namespace oatpp { namespace concurrency { @@ -41,6 +38,28 @@ class Thread : public base::Controllable { public: OBJECT_POOL(Thread_Pool, Thread, 32) SHARED_OBJECT_POOL(Shared_Thread_Pool, Thread, 32) +private: + static v_int32 calcHardwareConcurrency(); +public: + + /** + * Set thread affinity one thread + */ + static v_int32 setThreadAffinityToOneCpu(std::thread::native_handle_type nativeHandle, v_int32 cpuIndex); + + /** + * Set thread affinity [fromCpu..toCpu]. + * from and to indexes included + */ + static v_int32 setThreadAffinityToCpuRange(std::thread::native_handle_type nativeHandle, v_int32 fromCpu, v_int32 toCpu); + + /** + * returns OATPP_THREAD_HARDWARE_CONCURRENCY config value if set. + * else return std::thread::hardware_concurrency() + * else return 1 + */ + static v_int32 getHardwareConcurrency(); + private: std::thread m_thread; public: @@ -65,6 +84,10 @@ public: m_thread.detach(); } + std::thread* getStdThread() { + return &m_thread; + } + }; }} diff --git a/web/protocol/http/outgoing/BufferBody.hpp b/web/protocol/http/outgoing/BufferBody.hpp index f488ccfb..f947dc8f 100644 --- a/web/protocol/http/outgoing/BufferBody.hpp +++ b/web/protocol/http/outgoing/BufferBody.hpp @@ -48,7 +48,7 @@ public: } void declareHeaders(const std::shared_ptr& headers) noexcept override { - headers->put(oatpp::web::protocol::http::Header::CONTENT_LENGTH, + headers->put(oatpp::String(oatpp::web::protocol::http::Header::CONTENT_LENGTH, false), oatpp::utils::conversion::int32ToStr(m_buffer->getSize())); } diff --git a/web/protocol/http/outgoing/CommunicationUtils.cpp b/web/protocol/http/outgoing/CommunicationUtils.cpp new file mode 100644 index 00000000..b53c2ee4 --- /dev/null +++ b/web/protocol/http/outgoing/CommunicationUtils.cpp @@ -0,0 +1,71 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi, + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#include "CommunicationUtils.hpp" + +namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing { + +bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptr& request, + const std::shared_ptr& response){ + + /* Set keep-alive to value specified in the client's request, if no Connection header present in response. */ + /* Set keep-alive to value specified in response otherwise */ + if(request) { + auto& inKeepAlive = request->headers->get(String(Header::CONNECTION, false), nullptr); + + if(inKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(inKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE)) { + if(response->headers->putIfNotExists(String(Header::CONNECTION, false), inKeepAlive)){ + return true; + } else { + auto& outKeepAlive = response->headers->get(Header::CONNECTION, nullptr); + return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE)); + } + } + } + + /* If protocol == HTTP/1.1 */ + /* Set HTTP/1.1 default Connection header value (Keep-Alive), if no Connection header present in response. */ + /* Set keep-alive to value specified in response otherwise */ + String& protocol = request->startingLine->protocol; + if(protocol && oatpp::base::StrBuffer::equalsCI_FAST(protocol.get(), "HTTP/1.1")) { + if(!response->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_KEEP_ALIVE, false))) { + auto& outKeepAlive = response->headers->get(String(Header::CONNECTION, false), nullptr); + return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE)); + } + return true; + } + + /* If protocol != HTTP/1.1 */ + /* Set default Connection header value (Close), if no Connection header present in response. */ + /* Set keep-alive to value specified in response otherwise */ + if(!response->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_CLOSE, false))) { + auto& outKeepAlive = response->headers->get(String(Header::CONNECTION, false), nullptr); + return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE)); + } + + return false; + +} + +}}}}} diff --git a/web/protocol/http/outgoing/CommunicationUtils.hpp b/web/protocol/http/outgoing/CommunicationUtils.hpp new file mode 100644 index 00000000..b1872df4 --- /dev/null +++ b/web/protocol/http/outgoing/CommunicationUtils.hpp @@ -0,0 +1,49 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi, + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#ifndef oatpp_web_protocol_http_outgoing_CommunicationUtils_hpp +#define oatpp_web_protocol_http_outgoing_CommunicationUtils_hpp + +#include "oatpp/web/protocol/http/incoming/Request.hpp" +#include "oatpp/web/protocol/http/outgoing/Response.hpp" + +namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing { + +class CommunicationUtils { +public: + + /** + * Consider keep connection alive taking into account request headers, response headers and protocol version. + * Corresponding header will be set to response if not existed before + * return true - keep-alive + * return false - close + */ + static bool considerConnectionKeepAlive(const std::shared_ptr& request, + const std::shared_ptr& response); + +}; + +}}}}} + +#endif /* CommunicationUtils_hpp */ diff --git a/web/protocol/http/outgoing/ResponseFactory.hpp b/web/protocol/http/outgoing/ResponseFactory.hpp index 39ac66f1..92ea143b 100644 --- a/web/protocol/http/outgoing/ResponseFactory.hpp +++ b/web/protocol/http/outgoing/ResponseFactory.hpp @@ -22,8 +22,8 @@ * ***************************************************************************/ -#ifndef ResponseFactory_hpp -#define ResponseFactory_hpp +#ifndef oatpp_web_protocol_http_outgoing_ResponseFactory_hpp +#define oatpp_web_protocol_http_outgoing_ResponseFactory_hpp #include "./Response.hpp" @@ -46,4 +46,4 @@ public: }}}}} -#endif /* ResponseFactory_hpp */ +#endif /* oatpp_web_protocol_http_outgoing_ResponseFactory_hpp */ diff --git a/web/server/AsyncHttpConnectionHandler.hpp b/web/server/AsyncHttpConnectionHandler.hpp index ac36dfbb..e2f3c98f 100644 --- a/web/server/AsyncHttpConnectionHandler.hpp +++ b/web/server/AsyncHttpConnectionHandler.hpp @@ -101,11 +101,12 @@ private: v_int32 m_threadCount; std::shared_ptr* m_tasks; public: - AsyncHttpConnectionHandler(const std::shared_ptr& router) + AsyncHttpConnectionHandler(const std::shared_ptr& router, + v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT) : m_router(router) , m_errorHandler(handler::DefaultErrorHandler::createShared()) , m_taskBalancer(0) - , m_threadCount(2) + , m_threadCount(threadCount) { m_tasks = new std::shared_ptr[m_threadCount]; for(v_int32 i = 0; i < m_threadCount; i++) { diff --git a/web/server/HttpConnectionHandler.cpp b/web/server/HttpConnectionHandler.cpp index 6b3be170..e0e1fe65 100644 --- a/web/server/HttpConnectionHandler.cpp +++ b/web/server/HttpConnectionHandler.cpp @@ -29,26 +29,21 @@ #include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/Http.hpp" -#include "oatpp/test/Checker.hpp" - -#include - namespace oatpp { namespace web { namespace server { void HttpConnectionHandler::Task::run(){ - //oatpp::test::PerformanceChecker checker("task checker"); - v_int32 bufferSize = oatpp::data::buffer::IOBuffer::BUFFER_SIZE; v_char8 buffer [bufferSize]; + auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize); auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize); bool keepAlive = true; - do { - + auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); + if(response) { outStream->setBufferPosition(0, 0); response->send(outStream); @@ -62,7 +57,19 @@ void HttpConnectionHandler::Task::run(){ } void HttpConnectionHandler::handleConnection(const std::shared_ptr& connection){ + + /* Create working thread */ concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors)); + + /* Get hardware concurrency -1 in order to have 1cpu free of workers. */ + v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency(); + if(concurrency > 1) { + concurrency -= 1; + } + + /* Set thread affinity group CPUs [0..cpu_count - 1]. Leave one cpu free of workers */ + oatpp::concurrency::Thread::setThreadAffinityToCpuRange(thread.getStdThread()->native_handle(), 0, concurrency - 1 /* -1 because 0-based index */); + thread.detach(); } diff --git a/web/server/HttpConnectionHandler.hpp b/web/server/HttpConnectionHandler.hpp index 5ea4a6fa..1341a3d7 100644 --- a/web/server/HttpConnectionHandler.hpp +++ b/web/server/HttpConnectionHandler.hpp @@ -45,6 +45,7 @@ namespace oatpp { namespace web { namespace server { class HttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler { private: + class Task : public base::Controllable, public concurrency::Runnable{ private: HttpRouter* m_router; @@ -66,7 +67,7 @@ private: static std::shared_ptr createShared(HttpRouter* router, const std::shared_ptr& connection, const std::shared_ptr& errorHandler, - HttpProcessor::RequestInterceptors* requestInterceptors){ + HttpProcessor::RequestInterceptors* requestInterceptors) { return std::make_shared(router, connection, errorHandler, requestInterceptors); } @@ -78,7 +79,6 @@ private: std::shared_ptr m_router; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors m_requestInterceptors; - public: HttpConnectionHandler(const std::shared_ptr& router) : m_router(router) diff --git a/web/server/HttpProcessor.cpp b/web/server/HttpProcessor.cpp index 32bcca61..59fbea5f 100644 --- a/web/server/HttpProcessor.cpp +++ b/web/server/HttpProcessor.cpp @@ -25,34 +25,11 @@ #include "HttpProcessor.hpp" #include "./HttpError.hpp" +#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp" + namespace oatpp { namespace web { namespace server { const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE"; - -bool HttpProcessor::considerConnectionKeepAlive(const std::shared_ptr& request, - const std::shared_ptr& response){ - - if(request) { - auto& inKeepAlive = request->headers->get(protocol::http::Header::CONNECTION, nullptr); - - if(inKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(inKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)) { - if(response->headers->putIfNotExists(protocol::http::Header::CONNECTION, inKeepAlive)){ - return true; - } else { - auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr); - return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)); - } - } - } - - if(!response->headers->putIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE)) { - auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr); - return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)); - } - - return false; - -} std::shared_ptr HttpProcessor::processRequest(HttpRouter* router, @@ -111,10 +88,10 @@ HttpProcessor::processRequest(HttpRouter* router, return errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error"); } - response->headers->putIfNotExists(protocol::http::Header::SERVER, - protocol::http::Header::Value::SERVER); + response->headers->putIfNotExists(oatpp::String(protocol::http::Header::SERVER, false), + oatpp::String(protocol::http::Header::Value::SERVER, false)); - keepAlive = HttpProcessor::considerConnectionKeepAlive(request, response); + keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(request, response); return response; } else { @@ -201,7 +178,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { m_currentResponse->headers->putIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); - m_keepAlive = HttpProcessor::considerConnectionKeepAlive(m_currentRequest, m_currentResponse); + m_keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(m_currentRequest, m_currentResponse); m_outStream->setBufferPosition(0, 0); return m_currentResponse->sendAsync(this, m_outStream->flushAsync( diff --git a/web/server/HttpProcessor.hpp b/web/server/HttpProcessor.hpp index cc6cc90c..35324257 100644 --- a/web/server/HttpProcessor.hpp +++ b/web/server/HttpProcessor.hpp @@ -110,8 +110,6 @@ public: }; public: - static bool considerConnectionKeepAlive(const std::shared_ptr& request, - const std::shared_ptr& response); static std::shared_ptr processRequest(HttpRouter* router,