diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 44ff4a0..0000000 --- a/.travis.yml +++ /dev/null @@ -1,34 +0,0 @@ -compiler: -- g++ -os: -- linux -addons: - apt: - packages: - - g++-4.8.5 -env: -- ISEARCH_EVAL="CC=gcc-4.8.5 && CXX=g++-4.8.5" - -before_install: -- eval "${ISEARCH_EVAL}" - -install: -- echo ${CC} -- ${CC} --version -- echo ${CXX} -- ${CXX} --version -- cmake --version -- sudo apt-get install snappy libsnappy-dev zlib1g zlib1g-dev bzip2 liblz4-dev libasan0 openssl libmxml-dev - -script: -- cmake . -- make - -after_success: -- cp src/search_agent/bin/search_agent dockerfiles/agent/ -- cp resource/search_agent/conf/sa.conf dockerfiles/agent/ -- cd dockerfiles/agent -- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - -- docker build -t $DOCKER_USERNAME/search_agent:latest . -- docker push $DOCKER_USERNAME/search_agent:latest \ No newline at end of file diff --git a/dockerfiles/agent/Dockerfile b/dockerfiles/agent/Dockerfile deleted file mode 100644 index f0971b2..0000000 --- a/dockerfiles/agent/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -FROM centos:centos7.2.1511 - -ARG basepath=/usr/local/isearch/search_agent - -RUN mkdir -p $basepath/bin -RUN mkdir -p $basepath/conf -RUN mkdir -p $basepath/log - -COPY search_agent $basepath/bin/search_agent -COPY sa.conf $basepath/conf/sa.conf - -CMD /usr/local/isearch/search_agent/bin/search_agent -d -c /usr/local/isearch/search_agent/conf/sa.conf -v 3 \ No newline at end of file diff --git a/src/search_local/index_read/component.cc b/src/search_local/index_read/component.cc deleted file mode 100644 index 2e5276e..0000000 --- a/src/search_local/index_read/component.cc +++ /dev/null @@ -1,725 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: component.h - * - * Description: component class definition. - * - * Version: 1.0 - * Created: 09/08/2019 - * Revision: none - * Compiler: gcc - * - * Author: zhulin, shzhulin3@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#include "component.h" -#include "split_manager.h" -#include "db_manager.h" -#include "utf8_str.h" -#include -using namespace std; - -Component::Component(){ - SGlobalConfig &global_cfg = SearchConf::Instance()->GetGlobalConfig(); - m_default_query = global_cfg.sDefaultQuery; - m_jdq_switch = global_cfg.iJdqSwitch; - m_page_index = 0; - m_page_size = 0; - m_return_all = 0; - m_cache_switch = 0; - m_top_switch = 0; - m_snapshot_switch = 0; - m_sort_type = SORT_RELEVANCE; - m_appid = 10001; - m_user_id = "0"; - m_last_id = ""; - m_last_score = ""; - m_search_after = false; - distance = 0; - m_terminal_tag = 0; - m_terminal_tag_valid = false; -} - -Component::~Component(){ - -} - -int Component::ParseJson(const char *sz_json, int json_len, Json::Value &recv_packet) -{ - Json::Reader r(Json::Features::strictMode()); - int ret; - ret = r.parse(sz_json, sz_json + json_len, recv_packet); - if (0 == ret) - { - log_error("the err json string is : %s", sz_json); - log_error("parse json error , errmsg : %s", r.getFormattedErrorMessages().c_str()); - return -RT_PARSE_JSON_ERR; - } - - if (recv_packet.isMember("appid") && recv_packet["appid"].isUInt()) - { - m_appid = recv_packet["appid"].asUInt(); - } - else { - m_appid = 10001; - } - - if (recv_packet.isMember("userid") && recv_packet["userid"].isString()) - { - m_user_id = recv_packet["userid"].asString(); - } - else { - m_user_id = "0"; - } - - if (recv_packet.isMember("key") && recv_packet["key"].isString()) - { - m_Data = recv_packet["key"].asString(); - } - else { - m_Data = ""; - } - - if (recv_packet.isMember("key_and") && recv_packet["key_and"].isString()) - { - m_Data_and = recv_packet["key_and"].asString(); - } - else { - m_Data_and = ""; - } - - if (recv_packet.isMember("key_invert") && recv_packet["key_invert"].isString()) - { - m_Data_invert = recv_packet["key_invert"].asString(); - } - else { - m_Data_invert = ""; - } - - if (recv_packet.isMember("key_complete") && recv_packet["key_complete"].isString()) - { - m_Data_complete = recv_packet["key_complete"].asString(); - } - else { - m_Data_complete = ""; - } - - if (recv_packet.isMember("page_index") && recv_packet["page_index"].isString()) - { - m_page_index = atoi(recv_packet["page_index"].asString().c_str()); - } - else { - m_page_index = 1 ; - } - - if (recv_packet.isMember("page_size") && recv_packet["page_size"].isString()) - { - m_page_size = atoi(recv_packet["page_size"].asString().c_str()); - } - else { - m_page_size = 10; - } - - if(recv_packet.isMember("sort_type") && recv_packet["sort_type"].isString()) - { - m_sort_type = atoi(recv_packet["sort_type"].asString().c_str()); - } - else { - m_sort_type = SORT_RELEVANCE; - } - - if(recv_packet.isMember("sort_field") && recv_packet["sort_field"].isString()) - { - m_sort_field = recv_packet["sort_field"].asString(); - } - else { - m_sort_field = ""; - } - - if (recv_packet.isMember("return_all") && recv_packet["return_all"].isString()) - { - m_return_all = atoi(recv_packet["return_all"].asString().c_str()); - } - else { - m_return_all = 0; - } - - if(recv_packet.isMember("fields") && recv_packet["fields"].isString()) - { - string fields = recv_packet["fields"].asString(); - m_fields = splitEx(fields, ","); - } - - if (recv_packet.isMember("terminal_tag") && recv_packet["terminal_tag"].isString()) - { - m_terminal_tag = atoi(recv_packet["terminal_tag"].asString().c_str()); - } - else { - m_terminal_tag = 0; - } - - if(m_terminal_tag == 1){ - if(m_Data_and == "" || m_Data != "" || m_Data_invert != ""){ - log_error("terminal_tag is true, only key_and is available."); - return -RT_PARSE_JSON_ERR; - } - } - - if(recv_packet.isMember("last_id") && recv_packet["last_id"].isString()) - { - m_last_id = recv_packet["last_id"].asString(); - } - else { - m_last_id = ""; - } - - bool score_flag = true; - if (recv_packet.isMember("last_score") && recv_packet["last_score"].isString()) - { - m_last_score = recv_packet["last_score"].asString(); - } - else { - score_flag = false; - m_last_score = "0"; - } - if(m_last_id != "" && score_flag == true){ - m_search_after = true; - } - if(m_search_after == true && m_sort_type != SORT_FIELD_DESC && m_sort_type != SORT_FIELD_ASC){ - log_error("in search_after mode, sort_type must be SORT_FIELD_DESC or SORT_FIELD_ASC."); - return -RT_PARSE_JSON_ERR; - } - if ("" == m_Data && "" == m_Data_and && "" == m_Data_complete) { - m_Data = m_default_query; - } - log_debug("parse success, m_Data: %s, m_Data_and: %s, m_Data_invert: %s, m_page_index: %u, m_return_all: %u", - m_Data.c_str(), m_Data_and.c_str(), m_Data_invert.c_str(), m_page_index, m_return_all); - - return 0; -} - -void Component::InitSwitch() -{ - AppInfo app_info; - bool res = SearchConf::Instance()->GetAppInfo(m_appid, app_info); - if (true == res){ - m_cache_switch = app_info.cache_switch; - m_top_switch = app_info.top_switch; - m_snapshot_switch = app_info.snapshot_switch; - } -} - -void Component::GetQueryWord(uint32_t &m_has_gis){ - GetFieldWords(MAINKEY, m_Data, m_appid, m_has_gis); - GetFieldWords(ANDKEY, m_Data_and, m_appid, m_has_gis); - GetFieldWords(INVERTKEY, m_Data_invert, m_appid, m_has_gis); -} - -void Component::GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis){ - if (dataStr == "") - return ; - string latitude_tmp = ""; - string longitude_tmp = ""; - string gisip_tmp = ""; - string field_Data = ""; - string primary_Data = ""; - vector joinFieldInfos; - int i = dataStr.find(":"); - if (i == -1) { - primary_Data = dataStr; - } else { - int j = dataStr.substr(0, i).rfind(" "); - if (j == -1) { - field_Data = dataStr; - primary_Data = ""; - } else { - primary_Data = dataStr.substr(0, j); - field_Data = dataStr.substr(j+1); - } - } - - if (type == 0) { - m_Query_Word = primary_Data; - } - - if (primary_Data.length() > MAX_SEARCH_LEN) { // 超长进行截断 - primary_Data = primary_Data.substr(0, MAX_SEARCH_LEN); - } - - string probably_key = ""; - bool is_correct = false; - if(primary_Data != "" && primary_Data.length() <= SINGLE_WORD_LEN) // 判断输入的词语是否正确,如果超过一定长度,则认为是多个词组成 - { - JudgeWord(appid, primary_Data, is_correct, probably_key); - m_probably_data = probably_key; - } - vector primaryInfo; - FieldInfo pInfo; - string split_data; - if (is_correct == true) { - pInfo.field = INT_MAX; - pInfo.word = primary_Data; - primaryInfo.push_back(pInfo); - DataManager::Instance()->GetSynonymByKey(primary_Data, primaryInfo); - } - else if (probably_key != "") { - pInfo.word = probably_key; - primaryInfo.push_back(pInfo); - DataManager::Instance()->GetSynonymByKey(probably_key, primaryInfo); - } - else if (primary_Data != ""){ - split_data = SplitManager::Instance()->split(primary_Data, appid); - log_debug("split_data: %s", split_data.c_str()); - vector split_datas = splitEx(split_data, "|"); - for(size_t i = 0; i < split_datas.size(); i++) //是否有重复的同义词存在? - { - pInfo.field = INT_MAX; - pInfo.word = split_datas[i]; - primaryInfo.push_back(pInfo); - DataManager::Instance()->GetSynonymByKey(split_datas[i], primaryInfo); - } - } - AddToFieldList(type, primaryInfo); - - vector gisCode; - vector vec = splitEx(field_Data, " "); - vector::iterator iter; - map > field_keys_map; - uint32_t range_query = 0; - vector lng_arr; - vector lat_arr; - for (iter = vec.begin(); iter < vec.end(); iter++) - { - vector fieldInfos; - if ((*iter)[0] == '\0') - continue; - vector tmp = splitEx(*iter, ":"); - if (tmp.size() != 2) - continue; - uint32_t segment_tag = 0; - FieldInfo fieldInfo; - string fieldname = tmp[0]; - - uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid, fieldname, fieldInfo); - if(field != 0 && fieldInfo.index_tag == 0){ - ExtraFilterKey extra_filter_key; - extra_filter_key.field_name = fieldname; - extra_filter_key.field_value = tmp[1]; - extra_filter_key.field_type = fieldInfo.field_type; - if(type == 0){ - extra_filter_keys.push_back(extra_filter_key); - } else if (type == 1) { - extra_filter_and_keys.push_back(extra_filter_key); - } else if (type == 2) { - extra_filter_invert_keys.push_back(extra_filter_key); - } - continue; - } - if (field != 0 && segment_tag == 1) - { - string split_data = SplitManager::Instance()->split(tmp[1], appid); - log_debug("split_data: %s", split_data.c_str()); - vector split_datas = splitEx(split_data, "|"); - for(size_t index = 0; index < split_datas.size(); index++) - { - FieldInfo info; - info.field = fieldInfo.field; - info.field_type = fieldInfo.field_type; - info.word = split_datas[index]; - info.segment_tag = fieldInfo.segment_tag; - fieldInfos.push_back(info); - } - } - else if (field != 0 && segment_tag == 5) { - range_query++; - string str = tmp[1]; - str.erase(0, str.find_first_not_of(" ")); - str.erase(str.find_last_not_of(" ") + 1); - if (str.size() == 0) { - log_error("field[%s] content is null", fieldname.c_str()); - continue; - } - if (str[0] == '[') { // 范围查询 - int l = str.find(","); - if (l == -1 || (str[str.size() - 1] != ']' && str[str.size() - 1] != ')')) { - log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str()); - continue; - } - istringstream iss(str.substr(1, l).c_str()); - iss >> fieldInfo.start; - string end_str = str.substr(l + 1, str.size() - l - 2); - end_str.erase(0, end_str.find_first_not_of(" ")); - istringstream end_iss(end_str); - end_iss >> fieldInfo.end; - - if (str[str.size() - 1] == ']') { - fieldInfo.range_type = RANGE_GELE; - } - else { - if (end_str.size() == 0) { - fieldInfo.range_type = RANGE_GE; - } - else { - fieldInfo.range_type = RANGE_GELT; - } - } - fieldInfos.push_back(fieldInfo); - } - else if (str[0] == '(') { - int l = str.find(","); - if (l == -1 || (str[str.size() - 1] != ']' && str[str.size() - 1] != ')')) { - log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str()); - continue; - } - string start_str = str.substr(1, l).c_str(); - string end_str = str.substr(l + 1, str.size() - l - 2); - start_str.erase(0, start_str.find_first_not_of(" ")); - end_str.erase(0, end_str.find_first_not_of(" ")); - istringstream start_iss(start_str); - start_iss >> fieldInfo.start; - istringstream end_iss(end_str); - end_iss >> fieldInfo.end; - - if (str[str.size() - 1] == ']') { - if (start_str.size() == 0) { - fieldInfo.range_type = RANGE_LE; - } - else { - fieldInfo.range_type = RANGE_GTLE; - } - } - else { - if (start_str.size() != 0 && end_str.size() != 0) { - fieldInfo.range_type = RANGE_GTLT; - } - else if (start_str.size() == 0 && end_str.size() != 0) { - fieldInfo.range_type = RANGE_LT; - } - else if (start_str.size() != 0 && end_str.size() == 0) { - fieldInfo.range_type = RANGE_GT; - } - else { - log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str()); - continue; - } - } - fieldInfos.push_back(fieldInfo); - } - else { - fieldInfo.word = tmp[1]; - fieldInfos.push_back(fieldInfo); - } - log_debug("range_type: %d, start: %u, end: %u, segment_tag: %d, word: %s", fieldInfo.range_type, fieldInfo.start, fieldInfo.end, fieldInfo.segment_tag, fieldInfo.word.c_str()); - } - else if (field != 0) - { - fieldInfo.word = tmp[1]; - fieldInfos.push_back(fieldInfo); - } - else if (field == 0) - { - if (fieldInfo.field_type == 5) { - longitude_tmp = tmp[1]; - longitude = longitude_tmp; - } else if (fieldInfo.field_type == 6) { - latitude_tmp = tmp[1]; - latitude = latitude_tmp; - } else if (fieldInfo.field_type == 8) { - distance = strToDouble(tmp[1]); - } else if (fieldInfo.field_type == 7) { - gisip_tmp = tmp[1]; - gisip = gisip_tmp; - } else if (fieldInfo.field_type == FIELD_WKT) { - string str = tmp[1]; - str = delPrefix(str); - vector str_vec = splitEx(str, ","); - for(uint32_t str_vec_idx = 0; str_vec_idx < str_vec.size(); str_vec_idx++){ - string wkt_str = trim(str_vec[str_vec_idx]); - vector wkt_vec = splitEx(wkt_str, "-"); - if(wkt_vec.size() == 2){ - lng_arr.push_back(wkt_vec[0]); - lat_arr.push_back(wkt_vec[1]); - } - } - } - } - if (fieldInfos.size() != 0) { - field_keys_map.insert(make_pair(fieldInfo.field, fieldInfos)); - } - } - - double distance_tmp = 2; // 不指定distance时最多返回2km内的数据 - if(distance > 1e-6 && distance_tmp > distance + 1e-6){ // distance大于0小于2时取distance的值 - distance_tmp = distance; - } - GetGisCode(longitude_tmp, latitude_tmp, gisip_tmp, distance_tmp, gisCode); - log_debug("lng_arr size: %d, lat_arr size: %d", (int)lng_arr.size(), (int)lat_arr.size()); - if (gisCode.size() == 0 && lng_arr.size() > 0){ - GetGisCode(lng_arr, lat_arr, gisCode); - } - if(gisCode.size() > 0){ - vector fieldInfos; - uint32_t segment_tag = 0; - FieldInfo fieldInfo; - uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid, "gis", fieldInfo); - if (field != 0 && segment_tag == 0) { - m_has_gis = 1; - for (size_t index = 0; index < gisCode.size(); index++) { - FieldInfo info; - info.field = fieldInfo.field; - info.field_type = fieldInfo.field_type; - info.segment_tag = fieldInfo.segment_tag; - info.word = gisCode[index]; - fieldInfos.push_back(info); - } - } - if (fieldInfos.size() != 0) { - field_keys_map.insert(make_pair(fieldInfo.field, fieldInfos)); - } - } - - //如果key_and查询的field匹配到联合索引,则将查询词拼接起来作为新的查询词 - if(type == 1){ - vector union_key_vec; - DBManager::Instance()->GetUnionKeyField(appid, union_key_vec); - vector::iterator union_key_iter = union_key_vec.begin(); - for(; union_key_iter != union_key_vec.end(); union_key_iter++){ - string union_key = *union_key_iter; - vector union_field_vec = splitInt(union_key, ","); - vector::iterator union_field_iter = union_field_vec.begin(); - bool hit_union_key = true; - for(; union_field_iter != union_field_vec.end(); union_field_iter++){ - if(field_keys_map.find(*union_field_iter) == field_keys_map.end()){ - hit_union_key = false; - break; - } - } - if(hit_union_key == true){ - vector > keys_vvec; - vector unionFieldInfos; - for(union_field_iter = union_field_vec.begin(); union_field_iter != union_field_vec.end(); union_field_iter++){ - vector field_info_vec = field_keys_map.at(*union_field_iter); - vector key_vec; - GetKeyFromFieldInfo(field_info_vec, key_vec); - keys_vvec.push_back(key_vec); - field_keys_map.erase(*union_field_iter); // 命中union_key的需要从field_keys_map中删除 - } - vector union_keys = Combination(keys_vvec); - for(int m = 0 ; m < (int)union_keys.size(); m++){ - FieldInfo info; - info.field = 0; - info.field_type = FIELD_STRING; - info.segment_tag = 1; - info.word = union_keys[m]; - unionFieldInfos.push_back(info); - } - AddToFieldList(type, unionFieldInfos); - log_debug("hit union key index."); - break; - } - } - map >::iterator field_key_map_iter = field_keys_map.begin(); - for(; field_key_map_iter != field_keys_map.end(); field_key_map_iter++){ - AddToFieldList(type, field_key_map_iter->second); - } - } else { - map >::iterator field_key_map_iter = field_keys_map.begin(); - for(; field_key_map_iter != field_keys_map.end(); field_key_map_iter++){ - AddToFieldList(type, field_key_map_iter->second); - } - } - - if(type == 1){ // terminal_tag为1时,key_and中必须只带有一个范围查询 - if(m_terminal_tag == 1 && range_query == 1 && and_keys.size() == 1){ - m_terminal_tag_valid = true; - } - } - - return ; -} - -void Component::AddToFieldList(int type, vector& fields) -{ - if (fields.size() == 0) - return ; - if (type == 0) { - keys.push_back(fields); - } else if (type == 1) { - and_keys.push_back(fields); - } else if (type == 2) { - invert_keys.push_back(fields); - } - return ; -} - -const vector >& Component::Keys(){ - return keys; -} - -const vector >& Component::AndKeys(){ - return and_keys; -} - -const vector >& Component::InvertKeys(){ - return invert_keys; -} - -const vector& Component::ExtraFilterKeys(){ - return extra_filter_keys; -} - -const vector& Component::ExtraFilterAndKeys(){ - return extra_filter_and_keys; -} - -const vector& Component::ExtraFilterInvertKeys(){ - return extra_filter_invert_keys; -} - -string Component::QueryWord(){ - return m_Query_Word; -} - -void Component::SetQueryWord(string query_word){ - m_Query_Word = query_word; -} - -string Component::ProbablyData(){ - return m_probably_data; -} - -void Component::SetProbablyData(string probably_data){ - m_probably_data = probably_data; -} - -string Component::Latitude(){ - return latitude; -} - -string Component::Longitude(){ - return longitude; -} - -double Component::Distance(){ - return distance; -} - -string Component::Data(){ - return m_Data; -} - -uint32_t Component::JdqSwitch(){ - return m_jdq_switch; -} - -uint32_t Component::Appid(){ - return m_appid; -} - -string Component::DataAnd(){ - return m_Data_and; -} - -string Component::DataInvert(){ - return m_Data_invert; -} - -string Component::DataComplete(){ - return m_Data_complete; -} - -uint32_t Component::SortType(){ - return m_sort_type; -} - -uint32_t Component::PageIndex(){ - return m_page_index; -} -uint32_t Component::PageSize(){ - return m_page_size; -} - -uint32_t Component::ReturnAll(){ - return m_return_all; -} - -uint32_t Component::CacheSwitch(){ - return m_cache_switch; -} - -uint32_t Component::TopSwitch(){ - return m_top_switch; -} - -uint32_t Component::SnapshotSwitch(){ - return m_snapshot_switch; -} - -string Component::SortField(){ - return m_sort_field; -} - -string Component::LastId(){ - return m_last_id; -} - -string Component::LastScore(){ - return m_last_score; -} - -bool Component::SearchAfter(){ - return m_search_after; -} - -vector& Component::Fields(){ - return m_fields; -} - -uint32_t Component::TerminalTag(){ - return m_terminal_tag; -} - -bool Component::TerminalTagValid(){ - return m_terminal_tag_valid; -} - -void Component::GetKeyFromFieldInfo(const vector& field_info_vec, vector& key_vec){ - vector::const_iterator iter = field_info_vec.begin(); - for(; iter != field_info_vec.end(); iter++){ - key_vec.push_back((*iter).word); - } -} - -/* -** 通过递归求出二维vector每一维vector中取一个数的各种组合 -** 输入:[[a],[b1,b2],[c1,c2,c3]] -** 输出:[a_b1_c1,a_b1_c2,a_b1_c3,a_b2_c1,a_b2_c2,a_b2_c3] -*/ -vector Component::Combination(vector > &dimensionalArr){ - int FLength = dimensionalArr.size(); - if(FLength >= 2){ - int SLength1 = dimensionalArr[0].size(); - int SLength2 = dimensionalArr[1].size(); - int DLength = SLength1 * SLength2; - vector temporary(DLength); - int index = 0; - for(int i = 0; i < SLength1; i++){ - for (int j = 0; j < SLength2; j++) { - temporary[index] = dimensionalArr[0][i] +"_"+ dimensionalArr[1][j]; - index++; - } - } - vector > new_arr; - new_arr.push_back(temporary); - for(int i = 2; i < (int)dimensionalArr.size(); i++){ - new_arr.push_back(dimensionalArr[i]); - } - return Combination(new_arr); - } else { - return dimensionalArr[0]; - } -} \ No newline at end of file diff --git a/src/search_local/index_read/component.h b/src/search_local/index_read/component.h deleted file mode 100644 index e103c5c..0000000 --- a/src/search_local/index_read/component.h +++ /dev/null @@ -1,114 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: component.h - * - * Description: component class definition. - * - * Version: 1.0 - * Created: 09/08/2019 - * Revision: none - * Compiler: gcc - * - * Author: zhulin, shzhulin3@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#ifndef __COMPONENT_H__ -#define __COMPONENT_H__ -#include "comm.h" -#include "json/json.h" -#include -#include -using namespace std; - -class Component -{ -public: - Component(); - ~Component(); - - void GetQueryWord(uint32_t &m_has_gis); - const vector >& Keys(); - const vector >& AndKeys(); - const vector >& InvertKeys(); - const vector& ExtraFilterKeys(); - const vector& ExtraFilterAndKeys(); - const vector& ExtraFilterInvertKeys(); - int ParseJson(const char *sz_json, int json_len, Json::Value &recv_packet); - void InitSwitch(); - string QueryWord(); - void SetQueryWord(string query_word); - string ProbablyData(); - void SetProbablyData(string probably_data); - string Latitude(); - string Longitude(); - double Distance(); - string Data(); - string DataAnd(); - string DataInvert(); - string DataComplete(); - uint32_t JdqSwitch(); - uint32_t Appid(); - uint32_t SortType(); - uint32_t PageIndex(); - uint32_t PageSize(); - uint32_t ReturnAll(); - uint32_t CacheSwitch(); - uint32_t TopSwitch(); - uint32_t SnapshotSwitch(); - string SortField(); - string LastId(); - string LastScore(); - bool SearchAfter(); - vector& Fields(); - uint32_t TerminalTag(); - bool TerminalTagValid(); - -private: - void GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis); - void AddToFieldList(int type, vector& fields); - void GetKeyFromFieldInfo(const vector& field_info_vec, vector& key_vec); - vector Combination(vector > &dimensionalArr); - -private: - vector > keys; - vector > and_keys; - vector > invert_keys; - vector extra_filter_keys; - vector extra_filter_and_keys; - vector extra_filter_invert_keys; - - string m_Query_Word; - string m_probably_data; - string latitude; - string longitude; - string gisip; - double distance; - - string m_Data; //查询词 - string m_Data_and; // 包含该查询词 - string m_Data_invert; // 不包含该查询词 - string m_Data_complete; // 完整关键词 - uint32_t m_page_index; - uint32_t m_page_size; - uint32_t m_return_all; - uint32_t m_cache_switch; - uint32_t m_top_switch; - uint32_t m_snapshot_switch; - uint32_t m_sort_type; - uint32_t m_appid; - string m_user_id; - string m_sort_field; - string m_last_id; - string m_last_score; - bool m_search_after; - vector m_fields; - string m_default_query; - uint32_t m_jdq_switch; - uint32_t m_terminal_tag; - bool m_terminal_tag_valid; -}; -#endif \ No newline at end of file diff --git a/src/search_local/index_read/index_sync/rocksdb_direct_context.h b/src/search_local/index_read/index_sync/rocksdb_direct_context.h deleted file mode 100644 index bb3fd31..0000000 --- a/src/search_local/index_read/index_sync/rocksdb_direct_context.h +++ /dev/null @@ -1,254 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: rocksdb_direct_context.h - * - * Description: rocksdb direct context class definition. - * - * Version: 1.0 - * Created: 09/08/2020 - * Revision: none - * Compiler: gcc - * - * Author: zhulin, shzhulin3@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#ifndef __ROCKSDB_DIRECT_CONTEXT_H__ -#define __ROCKSDB_DIRECT_CONTEXT_H__ - -#if 1 - -#include -#include -#include -#include -#include -#include - -static const uint16_t sgMagicNum = 12345; // global magic number - -// operator must be matched with DTC with the same order -enum CondOpr -{ - eEQ, // == - eNE, // != - eLT, // < - eLE, // <= - eGT, // > - eGE // >= -}; - -#pragma pack(push, 1) -struct QueryCond -{ - uint8_t sFieldIndex; - uint8_t sCondOpr; // CondOpr - std::string sCondValue; - -private: - int binarySize() - { - static int fixHeaderLen = sizeof(sFieldIndex) + sizeof(sCondOpr) + sizeof(int)/* value len */; - return fixHeaderLen + sCondValue.length(); - } - - void serializeTo(char* &data) - { - *(uint8_t*)data = sFieldIndex; - data += sizeof(uint8_t); - - *(uint8_t*)data = sCondOpr; - data += sizeof(uint8_t); - - *(int*)data = sCondValue.length(); - data += sizeof(int); - - memmove((void*)data, (void*)sCondValue.c_str(), sCondValue.length()); - data += sCondValue.length(); - } - - void serializeFrom(const char* data, int& condLen) - { - const char *begPos = data; - - sFieldIndex = *(uint8_t*)data; - data += sizeof(uint8_t); - - sCondOpr = *(uint8_t*)data; - data += sizeof(uint8_t); - - int len = *(int*)data; - data += sizeof(int); - - sCondValue.assign(data, len); - condLen = data - begPos + len; - } - - friend class DirectRequestContext; -}; - -struct LimitCond -{ - int sLimitStart; - int sLimitStep; - LimitCond(){ - sLimitStart = 0; - sLimitStep = 10; - } - - void reset() { sLimitStart = -1, sLimitStep = -1; } -}; - -struct DirectRequestContext -{ - uint16_t sMagicNum; - uint64_t sSequenceId; - - std::vector sFieldConds; - std::vector > sOrderbyFields; - LimitCond sLimitCond; - - void reset() - { - sMagicNum = 0; - sSequenceId = 0; - sFieldConds.clear(); - sLimitCond.reset(); - } - - // binary format size for transporting in - int binarySize() - { - static int fixHeaderLen = sizeof(sMagicNum) + sizeof(sSequenceId) + sizeof(uint8_t) * 2; - - int len = fixHeaderLen; - for ( size_t idx = 0; idx < sFieldConds.size(); idx++ ) - { - len += sFieldConds[idx].binarySize(); - } - for (size_t idx = 0; idx < sOrderbyFields.size(); idx++) - { - len += (sizeof(int) + sizeof(bool)); - } - len += sizeof(LimitCond); - - return len; - } - - // before call this function, should call 'binarySize' to evaluate the size of the struct - void serializeTo(char *data, int len) - { - *(uint16_t*)data = sMagicNum; - data += sizeof(uint16_t); - - *(uint64_t*)data = sSequenceId; - data += sizeof(uint64_t); - - *(uint8_t*)data = sFieldConds.size(); - data += sizeof(uint8_t); - - for ( size_t idx = 0; idx < sFieldConds.size(); idx++ ) - { - sFieldConds[idx].serializeTo(data); - } - - *(uint8_t*)data = sOrderbyFields.size(); - data += sizeof(uint8_t); - for ( size_t idx = 0; idx < sOrderbyFields.size(); idx++ ) - { - *(int*)data = sOrderbyFields[idx].first; - data += sizeof(int); - *(bool*)data = sOrderbyFields[idx].second; - data += sizeof(bool); - } - - memmove((void*)data, (void*)&sLimitCond, sizeof(LimitCond)); - data += sizeof(LimitCond); - log_debug("serializeTo, sLimitStart: %d, sLimitStep: %d", sLimitCond.sLimitStart, sLimitCond.sLimitStep); - } - - void serializeFrom(const char *data, int dataLen) - { - sMagicNum = *(uint16_t*)data; - data += sizeof(uint16_t); - dataLen -= sizeof(uint16_t); - - sSequenceId = *(uint64_t*)data; - data += sizeof(uint64_t); - dataLen -= sizeof(uint64_t); - - uint8_t condNum = *(uint8_t*)data; - data += sizeof(uint8_t); - dataLen -= sizeof(uint8_t); - - QueryCond cond; - int condLen = 0; - for ( uint8_t idx = 0; idx < condNum; idx++ ) - { - cond.serializeFrom(data, condLen); - data += condLen; - dataLen -= condLen; - - sFieldConds.push_back(cond); - } - std::pair orPair; - uint8_t orderNum = *(uint8_t*)data; - data += sizeof(uint8_t); - dataLen -= sizeof(uint8_t); - for ( uint8_t idx = 0; idx < orderNum; idx++ ) - { - orPair.first = *(int*)data; - data += sizeof(int); - dataLen -= sizeof(int); - - orPair.second = *(bool*)data; - data += sizeof(bool); - dataLen -= sizeof(bool); - - sOrderbyFields.push_back(orPair); - } - memmove((void*)&sLimitCond, (void*)data, sizeof(LimitCond)); - dataLen -= sizeof(LimitCond); - log_debug("serializeFrom, sLimitStart: %d, sLimitStep: %d", sLimitCond.sLimitStart, sLimitCond.sLimitStep); - - assert( dataLen == 0 ); - } -}; - -struct DirectResponseContext -{ - uint16_t sMagicNum; - uint64_t sSequenceId; - int16_t sRowNums; // number of matched rows or errno - std::deque sRowValues; - - void serializeTo(std::string& data) - { - static int headerLen = sizeof(uint16_t) + sizeof(uint64_t) + sizeof(int16_t); - - data.clear(); - data = (std::string((char*)this, headerLen)); - - for ( int16_t idx = 0; idx < sRowNums; idx++ ) - { - data.append(sRowValues.front()); - sRowValues.pop_front(); - } - } - - void free() - { - sMagicNum = 0; - sSequenceId = 0; - sRowNums = -1; - sRowValues.clear(); - } -}; -#pragma pack(pop) - -#endif - -#endif // __ROCKSDB_DIRECT_CONTEXT_H__ diff --git a/src/search_local/index_read/logical_operate.cc b/src/search_local/index_read/logical_operate.cc deleted file mode 100644 index 5d7ec43..0000000 --- a/src/search_local/index_read/logical_operate.cc +++ /dev/null @@ -1,341 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: logical_operate.h - * - * Description: logical operate class definition. - * - * Version: 1.0 - * Created: 09/08/2018 - * Revision: none - * Compiler: gcc - * - * Author: zhulin, shzhulin3@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#include "logical_operate.h" -#include "search_util.h" -#include "cachelist_unit.h" -#include "data_manager.h" -#include "json/reader.h" -#include "json/writer.h" -#include "index_tbl_op.h" -#include "index_sync/sync_index_timer.h" -#include "index_sync/sequence_search_index.h" -#include "stem.h" -#include -#include -using namespace std; - -extern SyncIndexTimer *globalSyncIndexTimer; -extern CCacheListUnit *indexcachelist; - -LogicalOperate::LogicalOperate(uint32_t a, uint32_t s, uint32_t h, uint32_t c):m_appid(a), m_sort_type(s), m_has_gis(h), m_cache_switch(c) -{ - -} - -LogicalOperate::~LogicalOperate(){ - -} - -void LogicalOperate::SetFunc(logical_func func){ - m_func = func; -} - -int LogicalOperate::Process(const vector >& keys, vector& vecs, set& highlightWord, map &ves, map &key_in_doc){ - for (size_t index = 0; index < keys.size(); index++) - { - vector doc_id_vec; - vector fieldInfos = keys[index]; - vector::iterator it; - for (it = fieldInfos.begin(); it != fieldInfos.end(); it++) { - vector doc_info; - if ((*it).segment_tag == 3) { - int ret = GetDocByShiftWord(*it, doc_info, m_appid, highlightWord); - if (ret != 0) { - doc_id_vec.clear(); - return -RT_GET_DOC_ERR; - } - sort(doc_info.begin(), doc_info.end()); - for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){ - KeyInfo info; - info.word_freq = 1; - info.field = (*it).field; - info.word = (*it).word; - ves[doc_info[doc_info_idx].doc_id].push_back(info); - } - } else if ((*it).segment_tag == 4) { - int ret = GetDocByShiftEnWord(*it, doc_info, m_appid, highlightWord); - if (ret != 0) { - doc_id_vec.clear(); - return -RT_GET_DOC_ERR; - } - sort(doc_info.begin(), doc_info.end()); - for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){ - KeyInfo info; - info.word_freq = 1; - info.field = (*it).field; - info.word = (*it).word; - ves[doc_info[doc_info_idx].doc_id].push_back(info); - } - } else if ((*it).segment_tag == 5 && (*it).word == "") { // 范围查询 - stringstream ss; - ss << m_appid; - InvertIndexEntry startEntry(ss.str(), (*it).field, (double)(*it).start); - InvertIndexEntry endEntry(ss.str(), (*it).field, (double)(*it).end); - std::vector resultEntry; - globalSyncIndexTimer->GetSearchIndex()->GetRangeIndex((*it).range_type, startEntry, endEntry, resultEntry); - std::vector::iterator iter = resultEntry.begin(); - for (; iter != resultEntry.end(); iter ++) { - IndexInfo info; - info.doc_id = (*iter)._InvertIndexDocId; - info.doc_version = (*iter)._InvertIndexDocVersion; - doc_info.push_back(info); - } - log_debug("appid: %s, field: %d, count: %d", startEntry._InvertIndexAppid.c_str(), (*it).field, (int)resultEntry.size()); - } else { - int ret = GetDocIdSetByWord(*it, doc_info); - if (ret != 0){ - return -RT_GET_DOC_ERR; - } - if (doc_info.size() == 0) - continue; - if (!m_has_gis || !isAllNumber((*it).word)) - highlightWord.insert((*it).word); - if(!m_has_gis && (m_sort_type == SORT_RELEVANCE || m_sort_type == SORT_TIMESTAMP)){ - CalculateByWord(*it, doc_info, ves, key_in_doc); - } - } - doc_id_vec = vec_union(doc_id_vec, doc_info); - } - if(index == 0){ // 第一个直接赋值给vecs,后续的依次与前面的进行逻辑运算 - vecs.assign(doc_id_vec.begin(), doc_id_vec.end()); - } else { - vecs = m_func(vecs, doc_id_vec); - } - } - return 0; -} - -int LogicalOperate::ProcessTerminal(const vector >& and_keys, const TerminalQryCond& query_cond, vector& vecs){ - if(and_keys.size() != 1){ - return 0; - } - vector field_vec = and_keys[0]; - if(field_vec.size() != 1){ - return 0; - } - FieldInfo field_info = field_vec[0]; - if(field_info.segment_tag != SEGMENT_RANGE){ - return 0; - } - - stringstream ss; - ss << m_appid; - InvertIndexEntry beginEntry(ss.str(), field_info.field, (double)field_info.start); - InvertIndexEntry endEntry(ss.str(), field_info.field, (double)field_info.end); - std::vector resultEntry; - globalSyncIndexTimer->GetSearchIndex()->GetRangeIndexInTerminal(field_info.range_type, beginEntry, endEntry, query_cond, resultEntry); - std::vector::iterator iter = resultEntry.begin(); - for (; iter != resultEntry.end(); iter ++) { - TerminalRes info; - info.doc_id = (*iter)._InvertIndexDocId; - info.score = (*iter)._InvertIndexKey; - vecs.push_back(info); - } - return 0; -} - -int LogicalOperate::ProcessComplete(const vector& complete_keys, vector& complete_vecs, vector& word_vec, map &ves, map &key_in_doc){ - vector::const_iterator iter; - for (iter = complete_keys.begin(); iter != complete_keys.end(); iter++) { - vector doc_info; - int ret = GetDocIdSetByWord(*iter, doc_info); - if (ret != 0) { - return -RT_GET_DOC_ERR; - } - - word_vec.push_back((*iter).word); - - if(m_sort_type == SORT_RELEVANCE || m_sort_type == SORT_TIMESTAMP){ - CalculateByWord(*iter, doc_info, ves, key_in_doc); - } - - if(iter == complete_keys.begin()){ - complete_vecs.assign(doc_info.begin(), doc_info.end()); - } else { - complete_vecs = vec_intersection(complete_vecs, doc_info); - } - } - return 0; -} - -void LogicalOperate::CalculateByWord(FieldInfo fieldInfo, const vector &doc_info, map &ves, map &key_in_doc) { - string doc_id; - uint32_t word_freq = 0; - uint32_t field = 0; - uint32_t created_time; - string pos_str = ""; - for (size_t i = 0; i < doc_info.size(); i++) { - doc_id = doc_info[i].doc_id; - word_freq = doc_info[i].word_freq; - field = doc_info[i].field; - created_time = doc_info[i].created_time; - pos_str = doc_info[i].pos; - vector pos_vec; - if (pos_str != "" && pos_str.size() > 2) { - pos_str = pos_str.substr(1, pos_str.size() - 2); - pos_vec = splitInt(pos_str, ","); - } - KeyInfo info; - info.word_freq = word_freq; - info.field = field; - info.word = fieldInfo.word; - info.created_time = created_time; - info.pos_vec = pos_vec; - ves[doc_id].push_back(info); - } - key_in_doc[fieldInfo.word] = doc_info.size(); -} - - -bool LogicalOperate::GetDocIndexCache(string word, uint32_t field, vector &doc_info) { - log_debug("get doc index start"); - bool res = false; - uint8_t value[MAX_VALUE_LEN] = { 0 }; - unsigned vsize = 0; - string output = ""; - string indexCache = word + "|" + ToString(field); - if (m_cache_switch == 1 && indexcachelist->in_list(indexCache.c_str(), indexCache.size(), value, vsize)) - { - log_debug("hit index cache."); - value[vsize] = '\0'; - output = (char *)value; - res = true; - } - - if (res) { - Json::Value packet; - Json::Reader r(Json::Features::strictMode()); - int ret; - ret = r.parse(output.c_str(), output.c_str() + output.size(), packet); - if (0 == ret) - { - log_error("the err json string is : %s, errmsg : %s", output.c_str(), r.getFormattedErrorMessages().c_str()); - res = false; - return res; - } - - for (uint32_t i = 0; i < packet.size(); ++i) { - IndexInfo info; - Json::Value& index_cache = packet[i]; - if (index_cache.isMember("appid") && index_cache["appid"].isUInt() && - index_cache.isMember("id") && index_cache["id"].isString() && - index_cache.isMember("version") && index_cache["version"].isUInt() && - index_cache.isMember("field") && index_cache["field"].isUInt() && - index_cache.isMember("freq") && index_cache["freq"].isUInt() && - index_cache.isMember("time") && index_cache["time"].isUInt() && - index_cache.isMember("pos") && index_cache["pos"].isString()) - { - info.appid = index_cache["appid"].asUInt(); - info.doc_id = index_cache["id"].asString(); - info.doc_version = index_cache["version"].asUInt(); - info.field = index_cache["field"].asUInt(); - info.word_freq = index_cache["freq"].asUInt(); - info.created_time = index_cache["time"].asUInt(); - info.pos = index_cache["pos"].asString(); - doc_info.push_back(info); - } - else { - log_error("parse index_cache error, no appid"); - doc_info.clear(); - res = false; - break; - } - } - } - return res; -} - -void LogicalOperate::SetDocIndexCache(const vector &doc_info, string& indexJsonStr) { - Json::Value indexJson; - Json::FastWriter writer; - for (size_t i = 0; i < doc_info.size(); i++) { - Json::Value json_tmp; - json_tmp["appid"] = doc_info[i].appid; - json_tmp["id"] = doc_info[i].doc_id; - json_tmp["version"] = doc_info[i].doc_version; - json_tmp["field"] = doc_info[i].field; - json_tmp["freq"] = doc_info[i].word_freq; - json_tmp["time"] = doc_info[i].created_time; - json_tmp["pos"] = doc_info[i].pos; - indexJson.append(json_tmp); - } - indexJsonStr = writer.write(indexJson); -} - - -int LogicalOperate::GetDocIdSetByWord(FieldInfo fieldInfo, vector &doc_info) { - bool bRet = false; - if (DataManager::Instance()->IsSensitiveWord(fieldInfo.word)) { - log_debug("%s is a sensitive word.", fieldInfo.word.c_str()); - return 0; - } - - stringstream ss_key; - ss_key << m_appid; - ss_key << "#00#"; - if(fieldInfo.segment_tag == 5){ - stringstream ss; - ss << setw(20) << setfill('0') << fieldInfo.word; - ss_key << ss.str(); - } - else if (fieldInfo.field_type == FIELD_INT || fieldInfo.field_type == FIELD_DOUBLE || fieldInfo.field_type == FIELD_LONG) { - ss_key << fieldInfo.word; - } - else if (fieldInfo.field_type == FIELD_IP) { - uint32_t word_id = GetIpNum(fieldInfo.word); - if (word_id == 0) - return 0; - ss_key << word_id; - } - else if (fieldInfo.word.find("_") != string::npos) { // 联合索引 - ss_key << fieldInfo.word; - } - else { - string word_new = stem(fieldInfo.word); - ss_key << word_new; - } - - log_debug("appid [%u], key[%s]", m_appid, ss_key.str().c_str()); - if (m_has_gis && GetDocIndexCache(ss_key.str(), fieldInfo.field, doc_info)) { - return 0; - } - - bRet = g_IndexInstance.GetDocInfo(m_appid, ss_key.str(), fieldInfo.field, doc_info); - if (false == bRet) { - log_error("GetDocInfo error."); - return -RT_DTC_ERR; - } - - if (m_cache_switch == 1 && m_has_gis == 1 && doc_info.size() > 0 && doc_info.size() <= 1000) { - string index_str; - SetDocIndexCache(doc_info, index_str); - if (index_str != "" && index_str.size() < MAX_VALUE_LEN) { - string indexCache = ss_key.str() + "|" + ToString(fieldInfo.field); - unsigned data_size = indexCache.size(); - int ret = indexcachelist->add_list(indexCache.c_str(), index_str.c_str(), data_size, index_str.size()); - if (ret != 0) { - log_error("add to index_cache_list error, ret: %d.", ret); - } - else { - log_debug("add to index_cache_list: %s.", indexCache.c_str()); - } - } - } - return 0; -} \ No newline at end of file diff --git a/src/search_local/index_read/logical_operate.h b/src/search_local/index_read/logical_operate.h deleted file mode 100644 index 4d97e7c..0000000 --- a/src/search_local/index_read/logical_operate.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: logical_operate.h - * - * Description: logical operate class definition. - * - * Version: 1.0 - * Created: 09/08/2018 - * Revision: none - * Compiler: gcc - * - * Author: zhulin, shzhulin3@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#include "component.h" -#include -#include -using namespace std; - -typedef vector vec; -typedef vector (*logical_func)(vector &a, vector &b); - -class LogicalOperate -{ -public: - LogicalOperate(uint32_t appid, uint32_t sort_type, uint32_t has_gis, uint32_t cache_switch); - ~LogicalOperate(); - - int Process(const vector >& keys, vector& vecs, set& highlightWord, map &ves, map &key_in_doc); - int ProcessComplete(const vector& complete_keys, vector& complete_vecs, vector& word_vec, map &ves, map &key_in_doc); - void SetFunc(logical_func func); - int ProcessTerminal(const vector >& and_keys, const TerminalQryCond& query_cond, vector& vecs); - -private: - void CalculateByWord(FieldInfo fieldInfo, const vector &doc_info, map &ves, map &key_in_doc); - void SetDocIndexCache(const vector &doc_info, string& indexJsonStr); - bool GetDocIndexCache(string word, uint32_t field, vector &doc_info); - int GetDocIdSetByWord(FieldInfo fieldInfo, vector &doc_info); - -private: - uint32_t m_appid; - uint32_t m_sort_type; - uint32_t m_has_gis; - uint32_t m_cache_switch; - logical_func m_func; -}; - diff --git a/src/search_local/index_storage/rocksdb_helper/comm_main.cc b/src/search_local/index_storage/rocksdb_helper/comm_main.cc deleted file mode 100644 index 5d1d787..0000000 --- a/src/search_local/index_storage/rocksdb_helper/comm_main.cc +++ /dev/null @@ -1,392 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: comm_main.cc - * - * Description: - * - * Version: 1.0 - * Created: 09/08/2020 10:02:05 PM - * Revision: none - * Compiler: gcc - * - * Author: Norton, yangshuang68@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include "comm_process.h" -#include -#include -#include -#include -#include -#include - -const char service_file[] = "./helper-service.so"; -const char create_handle_name[] = "create_process"; -const char progname[] = "custom-helper"; -const char usage_argv[] = "machId addr [port]"; -char cacheFile[256] = CACHE_CONF_NAME; -char tableFile[256] = TABLE_CONF_NAME; - -static CreateHandle CreateHelper = NULL; -static CommHelper *helperProc; -static unsigned int procTimeout; - -class HelperMain -{ -public: - HelperMain(CommHelper *helper) : h(helper){}; - - void attach(DTCTask *task) { h->attach((void *)task); } - void init_title(int group, int role) { h->init_title(group, role); } - void set_title(const char *status) { h->SetTitle(status); } - const char *Name() { return h->Name(); } - int pre_init(int gid, int r) - { - if (dbConfig->machineCnt <= gid) - { - log_error("parse config error, machineCnt[%d] <= GroupID[%d]", dbConfig->machineCnt, gid); - return (-1); - } - h->GroupID = gid; - h->Role = r; - h->_dbconfig = dbConfig; - h->_tdef = gTableDef[0]; - h->_config = gConfig; - h->_server_string = dbConfig->mach[gid].role[r].addr; - h->logapi.init_target(h->_server_string); - return 0; - } - -private: - CommHelper *h; -}; - -static int load_service(const char *dll_file) -{ - void *dll; - - dll = dlopen(dll_file, RTLD_NOW | RTLD_GLOBAL); - if (dll == (void *)NULL) - { - log_crit("dlopen(%s) error: %s", dll_file, dlerror()); - return -1; - } - - CreateHelper = (CreateHandle)dlsym(dll, create_handle_name); - if (CreateHelper == NULL) - { - log_crit("function[%s] not found", create_handle_name); - return -2; - } - - return 0; -} - -static int sync_decode(DTCTask *task, int netfd, CommHelper *helperProc) -{ - SimpleReceiver receiver(netfd); - int code; - do - { - code = task->Decode(receiver); - if (code == DecodeFatalError) - { - if (errno != 0) - log_notice("decode fatal error, fd=%d, %m", netfd); - return -1; - } - if (code == DecodeDataError) - { - if (task->result_code() == 0 || task->result_code() == -EC_EXTRA_SECTION_DATA) // -EC_EXTRA_SECTION_DATA verify package - return 0; - log_notice("decode error, fd=%d, %d", netfd, task->result_code()); - return -1; - } - HelperMain(helperProc).set_title("Receiving..."); - } while (!stop && code != DecodeDone); - - if (task->result_code() < 0) - { - log_notice("register result, fd=%d, %d", netfd, task->result_code()); - return -1; - } - return 0; -} - -static int sync_send(Packet *reply, int netfd) -{ - int code; - do - { - code = reply->Send(netfd); - if (code == SendResultError) - { - log_notice("send error, fd=%d, %m", netfd); - return -1; - } - } while (!stop && code != SendResultDone); - - return 0; -} - -static void alarm_handler(int signo) -{ - if (background == 0 && getppid() == 1) - exit(0); - alarm(10); -} - -static int accept_connection(int fd) -{ - HelperMain(helperProc).set_title("listener"); - signal(SIGALRM, alarm_handler); - while (!stop) - { - alarm(10); - int newfd; - if ((newfd = accept(fd, NULL, 0)) >= 0) - { - alarm(0); - return newfd; - } - if (newfd < 0 && errno == EINVAL) - { - if (getppid() == (pid_t)1) - { // Ѿ˳ - log_error("dtc father process not exist. helper[%d] exit now.", getpid()); - exit(0); - } - usleep(10000); - } - } - exit(0); -} - -static void proc_timeout_handler(int signo) -{ - log_error("comm-helper process timeout(more than %u seconds), helper[pid: %d] exit now.", procTimeout, getpid()); - exit(-1); -} - -struct THelperProcParameter -{ - int netfd; - int gid; - int role; -}; - -static int helper_proc_run(struct THelperProcParameter *args) -{ - // close listen fd - close(0); - open("/dev/null", O_RDONLY); - - HelperMain(helperProc).set_title("Initializing..."); - - if (procTimeout > 0) - signal(SIGALRM, proc_timeout_handler); - - alarm(procTimeout); - if (helperProc->Init() != 0) - { - log_error("%s", "helper process init failed"); - exit(-1); - } - - alarm(0); - - unsigned int timeout; - - while (!stop) - { - HelperMain(helperProc).set_title("Waiting..."); - DTCTask *task = new DTCTask(gTableDef[0]); - if (sync_decode(task, args->netfd, helperProc) < 0) - { - delete task; - break; - } - - if (task->result_code() == 0) - { - switch (task->request_code()) - { - case DRequest::Insert: - case DRequest::Update: - case DRequest::Delete: - case DRequest::Replace: - timeout = 2 * procTimeout; - default: - timeout = procTimeout; - } - HelperMain(helperProc).attach(task); - alarm(timeout); - helperProc->Execute(); - alarm(0); - } - - HelperMain(helperProc).set_title("Sending..."); - Packet *reply = new Packet; - reply->encode_result(task); - - delete task; - if (sync_send(reply, args->netfd) < 0) - { - delete reply; - break; - } - delete reply; - } - close(args->netfd); - HelperMain(helperProc).set_title("Exiting..."); - - delete helperProc; - daemon_cleanup(); -#if MEMCHECK - log_info("%s v%s: stopped", progname, version); - dump_non_delete(); - log_debug("memory allocated %lu virtual %lu", count_alloc_size(), count_virtual_size()); -#endif - exit(0); - return 0; -} - -int main(int argc, char **argv) -{ - init_proc_title(argc, argv); - if (dtc_daemon_init(argc, argv) < 0) - return -1; - - argc -= optind; - argv += optind; - - struct THelperProcParameter helperArgs = {0, 0, 0}; - char *addr = NULL; - - if (argc > 0) - { - char *p; - helperArgs.gid = strtol(argv[0], &p, 0); - if (*p == '\0' || *p == MACHINEROLESTRING[0]) - helperArgs.role = 0; - else if (*p == MACHINEROLESTRING[1]) - helperArgs.role = 1; - else - { - log_error("Bad machine id: %s", argv[0]); - return -1; - } - } - - if (argc != 2 && argc != 3) - { - show_usage(); - return -1; - } - - int backlog = gConfig->get_int_val("cache", "MaxListenCount", 256); - int helperTimeout = gConfig->get_int_val("cache", "HelperTimeout", 30); - if (helperTimeout > 1) - procTimeout = helperTimeout - 1; - else - procTimeout = 0; - addr = argv[1]; - - // load dll file - const char *file = getenv("HELPER_SERVICE_FILE"); - if (file == NULL || file[0] == 0) - file = service_file; - if (load_service(file) != 0) - return -1; - - helperProc = CreateHelper(); - if (helperProc == NULL) - { - log_crit("create helper error"); - return -1; - } - if (HelperMain(helperProc).pre_init(helperArgs.gid, helperArgs.role) < 0) - { - log_error("%s", "helper prepare init failed"); - exit(-1); - } - if (helperProc->global_init() != 0) - { - log_crit("helper gobal-init error"); - return -1; - } - - int fd = -1; - if (!strcmp(addr, "-")) - fd = 0; - else - { - if (strcasecmp(gConfig->get_str_val("cache", "CacheShmKey") ?: "", "none") != 0) - { - log_warning("standalone %s need CacheShmKey set to NONE", progname); - return -1; - } - - SocketAddress sockaddr; - const char *err = sockaddr.set_address(addr, argc == 2 ? NULL : argv[2]); - if (err) - { - log_warning("host %s port %s: %s", addr, argc == 2 ? "NULL" : argv[2], err); - return -1; - } - if (sockaddr.socket_type() != SOCK_STREAM) - { - log_warning("standalone %s don't support UDP protocol", progname); - return -1; - } - fd = sock_bind(&sockaddr, backlog); - if (fd < 0) - return -1; - } - - daemon_start(); - -#if HAS_LOGAPI - helperProc->logapi.Init( - gConfig->get_int_val("LogApi", "MessageId", 0), - gConfig->get_int_val("LogApi", "CallerId", 0), - gConfig->get_int_val("LogApi", "TargetId", 0), - gConfig->get_int_val("LogApi", "InterfaceId", 0)); -#endif - - HelperMain(helperProc).init_title(helperArgs.gid, helperArgs.role); - if (procTimeout > 1) - helperProc->set_proc_timeout(procTimeout - 1); - while (!stop) - { - helperArgs.netfd = accept_connection(fd); - - watch_dog_fork(HelperMain(helperProc).Name(), (int (*)(void *))helper_proc_run, (void *)&helperArgs); - - close(helperArgs.netfd); - } - - if (fd > 0 && addr && addr[0] == '/') - unlink(addr); - return 0; -} diff --git a/src/search_local/index_storage/rocksdb_helper/rocksdb_direct_context.h b/src/search_local/index_storage/rocksdb_helper/rocksdb_direct_context.h deleted file mode 100644 index d1dae51..0000000 --- a/src/search_local/index_storage/rocksdb_helper/rocksdb_direct_context.h +++ /dev/null @@ -1,260 +0,0 @@ - -/* - * ===================================================================================== - * - * Filename: rocksdb_direct_context.h - * - * Description: - * - * Version: 1.0 - * Created: 09/08/2020 10:02:05 PM - * Revision: none - * Compiler: gcc - * - * Author: zhuyao, zhuyao28@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ -#ifndef __ROCKSDB_DIRECT_CONTEXT_H__ -#define __ROCKSDB_DIRECT_CONTEXT_H__ - -#if 1 - -#include -#include -#include -#include -#include -#include - -static const uint16_t sgMagicNum = 12345; // global magic number - -// operator must be matched with DTC with the same order -enum class CondOpr : uint8_t -{ - eEQ, // == 0 - eNE, // != 1 - eLT, // < 2 - eLE, // <= 3 - eGT, // > 4 - eGE // >= 5 -}; - -bool operator==(const CondOpr lc, const CondOpr rc) -{ - return (int)lc == (int)rc; -} - -#pragma pack(push, 1) -struct QueryCond -{ - uint8_t sFieldIndex; - uint8_t sCondOpr; // CondOpr - // uint8_t sCondValueLen; - // char sCondValue[]; - std::string sCondValue; - -private: - int binary_size() - { - static int fixHeaderLen = sizeof(sFieldIndex) + sizeof(sCondOpr) + sizeof(int) /* value len */; - return fixHeaderLen + sCondValue.length(); - } - - void serialize_to(char *&data) - { - *(uint8_t *)data = sFieldIndex; - data += sizeof(uint8_t); - - *(uint8_t *)data = sCondOpr; - data += sizeof(uint8_t); - - *(int *)data = sCondValue.length(); - data += sizeof(int); - - memmove((void *)data, (void *)sCondValue.c_str(), sCondValue.length()); - data += sCondValue.length(); - } - - void serialize_from(const char *data, int &condLen) - { - const char *begPos = data; - - sFieldIndex = *(uint8_t *)data; - data += sizeof(uint8_t); - - sCondOpr = *(uint8_t *)data; - data += sizeof(uint8_t); - - int len = *(int *)data; - data += sizeof(int); - - sCondValue.assign(data, len); - condLen = data - begPos + len; - } - - friend class DirectRequestContext; -}; - -struct LimitCond -{ - int sLimitStart = -1; - int sLimitStep = -1; - - void reset() { sLimitStart = -1, sLimitStep = -1; } -}; - -struct DirectRequestContext -{ - uint16_t sMagicNum; - uint64_t sSequenceId; - // uint8_t sCondValueNum; - // char sContextValue[]; - std::vector sFieldConds; - std::vector> sOrderbyFields; - LimitCond sLimitCond; - - void reset() - { - sMagicNum = 0; - sSequenceId = 0; - sFieldConds.clear(); - sLimitCond.reset(); - } - - // binary format size for transporting in - int binary_size() - { - static int fixHeaderLen = sizeof(sMagicNum) + sizeof(sSequenceId) + sizeof(uint8_t) * 2; - - int len = fixHeaderLen; - for (size_t idx = 0; idx < sFieldConds.size(); idx++) - { - len += sFieldConds[idx].binary_size(); - } - - for (size_t idx = 0; idx < sOrderbyFields.size(); idx++) - { - len += (sizeof(int) + sizeof(bool)); - } - len += sizeof(LimitCond); - - return len; - } - - // before call this function, should call 'binary_size' to evaluate the size of the struct - void serialize_to(char *data, int len) - { - *(uint16_t *)data = sMagicNum; - data += sizeof(uint16_t); - - *(uint64_t *)data = sSequenceId; - data += sizeof(uint64_t); - - *(uint8_t *)data = sFieldConds.size(); - data += sizeof(uint8_t); - for (size_t idx = 0; idx < sFieldConds.size(); idx++) - { - sFieldConds[idx].serialize_to(data); - } - - *(uint8_t *)data = sOrderbyFields.size(); - data += sizeof(uint8_t); - for (size_t idx = 0; idx < sOrderbyFields.size(); idx++) - { - *(int *)data = sOrderbyFields[idx].first; - data += sizeof(int); - - *(bool *)data = sOrderbyFields[idx].second; - data += sizeof(bool); - } - - memmove((void *)data, (void *)&sLimitCond, sizeof(LimitCond)); - data += sizeof(LimitCond); - } - - void serialize_from(const char *data, int dataLen) - { - sMagicNum = *(uint16_t *)data; - data += sizeof(uint16_t); - dataLen -= sizeof(uint16_t); - - sSequenceId = *(uint64_t *)data; - data += sizeof(uint64_t); - dataLen -= sizeof(uint64_t); - - uint8_t condNum = *(uint8_t *)data; - data += sizeof(uint8_t); - dataLen -= sizeof(uint8_t); - - QueryCond cond; - int condLen = 0; - for (uint8_t idx = 0; idx < condNum; idx++) - { - cond.serialize_from(data, condLen); - data += condLen; - dataLen -= condLen; - - sFieldConds.push_back(cond); - } - - std::pair orPair; - uint8_t orderNum = *(uint8_t *)data; - data += sizeof(uint8_t); - dataLen -= sizeof(uint8_t); - for (uint8_t idx = 0; idx < orderNum; idx++) - { - orPair.first = *(int *)data; - data += sizeof(int); - dataLen -= sizeof(int); - - orPair.second = *(bool *)data; - data += sizeof(bool); - dataLen -= sizeof(bool); - - sOrderbyFields.push_back(orPair); - } - - memmove((void *)&sLimitCond, (void *)data, sizeof(LimitCond)); - dataLen -= sizeof(LimitCond); - - assert(dataLen == 0); - } -}; - -struct DirectResponseContext -{ - uint16_t sMagicNum; - uint64_t sSequenceId; - int16_t sRowNums; // number of matched rows or errno - std::deque sRowValues; - // char* sRowValues; - - void serialize_to(std::string &data) - { - static int headerLen = sizeof(uint16_t) + sizeof(uint64_t) + sizeof(int16_t); - - data.clear(); - data = std::move(std::string((char *)this, headerLen)); - - for (size_t idx = 0; idx < sRowNums; idx++) - { - data.append(sRowValues.front()); - sRowValues.pop_front(); - } - } - - void free() - { - sMagicNum = 0; - sSequenceId = 0; - sRowNums = -1; - sRowValues.clear(); - } -}; -#pragma pack(pop) - -#endif - -#endif // __ROCKSDB_DIRECT_CONTEXT_H__ diff --git a/src/search_local/index_write/add_request_proc.cc b/src/search_local/index_write/add_request_proc.cc deleted file mode 100644 index b9817e5..0000000 --- a/src/search_local/index_write/add_request_proc.cc +++ /dev/null @@ -1,529 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: add_request_proc.cc - * - * Description: AddReqProc class definition. - * - * Version: 1.0 - * Created: 09/08/2020 10:02:05 PM - * Revision: none - * Compiler: gcc - * - * Author: shrewdlin, linjinming@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#include "add_request_proc.h" -#include "index_tbl_op.h" -#include "geohash.h" -#include "split_manager.h" -#include -#include -#include -#include - -AddReqProc::AddReqProc(){ -} - -AddReqProc::AddReqProc(const Json::Value& jf, InsertParam& insert_param){ - doc_version = insert_param.doc_version; - trans_version = insert_param.trans_version; - app_id = insert_param.appid; - doc_id = insert_param.doc_id; - json_field = jf; -} - -AddReqProc::~AddReqProc(){ -} - -void AddReqProc::do_stat_word_freq(vector > &strss, map &word_map, string extend) { - string word; - uint32_t id = 0; - ostringstream oss; - vector >::iterator iters = strss.begin(); - uint32_t index = 0; - - for(;iters != strss.end(); iters++){ - index++; - vector::iterator iter = iters->begin(); - - log_debug("start do_stat_word_freq, appid = %u\n",app_id); - for (; iter != iters->end(); iter++) { - - word = *iter; - if (!SplitManager::Instance()->wordValid(word, app_id, id)){ - continue; - } - if (word_map.find(word) == word_map.end()) { - item it; - it.doc_id = doc_id; - it.freq = 1; - it.extend = extend; - it.indexs.push_back(index); - word_map.insert(make_pair(word, it)); - } - else { - word_map[word].freq++; - word_map[word].indexs.push_back(index); - } - - oss << (*iter) << "|"; - } - } - log_debug("split: %s",oss.str().c_str()); -} - -void AddReqProc::do_stat_word_freq(vector &strss, map &word_map) { - string word; - vector::iterator iters = strss.begin(); - uint32_t index = 0; - - for (; iters != strss.end(); iters++) { - index++; - word = *iters; - if (word_map.find(word) == word_map.end()) { - item it; - it.doc_id = doc_id; - it.freq = 1; - it.indexs.push_back(index); - word_map.insert(make_pair(word, it)); - } - else { - word_map[word].freq++; - word_map[word].indexs.push_back(index); - } - } -} - -int AddReqProc::deal_index_tag(struct table_info *tbinfo, string field_name){ - int ret =0; - map word_map; - vector > split_content; - switch(tbinfo->field_type){ - case FIELD_STRING: - case FIELD_TEXT: - if(json_field[field_name].isString()){ - if (tbinfo->segment_tag == SEGMENT_NGRAM) { // NGram split mode - vector ngram_content = SplitManager::Instance()->split(json_field[field_name].asString()); - do_stat_word_freq(ngram_content, word_map); - } - else if (tbinfo->segment_tag == SEGMENT_CHINESE || tbinfo->segment_tag == SEGMENT_ENGLISH) { // use intelligent_info - string str = json_field[field_name].asString(); - // segment_tag为3对应的字段内容必须为全中文,为4对应的的字段不能包含中文 - if (tbinfo->segment_tag == SEGMENT_CHINESE && allChinese(str) == false) { - log_error("segment_tag is 3, the content[%s] must be Chinese.", str.c_str()); - return RT_ERROR_FIELD_FORMAT; - } - if (tbinfo->segment_tag == SEGMENT_ENGLISH && noChinese(str) == false) { - log_error("segment_tag is 4, the content[%s] can not contain Chinese.", str.c_str()); - return RT_ERROR_FIELD_FORMAT; - } - item it; - it.doc_id = doc_id; - it.freq = 1; - if(tbinfo->segment_feature == SEGMENT_FEATURE_SNAPSHOT){ - Json::FastWriter ex_writer; - it.extend = ex_writer.write(snapshot_content); - } - word_map.insert(make_pair(str, it)); - vector info; - bool flag = false; - get_intelligent(str, info, flag); - if (flag) { - stringstream ss; - ss << app_id << "#" << tbinfo->field_value; - ret = g_hanpinIndexInstance.do_insert_intelligent(ss.str(), doc_id, str, info, doc_version); - if(0 != ret){ - roll_back(); - return ret; - } - intelligent_keys.push_back(ss.str()); - } - } - else { - split_content = SplitManager::Instance()->split(json_field[field_name].asString(), app_id); - string extend = ""; - if(tbinfo->segment_feature == SEGMENT_FEATURE_SNAPSHOT){ - Json::FastWriter ex_writer; - extend = ex_writer.write(snapshot_content); - } - do_stat_word_freq(split_content, word_map, extend); - split_content.clear(); - } - ret = g_IndexInstance.do_insert_index(word_map, app_id, doc_version, tbinfo->field_value, docid_index_map); - if (0 != ret) { - roll_back(); - return ret; - } - word_map.clear(); - }else{ - log_error("field type error, not FIELD_STRING."); - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_INT: - if(json_field[field_name].isInt()){ - int ret; - struct item it; - it.doc_id = doc_id; - it.freq = 0; - string key = ""; - if(tbinfo->segment_tag == SEGMENT_RANGE){ // 范围查的字段将key补全到20位 - stringstream ss; - ss << setw(20) << setfill('0') << json_field[field_name].asInt(); - key = gen_dtc_key_string(app_id, "00", ss.str()); - } else { - key = gen_dtc_key_string(app_id, "00", (uint32_t)json_field[field_name].asInt()); - } - ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); - if(ret != 0){ - roll_back(); - return RT_ERROR_INSERT_INDEX_DTC; - } - }else{ - log_error("field type error, not FIELD_INT."); - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_LONG: - if(json_field[field_name].isInt64()){ - struct item it; - it.doc_id = doc_id; - it.freq = 0; - string key = gen_dtc_key_string(app_id, "00", (int64_t)json_field[field_name].asInt64()); - int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); - if(0 != ret){ - roll_back(); - log_error("insert_index_dtc error, appid[%d], key[%s]", app_id, key.c_str()); - return RT_ERROR_INSERT_INDEX_DTC; - } - } else { - log_error("field type error, not FIELD_LONG."); - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_DOUBLE: - if(json_field[field_name].isDouble()){ - struct item it; - it.doc_id = doc_id; - it.freq = 0; - string key = gen_dtc_key_string(app_id, "00", (double)json_field[field_name].asDouble()); - int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); - if(0 != ret){ - roll_back(); - log_error("insert_index_dtc error, appid[%d], key[%s]", app_id, key.c_str()); - return RT_ERROR_INSERT_INDEX_DTC; - } - } else { - log_error("field type error, not FIELD_DOUBLE."); - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_IP: - uint32_t s; - int ret; - if(json_field[field_name].isString()){ - ret = inet_pton(AF_INET, json_field[field_name].asString().c_str(), (void *)&s); - if(ret == 0){ - log_error("ip format is error\n"); - return RT_ERROR_FIELD_FORMAT; - } - struct item it; - it.doc_id = doc_id; - it.freq = 0; - string key = gen_dtc_key_string(app_id, "00", ntohl(s)); - ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); - if(ret != 0){ - roll_back(); - return RT_ERROR_INSERT_INDEX_DTC; - } - }else{ - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_LNG: - if(json_field[field_name].isString()){ - lng = json_field[field_name].asString(); - }else{ - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_LAT: - if(json_field[field_name].isString()){ - lat = json_field[field_name].asString(); - }else{ - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_LNG_ARRAY: - if(json_field[field_name].isArray()){ - Json::Value lngs = json_field[field_name]; - for (uint32_t lng_idx = 0; lng_idx < lngs.size(); ++lng_idx) { - if (lngs[lng_idx].isString()){ - lng_arr.push_back(lngs[lng_idx].asString()); - } else { - log_error("longitude must be string"); - return RT_ERROR_FIELD_FORMAT; - } - } - }else{ - log_error("FIELD_LNG_ARRAY must be array"); - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_LAT_ARRAY: - if(json_field[field_name].isArray()){ - Json::Value lats = json_field[field_name]; - for (uint32_t lat_idx = 0; lat_idx < lats.size(); ++lat_idx) { - if (lats[lat_idx].isString()){ - lat_arr.push_back(lats[lat_idx].asString()); - } else { - log_error("latitude must be string"); - return RT_ERROR_FIELD_FORMAT; - } - } - }else{ - log_error("FIELD_LAT_ARRAY must be array"); - return RT_ERROR_FIELD_FORMAT; - } - break; - case FIELD_WKT: - if(json_field[field_name].isString()){ - string str = json_field[field_name].asString(); - str = delPrefix(str); - vector str_vec = splitEx(str, ","); - for(uint32_t str_vec_idx = 0; str_vec_idx < str_vec.size(); str_vec_idx++){ - string wkt_str = trim(str_vec[str_vec_idx]); - vector wkt_vec = splitEx(wkt_str, " "); - if(wkt_vec.size() == 2){ - lng_arr.push_back(wkt_vec[0]); - lat_arr.push_back(wkt_vec[1]); - } - } - } else { - log_error("FIELD_WKT must be string"); - return RT_ERROR_FIELD_FORMAT; - } - break; - default: - break; - } - return 0; -} - -int AddReqProc::do_insert_index(UserTableContent& content_fields){ - int ret = 0; - Json::Value::Members member = json_field.getMemberNames(); - Json::Value::Members::iterator iter = member.begin(); - for(; iter != member.end(); ++iter) - { - string field_name = *iter; - struct table_info *tbinfo = NULL; - tbinfo = SplitManager::Instance()->get_table_info(app_id, field_name); - if(tbinfo == NULL){ - continue; - } - if(tbinfo->snapshot_tag == 1){ //snapshot - if(tbinfo->field_type == 1 && json_field[field_name].isInt()){ - snapshot_content[field_name] = json_field[field_name].asInt(); - }else if(tbinfo->field_type > 1 && json_field[field_name].isString()){ - snapshot_content[field_name] = json_field[field_name].asString(); - }else if(tbinfo->field_type > 1 && json_field[field_name].isDouble()){ - snapshot_content[field_name] = json_field[field_name].asDouble(); - }else if(tbinfo->field_type > 1 && json_field[field_name].isInt64()){ - snapshot_content[field_name] = json_field[field_name].asInt64(); - }else if(tbinfo->field_type > 1 && json_field[field_name].isArray()){ - snapshot_content[field_name] = json_field[field_name]; - } - } - } - for(iter = member.begin(); iter != member.end(); ++iter) - { - string field_name = *iter; - struct table_info *tbinfo = NULL; - tbinfo = SplitManager::Instance()->get_table_info(app_id, field_name); - if(tbinfo == NULL){ - continue; - } - if(tbinfo->index_tag == 1){ - ret = deal_index_tag(tbinfo, field_name); - if(0 != ret){ - log_error("deal index tag process error, ret: %d", ret); - roll_back(); - return ret; - } - } - } - if(lng.length() != 0 && lat.length() != 0){ - struct table_info *tbinfo = NULL; - tbinfo = SplitManager::Instance()->get_table_info(app_id, "gis"); - if(tbinfo == NULL){ - roll_back(); - return RT_NO_GIS_DEFINE; - } - - string gisid = encode(atof(lat.c_str()), atof(lng.c_str()), 6); - log_debug("gis code = %s",gisid.c_str()); - int ret; - uint64_t id = 0; - struct item it; - it.doc_id = doc_id; - it.freq = 0; - Json::FastWriter gis_writer; - it.extend = gis_writer.write(snapshot_content); - string key = gen_dtc_key_string(app_id, "00", gisid); - ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); - if(ret != 0){ - roll_back(); - return RT_ERROR_INSERT_INDEX_DTC; - } - log_debug("id = %llu,doc_vesion = %d,docid = %s\n",(long long unsigned int)id,doc_version,it.doc_id.c_str()); - } - log_debug("lng_arr size: %d, lat_arr size: %d", (int)lng_arr.size(), (int)lat_arr.size()); - if(lng_arr.size() > 0 && lat_arr.size() > 0){ - if(lng_arr.size() != lat_arr.size()){ - log_error("lng_arr size not equal with lat_arr size"); - return RT_ERROR_FIELD_FORMAT; - } - set gis_set; - for(uint32_t arr_idx = 0; arr_idx < lng_arr.size(); arr_idx++){ - string tmp_lng = lng_arr[arr_idx]; - string tmp_lat = lat_arr[arr_idx]; - struct table_info *tbinfo = NULL; - tbinfo = SplitManager::Instance()->get_table_info(app_id, "gis"); - if(tbinfo == NULL){ - roll_back(); - log_error("gis field not defined"); - return RT_NO_GIS_DEFINE; - } - string gisid = encode(atof(tmp_lat.c_str()), atof(tmp_lng.c_str()), 6); - if(gis_set.find(gisid) != gis_set.end()){ - continue; - } - gis_set.insert(gisid); - struct item it; - it.doc_id = doc_id; - it.freq = 0; - Json::FastWriter gis_writer; - it.extend = gis_writer.write(snapshot_content); - string key = gen_dtc_key_string(app_id, "00", gisid); - int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); - if(ret != 0){ - roll_back(); - return RT_ERROR_INSERT_INDEX_DTC; - } - log_debug("gis code = %s,doc_vesion = %d,docid = %s\n",gisid.c_str(),doc_version,it.doc_id.c_str()); - } - } - - vector union_key_vec; - SplitManager::Instance()->getUnionKeyField(app_id, union_key_vec); - vector::iterator union_key_iter = union_key_vec.begin(); - for(; union_key_iter != union_key_vec.end(); union_key_iter++){ - string union_key = *union_key_iter; - vector union_field_vec = splitInt(union_key, ","); - vector::iterator union_field_iter = union_field_vec.begin(); - vector > keys_vvec; - for(; union_field_iter != union_field_vec.end(); union_field_iter++){ - int union_field_value = *union_field_iter; - if(union_field_value >= (int)docid_index_map.size()){ - log_error("appid[%d] field[%d] is invalid", app_id, *union_field_iter); - break; - } - vector key_vec; - if(!docid_index_map[union_field_value].isArray()){ - log_debug("doc_id[%s] union_field_value[%d] has no keys", doc_id.c_str(), union_field_value); - break; - } - for (int key_index = 0; key_index < (int)docid_index_map[union_field_value].size(); key_index++){ - if(docid_index_map[union_field_value][key_index].isString()){ - string union_index_key = docid_index_map[union_field_value][key_index].asString(); - if(union_index_key.size() > 9){ // 倒排key的格式为:10061#00#折扣,这里只取第二个#后面的内容 - key_vec.push_back(union_index_key.substr(9)); - } - } - } - keys_vvec.push_back(key_vec); - } - if(keys_vvec.size() != union_field_vec.size()){ - log_debug("keys_vvec.size not equal union_field_vec.size"); - break; - } - vector union_keys = combination(keys_vvec); - for(int m = 0 ; m < (int)union_keys.size(); m++){ - ret = g_IndexInstance.insert_union_index_dtc(union_keys[m], doc_id, app_id, doc_version); - if(ret != 0){ - log_error("insert union key[%s] error", union_keys[m].c_str()); - } - } - } - - Json::FastWriter writer; - content_fields.content = writer.write(snapshot_content); - Json::FastWriter doc_index_writer; - string doc_index_map_string = doc_index_writer.write(docid_index_map); - if(doc_version != 1){//need update - map > index_res; - g_IndexInstance.GetIndexData(gen_dtc_key_string(content_fields.appid, "20", doc_id), doc_version - 1, index_res); - map >::iterator map_iter = index_res.begin(); - for(; map_iter != index_res.end(); map_iter++){ - uint32_t field = map_iter->first; - vector words = map_iter->second; - for(int i = 0; i < (int)words.size(); i++){ - DeleteTask::GetInstance().RegisterInfo(words[i], doc_id, doc_version - 1, field); - } - } - - int affected_rows = 0; - ret = g_IndexInstance.update_sanpshot_dtc(content_fields, doc_version, trans_version, affected_rows); - if(ret != 0 || affected_rows == 0){ - log_error("update_sanpshot_dtc error, roll back, ret: %d, affected_rows: %d.", ret, affected_rows); - roll_back(); - return RT_ERROR_UPDATE_SNAPSHOT; - } - g_IndexInstance.update_docid_index_dtc(doc_index_map_string, doc_id, app_id, doc_version); - }else{ - int affected_rows = 0; - ret = g_IndexInstance.update_sanpshot_dtc(content_fields, doc_version, trans_version, affected_rows); - if(ret != 0 || affected_rows == 0){ - log_error("update_sanpshot_dtc error, roll back, ret: %d, affected_rows: %d.", ret, affected_rows); - roll_back(); - return RT_ERROR_UPDATE_SNAPSHOT; - } - g_IndexInstance.insert_docid_index_dtc(doc_index_map_string, doc_id, app_id, doc_version); - } - return 0; -} - -int AddReqProc::roll_back(){ - // 删除hanpin_index - for(int i = 0; i < (int)intelligent_keys.size(); i++){ - g_hanpinIndexInstance.delete_intelligent(intelligent_keys[i], doc_id, trans_version); - } - - // 删除keyword_index - if(docid_index_map.isArray()){ - for(int i = 0;i < (int)docid_index_map.size();i++){ - Json::Value info = docid_index_map[i]; - if(info.isArray()){ - for(int j = 0;j < (int)info.size();j++){ - if(info[j].isString()){ - string key = info[j].asString(); - g_IndexInstance.delete_index(key, doc_id, trans_version, i); - } - } - } - } - } - // 如果trans_version=1,删除快照,否则更新快照的trans_version=trans_version-1 - Json::Value res; - if(trans_version == 1){ - g_IndexInstance.delete_snapshot_dtc(doc_id, app_id, res); - } else { - g_IndexInstance.update_sanpshot_dtc(app_id, doc_id, trans_version); - } - return 0; -} \ No newline at end of file diff --git a/src/search_local/index_write/add_request_proc.h b/src/search_local/index_write/add_request_proc.h deleted file mode 100644 index 68512e7..0000000 --- a/src/search_local/index_write/add_request_proc.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: add_request_proc.h - * - * Description: AddReqProc class definition. - * - * Version: 1.0 - * Created: 09/08/2020 10:02:05 PM - * Revision: none - * Compiler: gcc - * - * Author: shrewdlin, linjinming@jd.com - * Company: JD.com, Inc. - * - * ===================================================================================== - */ - -#ifndef ADD_REQUEST_PROC_H -#define ADD_REQUEST_PROC_H - -#include "log.h" -#include "json/json.h" -#include "comm.h" - -class UserTableContent; -class SplitManager; -class AddReqProc -{ -public: - AddReqProc(); - AddReqProc(const Json::Value& jf, InsertParam& insert_param); - ~AddReqProc(); - - int do_insert_index(UserTableContent& content_fields); - -private: - void do_stat_word_freq(vector > &strss, map &word_map, string extend); - void do_stat_word_freq(vector &strss, map &word_map); - int deal_index_tag(struct table_info *tbinfo, string field_name); - int roll_back(); - -private: - Json::Value json_field; - uint32_t app_id; - uint32_t doc_version; - uint32_t trans_version; - string doc_id; - string lng; - string lat; - vector lng_arr; - vector lat_arr; - vector intelligent_keys; - Json::Value snapshot_content; - Json::Value docid_index_map; -}; - -#endif \ No newline at end of file