diff --git a/README.md b/README.md index 43fac17..7ceaab3 100644 --- a/README.md +++ b/README.md @@ -162,11 +162,11 @@ fields:返回指定字段值,多个字段用逗号隔开 ### 源码编译 -建议通过isearch_env镜像进行源码编译,获取isearch_env镜像的方式为: `docker pull intelligentsearch/isearch_env:2.0` +建议通过isearch_env镜像进行源码编译,获取isearch_env镜像的方式为: `docker pull intelligentsearch/isearch_env:latest` -也可以自行编译isearch_env镜像,Dockerfile文件位于dockerfiles\env目录: `docker build -t intelligentsearch/isearch_env:2.0 .` +也可以自行编译isearch_env镜像,Dockerfile文件位于dockerfiles\env目录: `docker build -t intelligentsearch/isearch_env:latest .` -然后运行容器: `docker run -itd intelligentsearch/isearch_env:2.0` +然后运行容器: `docker run -itd intelligentsearch/isearch_env:latest` 进入容器: `docker exec -it 容器id /bin/bash` diff --git a/src/search_local/index_storage/rocksdb_helper/db_main.cc b/src/search_local/index_storage/rocksdb_helper/db_main.cc index 7a2af4d..609d6f5 100644 --- a/src/search_local/index_storage/rocksdb_helper/db_main.cc +++ b/src/search_local/index_storage/rocksdb_helper/db_main.cc @@ -133,7 +133,7 @@ static int accept_connection(int fd) if (newfd < 0 && errno == EINVAL) { if (getppid() == (pid_t)1) - { // �������Ѿ��˳� + { log_error("dtc father process not exist. helper[%d] exit now.", getpid()); exit(0); } @@ -372,7 +372,6 @@ int check_db_version(void) case 2: { log_debug("no need to check rocksdb!"); - // checker guass db version break; } } diff --git a/src/search_local/index_storage/rocksdb_helper/db_process_rocks.cc b/src/search_local/index_storage/rocksdb_helper/db_process_rocks.cc index c5bc323..0f8513e 100644 --- a/src/search_local/index_storage/rocksdb_helper/db_process_rocks.cc +++ b/src/search_local/index_storage/rocksdb_helper/db_process_rocks.cc @@ -823,6 +823,11 @@ int RocksdbProcess::saveRow( if (ret != 0) { log_error("parse field value failed! compoundValue:%s", compoundValue.c_str()); + for(int i = 0; i < mTableDef->num_fields()+1; i++){ + if((*row)[i].str.ptr != NULL){ + free((*row)[i].str.ptr); + } + } delete row; return -1; } @@ -833,6 +838,11 @@ int RocksdbProcess::saveRow( ret = condition_filter(fieldValue, fieldId, mTableDef->field_type(fieldId), Condition); if (ret < 0) { + for(int i = 0; i < mTableDef->num_fields()+1; i++){ + if((*row)[i].str.ptr != NULL){ + free((*row)[i].str.ptr); + } + } delete row; log_error("string[%s] conver to value[%d] error: %d", fieldValue.c_str(), mTableDef->field_type(fieldId), ret); return (-2); @@ -840,6 +850,11 @@ int RocksdbProcess::saveRow( else if (ret == 1) { // condition is not matched + for(int i = 0; i < mTableDef->num_fields()+1; i++){ + if((*row)[i].str.ptr != NULL){ + free((*row)[i].str.ptr); + } + } delete row; return 0; } @@ -848,6 +863,11 @@ int RocksdbProcess::saveRow( ret = str2Value(fieldValue, mTableDef->field_type(fieldId), (*row)[fieldId]); if (ret < 0) { + for(int i = 0; i < mTableDef->num_fields()+1; i++){ + if((*row)[i].str.ptr != NULL){ + free((*row)[i].str.ptr); + } + } delete row; log_error("string[%s] conver to value[%d] error: %d", fieldValue.c_str(), mTableDef->field_type(fieldId), ret); return (-2); @@ -857,6 +877,11 @@ int RocksdbProcess::saveRow( // Task->update_key(row); ret = Task->append_row(row); + for(int i = 0; i < mTableDef->num_fields()+1; i++){ + if((*row)[i].str.ptr != NULL){ + free((*row)[i].str.ptr); + } + } delete row; if (ret < 0) @@ -1755,6 +1780,54 @@ int RocksdbProcess::process_delete(DTCTask *Task) return -1; } + std::set req_ids; + std::vector quick_keys(mCompoundKeyFieldNums); + quick_keys[0] = prefixKey; + DTCFieldValue *req_condition = (DTCFieldValue*)Task->request_condition(); + if(req_condition != NULL){ + for ( int i = 0; i < req_condition->num_fields(); i++ ){ + int req_field_id = req_condition->field_id(i); + if ( mTableDef->is_volatile(req_field_id) ) + continue; + int rocks_fid = translate_field_idx(req_field_id); + assert(rocks_fid >= 0 && rocks_fid < mCompoundKeyFieldNums + mExtraValueFieldNums); + if ( req_field_id == 0 || rocks_fid == 0 || rocks_fid >= mCompoundKeyFieldNums) + continue; + req_ids.insert(rocks_fid); + std::string& va = quick_keys[rocks_fid]; + ret = value2Str(req_condition->field_value(i), req_field_id, va); + assert( ret == 0 ); + } + bool hit_all_key = true; + for ( int idx = 1; idx < mCompoundKeyFieldNums; idx++ ){ + if(req_ids.find(idx) == req_ids.end()){ + hit_all_key = false; + } + } + if(true == hit_all_key){ + std::stringstream ss; + for(auto it :req_ids) { ss << it << ","; } + std::stringstream sskey; + for(auto key_it :quick_keys) { sskey << key_it << ","; } + log_debug("hit all unique keys, goto quick delete, req_ids: %s, keys: %s, mCompoundKeyFieldNums: %d", + ss.str().c_str(), sskey.str().c_str(), mCompoundKeyFieldNums); + std::string quick_rocks_Key; + std::string keyBitmaps; + encode_bitmap_keys(quick_keys, keyBitmaps); + quick_rocks_Key = std::move(key_format::Encode(quick_keys, mKeyfield_types)); + + ret = mDBConn->delete_entry(quick_rocks_Key); + if ( ret != 0 ) { + log_error("deleteEntry failed! "); + Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "deleteEntry failed!"); + return -1; + } + log_debug("quick delete success"); + return 0; + } + req_ids.clear(); + } + if (mKeyfield_types[0] == DField::String) std::transform(prefixKey.begin(), prefixKey.end(), prefixKey.begin(), ::tolower); @@ -3102,10 +3175,10 @@ int RocksdbProcess::split_values( char *head = const_cast(compoundValue.data()); for (int idx = 0; idx < mExtraValueFieldNums; idx++) { - if(idx == mExtraValueFieldNums-1){ // extend field no need to parse - values.push_back(""); - break; - } + //if(idx == mExtraValueFieldNums-1){ // extend field no need to parse + // values.push_back(""); + // break; + //} ret = get_value_by_id(head, mFieldIndexMapping[mCompoundKeyFieldNums + idx], value); assert(ret == 0); values.push_back(std::move(value)); diff --git a/src/search_local/index_storage/rocksdb_helper/elastic_buffer.cc b/src/search_local/index_storage/rocksdb_helper/elastic_buffer.cc index 94e60f1..5909ccd 100644 --- a/src/search_local/index_storage/rocksdb_helper/elastic_buffer.cc +++ b/src/search_local/index_storage/rocksdb_helper/elastic_buffer.cc @@ -1,10 +1,20 @@ -///////////////////////////////////////////////////////////// -// -// Elastic buffer for replication between slave and master -// created by qiuyu -// 1/11/2021 -// -///////////////////////////////////////////////////////////// +/* + * ===================================================================================== + * + * Filename: elastic_buffer.cc + * + * Description: elastic buffer for replication between slave and master + * + * Version: 1.0 + * Created: 09/08/2020 10:02:05 PM + * Revision: none + * Compiler: gcc + * + * Author: Norton, yangshuang68@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ #include "elastic_buffer.h" #include "log.h" diff --git a/src/search_local/index_storage/rocksdb_helper/elastic_buffer.h b/src/search_local/index_storage/rocksdb_helper/elastic_buffer.h index 5adb0b9..7ff1f39 100644 --- a/src/search_local/index_storage/rocksdb_helper/elastic_buffer.h +++ b/src/search_local/index_storage/rocksdb_helper/elastic_buffer.h @@ -1,10 +1,20 @@ -///////////////////////////////////////////////////////////// -// -// Elastic buffer for replication between slave and master -// created by qiuyu -// 1/11/2021 -// -///////////////////////////////////////////////////////////// +/* + * ===================================================================================== + * + * Filename: elastic_buffer.h + * + * Description: elastic buffer for replication between slave and master + * + * Version: 1.0 + * Created: 09/08/2020 10:02:05 PM + * Revision: none + * Compiler: gcc + * + * Author: Norton, yangshuang68@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ #ifndef __ELASTIC_BUFFER_H__ #define __ELASTIC_BUFFER_H__ @@ -57,6 +67,7 @@ class ElasticBuffer Buffer_t* nextBuffer(const Buffer_t* curr) { return curr->sNext; } void resetElasticBuffer() { + mReadLen = 0; mWritingBuffer = mHeadBuffer; while (mWritingBuffer) { diff --git a/src/search_local/index_storage/rocksdb_helper/key_format.cc b/src/search_local/index_storage/rocksdb_helper/key_format.cc index 284e81a..a19e60d 100644 --- a/src/search_local/index_storage/rocksdb_helper/key_format.cc +++ b/src/search_local/index_storage/rocksdb_helper/key_format.cc @@ -514,14 +514,14 @@ void key_format::DecodeBytes(const std::string &src, std::string &dst) { dst = ""; } - //std::stringstream oss_dst; + std::stringstream oss_dst; for (size_t i = 0; i < src.length(); i += SEGMENT_SIZE) { char padding_bytes = ENCODER_MARKER - src[i + 7]; - //oss_dst << src.substr(i, SEGMENT_SIZE - 1 - padding_bytes); - dst += src.substr(i, SEGMENT_SIZE - 1 - padding_bytes); + oss_dst << src.substr(i, SEGMENT_SIZE - 1 - padding_bytes); + //dst += src.substr(i, SEGMENT_SIZE - 1 - padding_bytes); } - //dst = oss_dst.str(); + dst = oss_dst.str(); } void key_format::DecodeBytes(const std::string &src, uint64_t &dst) diff --git a/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.cc b/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.cc index 66a8550..044efd7 100644 --- a/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.cc +++ b/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.cc @@ -23,11 +23,11 @@ extern DTCConfig *gConfig; // operation level, [prev state][current state] static bool gReplicationState[(int)ReplicationState::eReplicationMax + 1][(int)ReplicationState::eReplicationMax + 1] = { - {true, true, true, false, false}, + {true, true, true, false, true}, {false, true, true, false, false}, {false, true, true, true, true}, {false, true, false, true, true}, - {false, false, false, false, true} + {true, false, false, false, true} }; int getReplicationState(RocksDBConn* rocksdb) @@ -248,6 +248,7 @@ void RocksReplicationChannel::handleReplication() { RocksdbReplication::ReplicationType type = GET_REQUEST_TYPE(mPacketHeader.sPacketFlag); + CLEAR_REQUEST_TYPE(mPacketHeader.sPacketFlag , type); switch (type) { case RocksdbReplication::eReplSync: @@ -549,8 +550,8 @@ int RocksReplicationChannel::masterFillRangeKV( goto RANGE_QUERY_END; } - ret = mRocksdb->retrieve_start(startKey, value, rocksItr, true); - do { + ret = mRocksdb->retrieve_start(startKey, value, rocksItr, false); + while (mPacketBuffer->getBufferSize() + 2*sizeof(int) + startKey.length() + value.length() < REPLICATION_PACKET_SIZE) { if (ret < 0) { log_error("query rocksdb failed! key:%s, ret:%d", startKey.c_str(), ret); @@ -588,15 +589,25 @@ int RocksReplicationChannel::masterFillRangeKV( } ret = mRocksdb->next_entry(rocksItr, startKey, value); - } while (mPacketBuffer->getBufferSize() < REPLICATION_PACKET_SIZE); + }; mRocksdb->retrieve_end(rocksItr); RANGE_QUERY_END: ReplicationPacket_t* respHeader = (ReplicationPacket_t*)(mPacketBuffer->getHeadBuffer()->sData); - if (finished) SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplFin); - else SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplRepAck); + if (finished) { + SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplFin); + int state = (int)ReplicationState::eReplicationFinished; + int ret = updateReplicationState(mRocksdb, state); + if (ret < 0) + { + log_error("update replication state failed!"); + return -1; + } + } else { + SET_REQUEST_TYPE(respHeader->sPacketFlag, RocksdbReplication::eReplRepAck); + } respHeader->sRawPacketLen = mPacketBuffer->getBufferSize(); @@ -691,15 +702,15 @@ int RocksReplicationChannel::slaveFillRangeKV() newEntries[key] = value; - std::string dKey, dVal; - extern HelperProcessBase* helperProc; - RocksdbProcess* p_rocksdb_process = dynamic_cast(helperProc); - if (p_rocksdb_process != NULL){ - ret = p_rocksdb_process->decodeInternalKV( - key, value, dKey, dVal); - } + //std::string dKey, dVal; + //extern HelperProcessBase* helperProc; + //RocksdbProcess* p_rocksdb_process = dynamic_cast(helperProc); + //if (p_rocksdb_process != NULL){ + // ret = p_rocksdb_process->decodeInternalKV( + // key, value, dKey, dVal); + //} - log_error("replication kv, :%s,%s", dKey.c_str(), dVal.c_str()); + //log_error("replication kv, :%s,%s", dKey.c_str(), dVal.c_str()); } ret = mRocksdb->batch_update(std::set(), newEntries, true); diff --git a/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.h b/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.h index dfa63d0..4fddd8b 100644 --- a/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.h +++ b/src/search_local/index_storage/rocksdb_helper/rocksdb_replication.h @@ -13,7 +13,7 @@ #include "rocksdb_conn.h" #define REPLICATON_HEADER_MAGIC 0x7654321 -#define REPLICATION_PACKET_SIZE (10 * (1 << 20)) // 10M +#define REPLICATION_PACKET_SIZE (1 * (1 << 20)) // 1M #define CLEAR_BITS(packetFlag) (packetFlag &= 0x0) @@ -30,6 +30,7 @@ enum class ReplicationType; #define SET_REQUEST_TYPE(packetFlag, type) (packetFlag |= (type << 2)) #define GET_REQUEST_TYPE(packetFlag) ((RocksdbReplication::ReplicationType)((packetFlag & 0xff) >> 2)) +#define CLEAR_REQUEST_TYPE(packetFlag, type) (packetFlag &= (-((type << 2)+1))) typedef struct ReplicationPacket {