mirror of
https://gitee.com/zyjblog/oatpp.git
synced 2024-12-22 22:16:37 +08:00
Merge pull request #6 from oatpp/mem_aloc_optimization
Performance optimization for linux
This commit is contained in:
commit
af7dbbb0ab
@ -37,6 +37,32 @@
|
||||
*/
|
||||
//#define OATPP_DISABLE_ENV_OBJECT_COUNTERS
|
||||
|
||||
/**
|
||||
* Define this to disable memory-pool allocations.
|
||||
* This will make oatpp::base::memory::MemoryPool, method obtain and free call new and delete directly
|
||||
*/
|
||||
//#define OATPP_DISABLE_POOL_ALLOCATIONS
|
||||
|
||||
/**
|
||||
* Predefined value for function oatpp::concurrency::Thread::getHardwareConcurrency();
|
||||
*/
|
||||
//#define OATPP_THREAD_HARDWARE_CONCURRENCY 4
|
||||
|
||||
/**
|
||||
* Number of shards of ThreadDistributedMemoryPool (Default pool for many oatpp objects)
|
||||
* Higher number reduces threads racing for resources on each shard.
|
||||
*/
|
||||
#ifndef OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT
|
||||
#define OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT 10
|
||||
#endif
|
||||
|
||||
/**
|
||||
* AsyncHttpConnectionHandler default number of threads
|
||||
*/
|
||||
#ifndef OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT
|
||||
#define OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT 2
|
||||
#endif
|
||||
|
||||
/**
|
||||
* DISABLE logs level V
|
||||
*/
|
||||
|
@ -45,7 +45,12 @@ void StrBuffer::setAndCopy(const void* data, const void* originData, v_int32 siz
|
||||
std::shared_ptr<StrBuffer> StrBuffer::allocShared(const void* data, v_int32 size, bool copyAsOwnData) {
|
||||
if(copyAsOwnData) {
|
||||
memory::AllocationExtras extras(size + 1);
|
||||
const auto& ptr = memory::allocateSharedWithExtras<StrBuffer>(extras);
|
||||
std::shared_ptr<StrBuffer> ptr;
|
||||
if(size > getSmStringSize()) {
|
||||
ptr = memory::allocateSharedWithExtras<StrBuffer>(extras);
|
||||
} else {
|
||||
ptr = memory::customPoolAllocateSharedWithExtras<StrBuffer>(extras, getSmallStringPool());
|
||||
}
|
||||
ptr->setAndCopy(extras.extraPtr, data, size);
|
||||
return ptr;
|
||||
}
|
||||
|
@ -32,9 +32,27 @@
|
||||
|
||||
namespace oatpp { namespace base {
|
||||
|
||||
class StrBuffer : public oatpp::base::Controllable {
|
||||
public:
|
||||
OBJECT_POOL_THREAD_LOCAL(StrBuffer_Pool, StrBuffer, 32)
|
||||
class StrBuffer : public oatpp::base::Controllable {
|
||||
private:
|
||||
|
||||
static constexpr v_int32 SM_STRING_POOL_ENTRY_SIZE = 256;
|
||||
|
||||
static oatpp::base::memory::ThreadDistributedMemoryPool* getSmallStringPool() {
|
||||
static oatpp::base::memory::ThreadDistributedMemoryPool pool("Small_String_Pool", SM_STRING_POOL_ENTRY_SIZE, 16);
|
||||
return &pool;
|
||||
}
|
||||
|
||||
static v_int32 getSmStringBaseSize() {
|
||||
memory::AllocationExtras extras(0);
|
||||
auto ptr = memory::customPoolAllocateSharedWithExtras<StrBuffer>(extras, getSmallStringPool());
|
||||
return extras.baseSize;
|
||||
}
|
||||
|
||||
static v_int32 getSmStringSize() {
|
||||
static v_int32 size = SM_STRING_POOL_ENTRY_SIZE - getSmStringBaseSize();
|
||||
return size;
|
||||
}
|
||||
|
||||
private:
|
||||
p_char8 m_data;
|
||||
v_int32 m_size;
|
||||
|
@ -160,6 +160,39 @@ public:
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
template<class T, class P>
|
||||
class CustomPoolSharedObjectAllocator {
|
||||
public:
|
||||
typedef T value_type;
|
||||
public:
|
||||
AllocationExtras& m_info;
|
||||
P* m_pool;
|
||||
public:
|
||||
|
||||
CustomPoolSharedObjectAllocator(AllocationExtras& info, P* pool)
|
||||
: m_info(info)
|
||||
, m_pool(pool)
|
||||
{};
|
||||
|
||||
template<typename U>
|
||||
CustomPoolSharedObjectAllocator(const CustomPoolSharedObjectAllocator<U, P>& other)
|
||||
: m_info(other.m_info)
|
||||
, m_pool(other.m_pool)
|
||||
{};
|
||||
|
||||
T* allocate(std::size_t n) {
|
||||
void* mem = m_pool->obtain();
|
||||
m_info.baseSize = sizeof(T);
|
||||
m_info.extraPtr = &((p_char8) mem)[sizeof(T)];
|
||||
return static_cast<T*>(mem);
|
||||
}
|
||||
|
||||
void deallocate(T* ptr, size_t n) {
|
||||
oatpp::base::memory::MemoryPool::free(ptr);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
template <typename T, typename U>
|
||||
inline bool operator == (const SharedObjectAllocator<T>&, const SharedObjectAllocator<U>&) {
|
||||
@ -178,6 +211,13 @@ static std::shared_ptr<T> allocateSharedWithExtras(AllocationExtras& extras, Arg
|
||||
return std::allocate_shared<T, _Allocator>(allocator, args...);
|
||||
}
|
||||
|
||||
template<typename T, typename P, typename ... Args>
|
||||
static std::shared_ptr<T> customPoolAllocateSharedWithExtras(AllocationExtras& extras, P* pool, Args... args){
|
||||
typedef CustomPoolSharedObjectAllocator<T, P> _Allocator;
|
||||
_Allocator allocator(extras, pool);
|
||||
return std::allocate_shared<T, _Allocator>(allocator, args...);
|
||||
}
|
||||
|
||||
}}}
|
||||
|
||||
#endif /* oatpp_base_memory_Allocator_hpp */
|
||||
|
@ -24,15 +24,110 @@
|
||||
|
||||
#include "MemoryPool.hpp"
|
||||
#include "oatpp/core/utils/ConversionUtils.hpp"
|
||||
#include "oatpp/core/concurrency/Thread.hpp"
|
||||
|
||||
namespace oatpp { namespace base { namespace memory {
|
||||
|
||||
void MemoryPool::allocChunk() {
|
||||
v_int32 entryBlockSize = sizeof(EntryHeader) + m_entrySize;
|
||||
v_int32 chunkMemSize = entryBlockSize * m_chunkSize;
|
||||
p_char8 mem = new v_char8[chunkMemSize];
|
||||
m_chunks.push_back(mem);
|
||||
for(v_int32 i = 0; i < m_chunkSize; i++){
|
||||
EntryHeader* entry = new (mem + i * entryBlockSize) EntryHeader(this, m_id, m_rootEntry);
|
||||
m_rootEntry = entry;
|
||||
}
|
||||
}
|
||||
|
||||
void* MemoryPool::obtain() {
|
||||
#ifdef OATPP_DISABLE_POOL_ALLOCATIONS
|
||||
return new v_char8[m_entrySize];
|
||||
#else
|
||||
oatpp::concurrency::SpinLock lock(m_atom);
|
||||
if(m_rootEntry != nullptr) {
|
||||
auto entry = m_rootEntry;
|
||||
m_rootEntry = m_rootEntry->next;
|
||||
++ m_objectsCount;
|
||||
return ((p_char8) entry) + sizeof(EntryHeader);
|
||||
} else {
|
||||
allocChunk();
|
||||
if(m_rootEntry == nullptr) {
|
||||
throw std::runtime_error("[oatpp::base::memory::MemoryPool:obtain()]: Unable to allocate entry");
|
||||
}
|
||||
auto entry = m_rootEntry;
|
||||
m_rootEntry = m_rootEntry->next;
|
||||
++ m_objectsCount;
|
||||
return ((p_char8) entry) + sizeof(EntryHeader);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void* MemoryPool::obtainLockFree() {
|
||||
#ifdef OATPP_DISABLE_POOL_ALLOCATIONS
|
||||
return new v_char8[m_entrySize];
|
||||
#else
|
||||
if(m_rootEntry != nullptr) {
|
||||
auto entry = m_rootEntry;
|
||||
m_rootEntry = m_rootEntry->next;
|
||||
++ m_objectsCount;
|
||||
return ((p_char8) entry) + sizeof(EntryHeader);
|
||||
} else {
|
||||
allocChunk();
|
||||
if(m_rootEntry == nullptr) {
|
||||
throw std::runtime_error("[oatpp::base::memory::MemoryPool:obtainLockFree()]: Unable to allocate entry");
|
||||
}
|
||||
auto entry = m_rootEntry;
|
||||
m_rootEntry = m_rootEntry->next;
|
||||
++ m_objectsCount;
|
||||
return ((p_char8) entry) + sizeof(EntryHeader);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void MemoryPool::freeByEntryHeader(EntryHeader* entry) {
|
||||
if(entry->poolId == m_id) {
|
||||
oatpp::concurrency::SpinLock lock(m_atom);
|
||||
entry->next = m_rootEntry;
|
||||
m_rootEntry = entry;
|
||||
-- m_objectsCount;
|
||||
} else {
|
||||
throw std::runtime_error("oatpp::base::memory::MemoryPool: Invalid EntryHeader");
|
||||
}
|
||||
}
|
||||
|
||||
void MemoryPool::free(void* entry) {
|
||||
#ifdef OATPP_DISABLE_POOL_ALLOCATIONS
|
||||
delete [] ((p_char8) entry);
|
||||
#else
|
||||
EntryHeader* header = (EntryHeader*)(((p_char8) entry) - sizeof (EntryHeader));
|
||||
header->pool->freeByEntryHeader(header);
|
||||
#endif
|
||||
}
|
||||
|
||||
std::string MemoryPool::getName(){
|
||||
return m_name;
|
||||
}
|
||||
|
||||
v_int32 MemoryPool::getEntrySize(){
|
||||
return m_entrySize;
|
||||
}
|
||||
|
||||
v_int64 MemoryPool::getSize(){
|
||||
return m_chunks.size() * m_chunkSize;
|
||||
}
|
||||
|
||||
v_int32 MemoryPool::getObjectsCount(){
|
||||
return m_objectsCount;
|
||||
}
|
||||
|
||||
|
||||
|
||||
oatpp::concurrency::SpinLock::Atom MemoryPool::POOLS_ATOM(false);
|
||||
std::unordered_map<v_int64, MemoryPool*> MemoryPool::POOLS;
|
||||
std::atomic<v_int64> MemoryPool::poolIdCounter(0);
|
||||
|
||||
ThreadDistributedMemoryPool::ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize)
|
||||
: m_shardsCount(10)
|
||||
ThreadDistributedMemoryPool::ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize, v_int32 shardsCount)
|
||||
: m_shardsCount(shardsCount)
|
||||
, m_shards(new MemoryPool*[m_shardsCount])
|
||||
{
|
||||
for(v_int32 i = 0; i < m_shardsCount; i++){
|
||||
|
@ -34,8 +34,6 @@
|
||||
#include <cstring>
|
||||
//#define OATPP_DISABLE_POOL_ALLOCATIONS
|
||||
|
||||
//#ifndef OATPP_MEMORY_POOL_SHARDING
|
||||
|
||||
namespace oatpp { namespace base { namespace memory {
|
||||
|
||||
class MemoryPool {
|
||||
@ -62,18 +60,7 @@ private:
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
void allocChunk() {
|
||||
v_int32 entryBlockSize = sizeof(EntryHeader) + m_entrySize;
|
||||
v_int32 chunkMemSize = entryBlockSize * m_chunkSize;
|
||||
p_char8 mem = new v_char8[chunkMemSize];
|
||||
m_chunks.push_back(mem);
|
||||
for(v_int32 i = 0; i < m_chunkSize; i++){
|
||||
EntryHeader* entry = new (mem + i * entryBlockSize) EntryHeader(this, m_id, m_rootEntry);
|
||||
m_rootEntry = entry;
|
||||
}
|
||||
}
|
||||
|
||||
void allocChunk();
|
||||
private:
|
||||
std::string m_name;
|
||||
v_int32 m_entrySize;
|
||||
@ -110,64 +97,16 @@ public:
|
||||
POOLS.erase(m_id);
|
||||
}
|
||||
|
||||
void* obtain() {
|
||||
#ifdef OATPP_DISABLE_POOL_ALLOCATIONS
|
||||
return new v_char8[m_entrySize];
|
||||
#else
|
||||
oatpp::concurrency::SpinLock lock(m_atom);
|
||||
if(m_rootEntry != nullptr) {
|
||||
auto entry = m_rootEntry;
|
||||
m_rootEntry = m_rootEntry->next;
|
||||
++ m_objectsCount;
|
||||
return ((p_char8) entry) + sizeof(EntryHeader);
|
||||
} else {
|
||||
allocChunk();
|
||||
if(m_rootEntry == nullptr) {
|
||||
throw std::runtime_error("oatpp::base::memory::MemoryPool: Unable to allocate entry");
|
||||
}
|
||||
auto entry = m_rootEntry;
|
||||
m_rootEntry = m_rootEntry->next;
|
||||
++ m_objectsCount;
|
||||
return ((p_char8) entry) + sizeof(EntryHeader);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
void* obtain();
|
||||
void* obtainLockFree();
|
||||
|
||||
void freeByEntryHeader(EntryHeader* entry) {
|
||||
if(entry->poolId == m_id) {
|
||||
oatpp::concurrency::SpinLock lock(m_atom);
|
||||
entry->next = m_rootEntry;
|
||||
m_rootEntry = entry;
|
||||
-- m_objectsCount;
|
||||
} else {
|
||||
throw std::runtime_error("oatpp::base::memory::MemoryPool: Invalid EntryHeader");
|
||||
}
|
||||
}
|
||||
void freeByEntryHeader(EntryHeader* entry);
|
||||
static void free(void* entry);
|
||||
|
||||
static void free(void* entry) {
|
||||
#ifdef OATPP_DISABLE_POOL_ALLOCATIONS
|
||||
delete [] ((p_char8) entry);
|
||||
#else
|
||||
EntryHeader* header = (EntryHeader*)(((p_char8) entry) - sizeof (EntryHeader));
|
||||
header->pool->freeByEntryHeader(header);
|
||||
#endif
|
||||
}
|
||||
|
||||
std::string getName(){
|
||||
return m_name;
|
||||
}
|
||||
|
||||
v_int32 getEntrySize(){
|
||||
return m_entrySize;
|
||||
}
|
||||
|
||||
v_int64 getSize(){
|
||||
return m_chunks.size() * m_chunkSize;
|
||||
}
|
||||
|
||||
v_int32 getObjectsCount(){
|
||||
return m_objectsCount;
|
||||
}
|
||||
std::string getName();
|
||||
v_int32 getEntrySize();
|
||||
v_int64 getSize();
|
||||
v_int32 getObjectsCount();
|
||||
|
||||
};
|
||||
|
||||
@ -176,7 +115,8 @@ private:
|
||||
v_int32 m_shardsCount;
|
||||
MemoryPool** m_shards;
|
||||
public:
|
||||
ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize);
|
||||
ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize,
|
||||
v_int32 shardsCount = OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT);
|
||||
virtual ~ThreadDistributedMemoryPool();
|
||||
void* obtain();
|
||||
};
|
||||
|
@ -24,3 +24,55 @@
|
||||
|
||||
#include "Thread.hpp"
|
||||
|
||||
#if defined(_GNU_SOURCE)
|
||||
#include <pthread.h>
|
||||
#endif
|
||||
|
||||
namespace oatpp { namespace concurrency {
|
||||
|
||||
v_int32 Thread::setThreadAffinityToOneCpu(std::thread::native_handle_type nativeHandle, v_int32 cpuIndex) {
|
||||
return setThreadAffinityToCpuRange(nativeHandle, cpuIndex, cpuIndex);
|
||||
}
|
||||
|
||||
v_int32 Thread::setThreadAffinityToCpuRange(std::thread::native_handle_type nativeHandle, v_int32 fromCpu, v_int32 toCpu) {
|
||||
#if defined(_GNU_SOURCE)
|
||||
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
|
||||
for(v_int32 i = fromCpu; i <= toCpu; i++) {
|
||||
CPU_SET(i, &cpuset);
|
||||
}
|
||||
|
||||
v_int32 result = pthread_setaffinity_np(nativeHandle, sizeof(cpu_set_t), &cpuset);
|
||||
|
||||
if (result != 0) {
|
||||
OATPP_LOGD("[oatpp::concurrency::Thread::assignThreadToCpu(...)]", "error code - %d", result);
|
||||
}
|
||||
|
||||
return result;
|
||||
#else
|
||||
return -1;
|
||||
#endif
|
||||
}
|
||||
|
||||
v_int32 Thread::calcHardwareConcurrency() {
|
||||
#if !defined(OATPP_THREAD_HARDWARE_CONCURRENCY)
|
||||
v_int32 concurrency = std::thread::hardware_concurrency();
|
||||
if(concurrency == 0) {
|
||||
OATPP_LOGD("[oatpp::concurrency:Thread::calcHardwareConcurrency()]", "Warning - failed to get hardware_concurrency. Setting hardware_concurrency=1");
|
||||
concurrency = 1;
|
||||
}
|
||||
return concurrency;
|
||||
#else
|
||||
return OATPP_THREAD_HARDWARE_CONCURRENCY;
|
||||
#endif
|
||||
}
|
||||
|
||||
v_int32 Thread::getHardwareConcurrency() {
|
||||
static v_int32 concurrency = calcHardwareConcurrency();
|
||||
return concurrency;
|
||||
}
|
||||
|
||||
}}
|
||||
|
||||
|
@ -28,12 +28,9 @@
|
||||
#include "./Runnable.hpp"
|
||||
|
||||
#include "oatpp/core/base/memory/ObjectPool.hpp"
|
||||
|
||||
#include "oatpp/core/base/Controllable.hpp"
|
||||
|
||||
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
namespace oatpp { namespace concurrency {
|
||||
|
||||
@ -41,6 +38,28 @@ class Thread : public base::Controllable {
|
||||
public:
|
||||
OBJECT_POOL(Thread_Pool, Thread, 32)
|
||||
SHARED_OBJECT_POOL(Shared_Thread_Pool, Thread, 32)
|
||||
private:
|
||||
static v_int32 calcHardwareConcurrency();
|
||||
public:
|
||||
|
||||
/**
|
||||
* Set thread affinity one thread
|
||||
*/
|
||||
static v_int32 setThreadAffinityToOneCpu(std::thread::native_handle_type nativeHandle, v_int32 cpuIndex);
|
||||
|
||||
/**
|
||||
* Set thread affinity [fromCpu..toCpu].
|
||||
* from and to indexes included
|
||||
*/
|
||||
static v_int32 setThreadAffinityToCpuRange(std::thread::native_handle_type nativeHandle, v_int32 fromCpu, v_int32 toCpu);
|
||||
|
||||
/**
|
||||
* returns OATPP_THREAD_HARDWARE_CONCURRENCY config value if set.
|
||||
* else return std::thread::hardware_concurrency()
|
||||
* else return 1
|
||||
*/
|
||||
static v_int32 getHardwareConcurrency();
|
||||
|
||||
private:
|
||||
std::thread m_thread;
|
||||
public:
|
||||
@ -65,6 +84,10 @@ public:
|
||||
m_thread.detach();
|
||||
}
|
||||
|
||||
std::thread* getStdThread() {
|
||||
return &m_thread;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}}
|
||||
|
@ -48,7 +48,7 @@ public:
|
||||
}
|
||||
|
||||
void declareHeaders(const std::shared_ptr<Headers>& headers) noexcept override {
|
||||
headers->put(oatpp::web::protocol::http::Header::CONTENT_LENGTH,
|
||||
headers->put(oatpp::String(oatpp::web::protocol::http::Header::CONTENT_LENGTH, false),
|
||||
oatpp::utils::conversion::int32ToStr(m_buffer->getSize()));
|
||||
}
|
||||
|
||||
|
71
web/protocol/http/outgoing/CommunicationUtils.cpp
Normal file
71
web/protocol/http/outgoing/CommunicationUtils.cpp
Normal file
@ -0,0 +1,71 @@
|
||||
/***************************************************************************
|
||||
*
|
||||
* Project _____ __ ____ _ _
|
||||
* ( _ ) /__\ (_ _)_| |_ _| |_
|
||||
* )(_)( /(__)\ )( (_ _)(_ _)
|
||||
* (_____)(__)(__)(__) |_| |_|
|
||||
*
|
||||
*
|
||||
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
***************************************************************************/
|
||||
|
||||
#include "CommunicationUtils.hpp"
|
||||
|
||||
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
|
||||
|
||||
bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
|
||||
const std::shared_ptr<protocol::http::outgoing::Response>& response){
|
||||
|
||||
/* Set keep-alive to value specified in the client's request, if no Connection header present in response. */
|
||||
/* Set keep-alive to value specified in response otherwise */
|
||||
if(request) {
|
||||
auto& inKeepAlive = request->headers->get(String(Header::CONNECTION, false), nullptr);
|
||||
|
||||
if(inKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(inKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE)) {
|
||||
if(response->headers->putIfNotExists(String(Header::CONNECTION, false), inKeepAlive)){
|
||||
return true;
|
||||
} else {
|
||||
auto& outKeepAlive = response->headers->get(Header::CONNECTION, nullptr);
|
||||
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* If protocol == HTTP/1.1 */
|
||||
/* Set HTTP/1.1 default Connection header value (Keep-Alive), if no Connection header present in response. */
|
||||
/* Set keep-alive to value specified in response otherwise */
|
||||
String& protocol = request->startingLine->protocol;
|
||||
if(protocol && oatpp::base::StrBuffer::equalsCI_FAST(protocol.get(), "HTTP/1.1")) {
|
||||
if(!response->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_KEEP_ALIVE, false))) {
|
||||
auto& outKeepAlive = response->headers->get(String(Header::CONNECTION, false), nullptr);
|
||||
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/* If protocol != HTTP/1.1 */
|
||||
/* Set default Connection header value (Close), if no Connection header present in response. */
|
||||
/* Set keep-alive to value specified in response otherwise */
|
||||
if(!response->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_CLOSE, false))) {
|
||||
auto& outKeepAlive = response->headers->get(String(Header::CONNECTION, false), nullptr);
|
||||
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE));
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
}}}}}
|
49
web/protocol/http/outgoing/CommunicationUtils.hpp
Normal file
49
web/protocol/http/outgoing/CommunicationUtils.hpp
Normal file
@ -0,0 +1,49 @@
|
||||
/***************************************************************************
|
||||
*
|
||||
* Project _____ __ ____ _ _
|
||||
* ( _ ) /__\ (_ _)_| |_ _| |_
|
||||
* )(_)( /(__)\ )( (_ _)(_ _)
|
||||
* (_____)(__)(__)(__) |_| |_|
|
||||
*
|
||||
*
|
||||
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
***************************************************************************/
|
||||
|
||||
#ifndef oatpp_web_protocol_http_outgoing_CommunicationUtils_hpp
|
||||
#define oatpp_web_protocol_http_outgoing_CommunicationUtils_hpp
|
||||
|
||||
#include "oatpp/web/protocol/http/incoming/Request.hpp"
|
||||
#include "oatpp/web/protocol/http/outgoing/Response.hpp"
|
||||
|
||||
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
|
||||
|
||||
class CommunicationUtils {
|
||||
public:
|
||||
|
||||
/**
|
||||
* Consider keep connection alive taking into account request headers, response headers and protocol version.
|
||||
* Corresponding header will be set to response if not existed before
|
||||
* return true - keep-alive
|
||||
* return false - close
|
||||
*/
|
||||
static bool considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
|
||||
const std::shared_ptr<protocol::http::outgoing::Response>& response);
|
||||
|
||||
};
|
||||
|
||||
}}}}}
|
||||
|
||||
#endif /* CommunicationUtils_hpp */
|
@ -22,8 +22,8 @@
|
||||
*
|
||||
***************************************************************************/
|
||||
|
||||
#ifndef ResponseFactory_hpp
|
||||
#define ResponseFactory_hpp
|
||||
#ifndef oatpp_web_protocol_http_outgoing_ResponseFactory_hpp
|
||||
#define oatpp_web_protocol_http_outgoing_ResponseFactory_hpp
|
||||
|
||||
#include "./Response.hpp"
|
||||
|
||||
@ -46,4 +46,4 @@ public:
|
||||
|
||||
}}}}}
|
||||
|
||||
#endif /* ResponseFactory_hpp */
|
||||
#endif /* oatpp_web_protocol_http_outgoing_ResponseFactory_hpp */
|
||||
|
@ -101,11 +101,12 @@ private:
|
||||
v_int32 m_threadCount;
|
||||
std::shared_ptr<Task>* m_tasks;
|
||||
public:
|
||||
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router)
|
||||
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router,
|
||||
v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT)
|
||||
: m_router(router)
|
||||
, m_errorHandler(handler::DefaultErrorHandler::createShared())
|
||||
, m_taskBalancer(0)
|
||||
, m_threadCount(2)
|
||||
, m_threadCount(threadCount)
|
||||
{
|
||||
m_tasks = new std::shared_ptr<Task>[m_threadCount];
|
||||
for(v_int32 i = 0; i < m_threadCount; i++) {
|
||||
|
@ -29,26 +29,21 @@
|
||||
#include "oatpp/web/protocol/http/incoming/Request.hpp"
|
||||
#include "oatpp/web/protocol/http/Http.hpp"
|
||||
|
||||
#include "oatpp/test/Checker.hpp"
|
||||
|
||||
#include <errno.h>
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
void HttpConnectionHandler::Task::run(){
|
||||
|
||||
//oatpp::test::PerformanceChecker checker("task checker");
|
||||
|
||||
v_int32 bufferSize = oatpp::data::buffer::IOBuffer::BUFFER_SIZE;
|
||||
v_char8 buffer [bufferSize];
|
||||
|
||||
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize);
|
||||
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize);
|
||||
|
||||
bool keepAlive = true;
|
||||
|
||||
do {
|
||||
|
||||
|
||||
auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
|
||||
|
||||
if(response) {
|
||||
outStream->setBufferPosition(0, 0);
|
||||
response->send(outStream);
|
||||
@ -62,7 +57,19 @@ void HttpConnectionHandler::Task::run(){
|
||||
}
|
||||
|
||||
void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
|
||||
|
||||
/* Create working thread */
|
||||
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors));
|
||||
|
||||
/* Get hardware concurrency -1 in order to have 1cpu free of workers. */
|
||||
v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency();
|
||||
if(concurrency > 1) {
|
||||
concurrency -= 1;
|
||||
}
|
||||
|
||||
/* Set thread affinity group CPUs [0..cpu_count - 1]. Leave one cpu free of workers */
|
||||
oatpp::concurrency::Thread::setThreadAffinityToCpuRange(thread.getStdThread()->native_handle(), 0, concurrency - 1 /* -1 because 0-based index */);
|
||||
|
||||
thread.detach();
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,7 @@ namespace oatpp { namespace web { namespace server {
|
||||
|
||||
class HttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
|
||||
private:
|
||||
|
||||
class Task : public base::Controllable, public concurrency::Runnable{
|
||||
private:
|
||||
HttpRouter* m_router;
|
||||
@ -66,7 +67,7 @@ private:
|
||||
static std::shared_ptr<Task> createShared(HttpRouter* router,
|
||||
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
|
||||
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
|
||||
HttpProcessor::RequestInterceptors* requestInterceptors){
|
||||
HttpProcessor::RequestInterceptors* requestInterceptors) {
|
||||
return std::make_shared<Task>(router, connection, errorHandler, requestInterceptors);
|
||||
}
|
||||
|
||||
@ -78,7 +79,6 @@ private:
|
||||
std::shared_ptr<HttpRouter> m_router;
|
||||
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
|
||||
HttpProcessor::RequestInterceptors m_requestInterceptors;
|
||||
|
||||
public:
|
||||
HttpConnectionHandler(const std::shared_ptr<HttpRouter>& router)
|
||||
: m_router(router)
|
||||
|
@ -25,34 +25,11 @@
|
||||
#include "HttpProcessor.hpp"
|
||||
#include "./HttpError.hpp"
|
||||
|
||||
#include "oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp"
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE";
|
||||
|
||||
bool HttpProcessor::considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
|
||||
const std::shared_ptr<protocol::http::outgoing::Response>& response){
|
||||
|
||||
if(request) {
|
||||
auto& inKeepAlive = request->headers->get(protocol::http::Header::CONNECTION, nullptr);
|
||||
|
||||
if(inKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(inKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)) {
|
||||
if(response->headers->putIfNotExists(protocol::http::Header::CONNECTION, inKeepAlive)){
|
||||
return true;
|
||||
} else {
|
||||
auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr);
|
||||
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(!response->headers->putIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE)) {
|
||||
auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr);
|
||||
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE));
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
std::shared_ptr<protocol::http::outgoing::Response>
|
||||
HttpProcessor::processRequest(HttpRouter* router,
|
||||
@ -111,10 +88,10 @@ HttpProcessor::processRequest(HttpRouter* router,
|
||||
return errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
|
||||
}
|
||||
|
||||
response->headers->putIfNotExists(protocol::http::Header::SERVER,
|
||||
protocol::http::Header::Value::SERVER);
|
||||
response->headers->putIfNotExists(oatpp::String(protocol::http::Header::SERVER, false),
|
||||
oatpp::String(protocol::http::Header::Value::SERVER, false));
|
||||
|
||||
keepAlive = HttpProcessor::considerConnectionKeepAlive(request, response);
|
||||
keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(request, response);
|
||||
return response;
|
||||
|
||||
} else {
|
||||
@ -201,7 +178,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
|
||||
|
||||
m_currentResponse->headers->putIfNotExists(protocol::http::Header::SERVER,
|
||||
protocol::http::Header::Value::SERVER);
|
||||
m_keepAlive = HttpProcessor::considerConnectionKeepAlive(m_currentRequest, m_currentResponse);
|
||||
m_keepAlive = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionKeepAlive(m_currentRequest, m_currentResponse);
|
||||
m_outStream->setBufferPosition(0, 0);
|
||||
return m_currentResponse->sendAsync(this,
|
||||
m_outStream->flushAsync(
|
||||
|
@ -110,8 +110,6 @@ public:
|
||||
};
|
||||
|
||||
public:
|
||||
static bool considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
|
||||
const std::shared_ptr<protocol::http::outgoing::Response>& response);
|
||||
|
||||
static std::shared_ptr<protocol::http::outgoing::Response>
|
||||
processRequest(HttpRouter* router,
|
||||
|
Loading…
Reference in New Issue
Block a user