diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..44ff4a0 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,34 @@ +compiler: +- g++ +os: +- linux +addons: + apt: + packages: + - g++-4.8.5 +env: +- ISEARCH_EVAL="CC=gcc-4.8.5 && CXX=g++-4.8.5" + +before_install: +- eval "${ISEARCH_EVAL}" + +install: +- echo ${CC} +- ${CC} --version +- echo ${CXX} +- ${CXX} --version +- cmake --version +- sudo apt-get install snappy libsnappy-dev zlib1g zlib1g-dev bzip2 liblz4-dev libasan0 openssl libmxml-dev + +script: +- cmake . +- make + +after_success: +- cp src/search_agent/bin/search_agent dockerfiles/agent/ +- cp resource/search_agent/conf/sa.conf dockerfiles/agent/ +- cd dockerfiles/agent +- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin + +- docker build -t $DOCKER_USERNAME/search_agent:latest . +- docker push $DOCKER_USERNAME/search_agent:latest \ No newline at end of file diff --git a/dockerfiles/agent/Dockerfile b/dockerfiles/agent/Dockerfile new file mode 100644 index 0000000..f0971b2 --- /dev/null +++ b/dockerfiles/agent/Dockerfile @@ -0,0 +1,12 @@ +FROM centos:centos7.2.1511 + +ARG basepath=/usr/local/isearch/search_agent + +RUN mkdir -p $basepath/bin +RUN mkdir -p $basepath/conf +RUN mkdir -p $basepath/log + +COPY search_agent $basepath/bin/search_agent +COPY sa.conf $basepath/conf/sa.conf + +CMD /usr/local/isearch/search_agent/bin/search_agent -d -c /usr/local/isearch/search_agent/conf/sa.conf -v 3 \ No newline at end of file diff --git a/src/search_local/index_read/component.cc b/src/search_local/index_read/component.cc new file mode 100644 index 0000000..2e5276e --- /dev/null +++ b/src/search_local/index_read/component.cc @@ -0,0 +1,725 @@ +/* + * ===================================================================================== + * + * Filename: component.h + * + * Description: component class definition. + * + * Version: 1.0 + * Created: 09/08/2019 + * Revision: none + * Compiler: gcc + * + * Author: zhulin, shzhulin3@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#include "component.h" +#include "split_manager.h" +#include "db_manager.h" +#include "utf8_str.h" +#include +using namespace std; + +Component::Component(){ + SGlobalConfig &global_cfg = SearchConf::Instance()->GetGlobalConfig(); + m_default_query = global_cfg.sDefaultQuery; + m_jdq_switch = global_cfg.iJdqSwitch; + m_page_index = 0; + m_page_size = 0; + m_return_all = 0; + m_cache_switch = 0; + m_top_switch = 0; + m_snapshot_switch = 0; + m_sort_type = SORT_RELEVANCE; + m_appid = 10001; + m_user_id = "0"; + m_last_id = ""; + m_last_score = ""; + m_search_after = false; + distance = 0; + m_terminal_tag = 0; + m_terminal_tag_valid = false; +} + +Component::~Component(){ + +} + +int Component::ParseJson(const char *sz_json, int json_len, Json::Value &recv_packet) +{ + Json::Reader r(Json::Features::strictMode()); + int ret; + ret = r.parse(sz_json, sz_json + json_len, recv_packet); + if (0 == ret) + { + log_error("the err json string is : %s", sz_json); + log_error("parse json error , errmsg : %s", r.getFormattedErrorMessages().c_str()); + return -RT_PARSE_JSON_ERR; + } + + if (recv_packet.isMember("appid") && recv_packet["appid"].isUInt()) + { + m_appid = recv_packet["appid"].asUInt(); + } + else { + m_appid = 10001; + } + + if (recv_packet.isMember("userid") && recv_packet["userid"].isString()) + { + m_user_id = recv_packet["userid"].asString(); + } + else { + m_user_id = "0"; + } + + if (recv_packet.isMember("key") && recv_packet["key"].isString()) + { + m_Data = recv_packet["key"].asString(); + } + else { + m_Data = ""; + } + + if (recv_packet.isMember("key_and") && recv_packet["key_and"].isString()) + { + m_Data_and = recv_packet["key_and"].asString(); + } + else { + m_Data_and = ""; + } + + if (recv_packet.isMember("key_invert") && recv_packet["key_invert"].isString()) + { + m_Data_invert = recv_packet["key_invert"].asString(); + } + else { + m_Data_invert = ""; + } + + if (recv_packet.isMember("key_complete") && recv_packet["key_complete"].isString()) + { + m_Data_complete = recv_packet["key_complete"].asString(); + } + else { + m_Data_complete = ""; + } + + if (recv_packet.isMember("page_index") && recv_packet["page_index"].isString()) + { + m_page_index = atoi(recv_packet["page_index"].asString().c_str()); + } + else { + m_page_index = 1 ; + } + + if (recv_packet.isMember("page_size") && recv_packet["page_size"].isString()) + { + m_page_size = atoi(recv_packet["page_size"].asString().c_str()); + } + else { + m_page_size = 10; + } + + if(recv_packet.isMember("sort_type") && recv_packet["sort_type"].isString()) + { + m_sort_type = atoi(recv_packet["sort_type"].asString().c_str()); + } + else { + m_sort_type = SORT_RELEVANCE; + } + + if(recv_packet.isMember("sort_field") && recv_packet["sort_field"].isString()) + { + m_sort_field = recv_packet["sort_field"].asString(); + } + else { + m_sort_field = ""; + } + + if (recv_packet.isMember("return_all") && recv_packet["return_all"].isString()) + { + m_return_all = atoi(recv_packet["return_all"].asString().c_str()); + } + else { + m_return_all = 0; + } + + if(recv_packet.isMember("fields") && recv_packet["fields"].isString()) + { + string fields = recv_packet["fields"].asString(); + m_fields = splitEx(fields, ","); + } + + if (recv_packet.isMember("terminal_tag") && recv_packet["terminal_tag"].isString()) + { + m_terminal_tag = atoi(recv_packet["terminal_tag"].asString().c_str()); + } + else { + m_terminal_tag = 0; + } + + if(m_terminal_tag == 1){ + if(m_Data_and == "" || m_Data != "" || m_Data_invert != ""){ + log_error("terminal_tag is true, only key_and is available."); + return -RT_PARSE_JSON_ERR; + } + } + + if(recv_packet.isMember("last_id") && recv_packet["last_id"].isString()) + { + m_last_id = recv_packet["last_id"].asString(); + } + else { + m_last_id = ""; + } + + bool score_flag = true; + if (recv_packet.isMember("last_score") && recv_packet["last_score"].isString()) + { + m_last_score = recv_packet["last_score"].asString(); + } + else { + score_flag = false; + m_last_score = "0"; + } + if(m_last_id != "" && score_flag == true){ + m_search_after = true; + } + if(m_search_after == true && m_sort_type != SORT_FIELD_DESC && m_sort_type != SORT_FIELD_ASC){ + log_error("in search_after mode, sort_type must be SORT_FIELD_DESC or SORT_FIELD_ASC."); + return -RT_PARSE_JSON_ERR; + } + if ("" == m_Data && "" == m_Data_and && "" == m_Data_complete) { + m_Data = m_default_query; + } + log_debug("parse success, m_Data: %s, m_Data_and: %s, m_Data_invert: %s, m_page_index: %u, m_return_all: %u", + m_Data.c_str(), m_Data_and.c_str(), m_Data_invert.c_str(), m_page_index, m_return_all); + + return 0; +} + +void Component::InitSwitch() +{ + AppInfo app_info; + bool res = SearchConf::Instance()->GetAppInfo(m_appid, app_info); + if (true == res){ + m_cache_switch = app_info.cache_switch; + m_top_switch = app_info.top_switch; + m_snapshot_switch = app_info.snapshot_switch; + } +} + +void Component::GetQueryWord(uint32_t &m_has_gis){ + GetFieldWords(MAINKEY, m_Data, m_appid, m_has_gis); + GetFieldWords(ANDKEY, m_Data_and, m_appid, m_has_gis); + GetFieldWords(INVERTKEY, m_Data_invert, m_appid, m_has_gis); +} + +void Component::GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis){ + if (dataStr == "") + return ; + string latitude_tmp = ""; + string longitude_tmp = ""; + string gisip_tmp = ""; + string field_Data = ""; + string primary_Data = ""; + vector joinFieldInfos; + int i = dataStr.find(":"); + if (i == -1) { + primary_Data = dataStr; + } else { + int j = dataStr.substr(0, i).rfind(" "); + if (j == -1) { + field_Data = dataStr; + primary_Data = ""; + } else { + primary_Data = dataStr.substr(0, j); + field_Data = dataStr.substr(j+1); + } + } + + if (type == 0) { + m_Query_Word = primary_Data; + } + + if (primary_Data.length() > MAX_SEARCH_LEN) { // 超长进行截断 + primary_Data = primary_Data.substr(0, MAX_SEARCH_LEN); + } + + string probably_key = ""; + bool is_correct = false; + if(primary_Data != "" && primary_Data.length() <= SINGLE_WORD_LEN) // 判断输入的词语是否正确,如果超过一定长度,则认为是多个词组成 + { + JudgeWord(appid, primary_Data, is_correct, probably_key); + m_probably_data = probably_key; + } + vector primaryInfo; + FieldInfo pInfo; + string split_data; + if (is_correct == true) { + pInfo.field = INT_MAX; + pInfo.word = primary_Data; + primaryInfo.push_back(pInfo); + DataManager::Instance()->GetSynonymByKey(primary_Data, primaryInfo); + } + else if (probably_key != "") { + pInfo.word = probably_key; + primaryInfo.push_back(pInfo); + DataManager::Instance()->GetSynonymByKey(probably_key, primaryInfo); + } + else if (primary_Data != ""){ + split_data = SplitManager::Instance()->split(primary_Data, appid); + log_debug("split_data: %s", split_data.c_str()); + vector split_datas = splitEx(split_data, "|"); + for(size_t i = 0; i < split_datas.size(); i++) //是否有重复的同义词存在? + { + pInfo.field = INT_MAX; + pInfo.word = split_datas[i]; + primaryInfo.push_back(pInfo); + DataManager::Instance()->GetSynonymByKey(split_datas[i], primaryInfo); + } + } + AddToFieldList(type, primaryInfo); + + vector gisCode; + vector vec = splitEx(field_Data, " "); + vector::iterator iter; + map > field_keys_map; + uint32_t range_query = 0; + vector lng_arr; + vector lat_arr; + for (iter = vec.begin(); iter < vec.end(); iter++) + { + vector fieldInfos; + if ((*iter)[0] == '\0') + continue; + vector tmp = splitEx(*iter, ":"); + if (tmp.size() != 2) + continue; + uint32_t segment_tag = 0; + FieldInfo fieldInfo; + string fieldname = tmp[0]; + + uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid, fieldname, fieldInfo); + if(field != 0 && fieldInfo.index_tag == 0){ + ExtraFilterKey extra_filter_key; + extra_filter_key.field_name = fieldname; + extra_filter_key.field_value = tmp[1]; + extra_filter_key.field_type = fieldInfo.field_type; + if(type == 0){ + extra_filter_keys.push_back(extra_filter_key); + } else if (type == 1) { + extra_filter_and_keys.push_back(extra_filter_key); + } else if (type == 2) { + extra_filter_invert_keys.push_back(extra_filter_key); + } + continue; + } + if (field != 0 && segment_tag == 1) + { + string split_data = SplitManager::Instance()->split(tmp[1], appid); + log_debug("split_data: %s", split_data.c_str()); + vector split_datas = splitEx(split_data, "|"); + for(size_t index = 0; index < split_datas.size(); index++) + { + FieldInfo info; + info.field = fieldInfo.field; + info.field_type = fieldInfo.field_type; + info.word = split_datas[index]; + info.segment_tag = fieldInfo.segment_tag; + fieldInfos.push_back(info); + } + } + else if (field != 0 && segment_tag == 5) { + range_query++; + string str = tmp[1]; + str.erase(0, str.find_first_not_of(" ")); + str.erase(str.find_last_not_of(" ") + 1); + if (str.size() == 0) { + log_error("field[%s] content is null", fieldname.c_str()); + continue; + } + if (str[0] == '[') { // 范围查询 + int l = str.find(","); + if (l == -1 || (str[str.size() - 1] != ']' && str[str.size() - 1] != ')')) { + log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str()); + continue; + } + istringstream iss(str.substr(1, l).c_str()); + iss >> fieldInfo.start; + string end_str = str.substr(l + 1, str.size() - l - 2); + end_str.erase(0, end_str.find_first_not_of(" ")); + istringstream end_iss(end_str); + end_iss >> fieldInfo.end; + + if (str[str.size() - 1] == ']') { + fieldInfo.range_type = RANGE_GELE; + } + else { + if (end_str.size() == 0) { + fieldInfo.range_type = RANGE_GE; + } + else { + fieldInfo.range_type = RANGE_GELT; + } + } + fieldInfos.push_back(fieldInfo); + } + else if (str[0] == '(') { + int l = str.find(","); + if (l == -1 || (str[str.size() - 1] != ']' && str[str.size() - 1] != ')')) { + log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str()); + continue; + } + string start_str = str.substr(1, l).c_str(); + string end_str = str.substr(l + 1, str.size() - l - 2); + start_str.erase(0, start_str.find_first_not_of(" ")); + end_str.erase(0, end_str.find_first_not_of(" ")); + istringstream start_iss(start_str); + start_iss >> fieldInfo.start; + istringstream end_iss(end_str); + end_iss >> fieldInfo.end; + + if (str[str.size() - 1] == ']') { + if (start_str.size() == 0) { + fieldInfo.range_type = RANGE_LE; + } + else { + fieldInfo.range_type = RANGE_GTLE; + } + } + else { + if (start_str.size() != 0 && end_str.size() != 0) { + fieldInfo.range_type = RANGE_GTLT; + } + else if (start_str.size() == 0 && end_str.size() != 0) { + fieldInfo.range_type = RANGE_LT; + } + else if (start_str.size() != 0 && end_str.size() == 0) { + fieldInfo.range_type = RANGE_GT; + } + else { + log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str()); + continue; + } + } + fieldInfos.push_back(fieldInfo); + } + else { + fieldInfo.word = tmp[1]; + fieldInfos.push_back(fieldInfo); + } + log_debug("range_type: %d, start: %u, end: %u, segment_tag: %d, word: %s", fieldInfo.range_type, fieldInfo.start, fieldInfo.end, fieldInfo.segment_tag, fieldInfo.word.c_str()); + } + else if (field != 0) + { + fieldInfo.word = tmp[1]; + fieldInfos.push_back(fieldInfo); + } + else if (field == 0) + { + if (fieldInfo.field_type == 5) { + longitude_tmp = tmp[1]; + longitude = longitude_tmp; + } else if (fieldInfo.field_type == 6) { + latitude_tmp = tmp[1]; + latitude = latitude_tmp; + } else if (fieldInfo.field_type == 8) { + distance = strToDouble(tmp[1]); + } else if (fieldInfo.field_type == 7) { + gisip_tmp = tmp[1]; + gisip = gisip_tmp; + } else if (fieldInfo.field_type == FIELD_WKT) { + string str = tmp[1]; + str = delPrefix(str); + vector str_vec = splitEx(str, ","); + for(uint32_t str_vec_idx = 0; str_vec_idx < str_vec.size(); str_vec_idx++){ + string wkt_str = trim(str_vec[str_vec_idx]); + vector wkt_vec = splitEx(wkt_str, "-"); + if(wkt_vec.size() == 2){ + lng_arr.push_back(wkt_vec[0]); + lat_arr.push_back(wkt_vec[1]); + } + } + } + } + if (fieldInfos.size() != 0) { + field_keys_map.insert(make_pair(fieldInfo.field, fieldInfos)); + } + } + + double distance_tmp = 2; // 不指定distance时最多返回2km内的数据 + if(distance > 1e-6 && distance_tmp > distance + 1e-6){ // distance大于0小于2时取distance的值 + distance_tmp = distance; + } + GetGisCode(longitude_tmp, latitude_tmp, gisip_tmp, distance_tmp, gisCode); + log_debug("lng_arr size: %d, lat_arr size: %d", (int)lng_arr.size(), (int)lat_arr.size()); + if (gisCode.size() == 0 && lng_arr.size() > 0){ + GetGisCode(lng_arr, lat_arr, gisCode); + } + if(gisCode.size() > 0){ + vector fieldInfos; + uint32_t segment_tag = 0; + FieldInfo fieldInfo; + uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid, "gis", fieldInfo); + if (field != 0 && segment_tag == 0) { + m_has_gis = 1; + for (size_t index = 0; index < gisCode.size(); index++) { + FieldInfo info; + info.field = fieldInfo.field; + info.field_type = fieldInfo.field_type; + info.segment_tag = fieldInfo.segment_tag; + info.word = gisCode[index]; + fieldInfos.push_back(info); + } + } + if (fieldInfos.size() != 0) { + field_keys_map.insert(make_pair(fieldInfo.field, fieldInfos)); + } + } + + //如果key_and查询的field匹配到联合索引,则将查询词拼接起来作为新的查询词 + if(type == 1){ + vector union_key_vec; + DBManager::Instance()->GetUnionKeyField(appid, union_key_vec); + vector::iterator union_key_iter = union_key_vec.begin(); + for(; union_key_iter != union_key_vec.end(); union_key_iter++){ + string union_key = *union_key_iter; + vector union_field_vec = splitInt(union_key, ","); + vector::iterator union_field_iter = union_field_vec.begin(); + bool hit_union_key = true; + for(; union_field_iter != union_field_vec.end(); union_field_iter++){ + if(field_keys_map.find(*union_field_iter) == field_keys_map.end()){ + hit_union_key = false; + break; + } + } + if(hit_union_key == true){ + vector > keys_vvec; + vector unionFieldInfos; + for(union_field_iter = union_field_vec.begin(); union_field_iter != union_field_vec.end(); union_field_iter++){ + vector field_info_vec = field_keys_map.at(*union_field_iter); + vector key_vec; + GetKeyFromFieldInfo(field_info_vec, key_vec); + keys_vvec.push_back(key_vec); + field_keys_map.erase(*union_field_iter); // 命中union_key的需要从field_keys_map中删除 + } + vector union_keys = Combination(keys_vvec); + for(int m = 0 ; m < (int)union_keys.size(); m++){ + FieldInfo info; + info.field = 0; + info.field_type = FIELD_STRING; + info.segment_tag = 1; + info.word = union_keys[m]; + unionFieldInfos.push_back(info); + } + AddToFieldList(type, unionFieldInfos); + log_debug("hit union key index."); + break; + } + } + map >::iterator field_key_map_iter = field_keys_map.begin(); + for(; field_key_map_iter != field_keys_map.end(); field_key_map_iter++){ + AddToFieldList(type, field_key_map_iter->second); + } + } else { + map >::iterator field_key_map_iter = field_keys_map.begin(); + for(; field_key_map_iter != field_keys_map.end(); field_key_map_iter++){ + AddToFieldList(type, field_key_map_iter->second); + } + } + + if(type == 1){ // terminal_tag为1时,key_and中必须只带有一个范围查询 + if(m_terminal_tag == 1 && range_query == 1 && and_keys.size() == 1){ + m_terminal_tag_valid = true; + } + } + + return ; +} + +void Component::AddToFieldList(int type, vector& fields) +{ + if (fields.size() == 0) + return ; + if (type == 0) { + keys.push_back(fields); + } else if (type == 1) { + and_keys.push_back(fields); + } else if (type == 2) { + invert_keys.push_back(fields); + } + return ; +} + +const vector >& Component::Keys(){ + return keys; +} + +const vector >& Component::AndKeys(){ + return and_keys; +} + +const vector >& Component::InvertKeys(){ + return invert_keys; +} + +const vector& Component::ExtraFilterKeys(){ + return extra_filter_keys; +} + +const vector& Component::ExtraFilterAndKeys(){ + return extra_filter_and_keys; +} + +const vector& Component::ExtraFilterInvertKeys(){ + return extra_filter_invert_keys; +} + +string Component::QueryWord(){ + return m_Query_Word; +} + +void Component::SetQueryWord(string query_word){ + m_Query_Word = query_word; +} + +string Component::ProbablyData(){ + return m_probably_data; +} + +void Component::SetProbablyData(string probably_data){ + m_probably_data = probably_data; +} + +string Component::Latitude(){ + return latitude; +} + +string Component::Longitude(){ + return longitude; +} + +double Component::Distance(){ + return distance; +} + +string Component::Data(){ + return m_Data; +} + +uint32_t Component::JdqSwitch(){ + return m_jdq_switch; +} + +uint32_t Component::Appid(){ + return m_appid; +} + +string Component::DataAnd(){ + return m_Data_and; +} + +string Component::DataInvert(){ + return m_Data_invert; +} + +string Component::DataComplete(){ + return m_Data_complete; +} + +uint32_t Component::SortType(){ + return m_sort_type; +} + +uint32_t Component::PageIndex(){ + return m_page_index; +} +uint32_t Component::PageSize(){ + return m_page_size; +} + +uint32_t Component::ReturnAll(){ + return m_return_all; +} + +uint32_t Component::CacheSwitch(){ + return m_cache_switch; +} + +uint32_t Component::TopSwitch(){ + return m_top_switch; +} + +uint32_t Component::SnapshotSwitch(){ + return m_snapshot_switch; +} + +string Component::SortField(){ + return m_sort_field; +} + +string Component::LastId(){ + return m_last_id; +} + +string Component::LastScore(){ + return m_last_score; +} + +bool Component::SearchAfter(){ + return m_search_after; +} + +vector& Component::Fields(){ + return m_fields; +} + +uint32_t Component::TerminalTag(){ + return m_terminal_tag; +} + +bool Component::TerminalTagValid(){ + return m_terminal_tag_valid; +} + +void Component::GetKeyFromFieldInfo(const vector& field_info_vec, vector& key_vec){ + vector::const_iterator iter = field_info_vec.begin(); + for(; iter != field_info_vec.end(); iter++){ + key_vec.push_back((*iter).word); + } +} + +/* +** 通过递归求出二维vector每一维vector中取一个数的各种组合 +** 输入:[[a],[b1,b2],[c1,c2,c3]] +** 输出:[a_b1_c1,a_b1_c2,a_b1_c3,a_b2_c1,a_b2_c2,a_b2_c3] +*/ +vector Component::Combination(vector > &dimensionalArr){ + int FLength = dimensionalArr.size(); + if(FLength >= 2){ + int SLength1 = dimensionalArr[0].size(); + int SLength2 = dimensionalArr[1].size(); + int DLength = SLength1 * SLength2; + vector temporary(DLength); + int index = 0; + for(int i = 0; i < SLength1; i++){ + for (int j = 0; j < SLength2; j++) { + temporary[index] = dimensionalArr[0][i] +"_"+ dimensionalArr[1][j]; + index++; + } + } + vector > new_arr; + new_arr.push_back(temporary); + for(int i = 2; i < (int)dimensionalArr.size(); i++){ + new_arr.push_back(dimensionalArr[i]); + } + return Combination(new_arr); + } else { + return dimensionalArr[0]; + } +} \ No newline at end of file diff --git a/src/search_local/index_read/component.h b/src/search_local/index_read/component.h new file mode 100644 index 0000000..e103c5c --- /dev/null +++ b/src/search_local/index_read/component.h @@ -0,0 +1,114 @@ +/* + * ===================================================================================== + * + * Filename: component.h + * + * Description: component class definition. + * + * Version: 1.0 + * Created: 09/08/2019 + * Revision: none + * Compiler: gcc + * + * Author: zhulin, shzhulin3@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#ifndef __COMPONENT_H__ +#define __COMPONENT_H__ +#include "comm.h" +#include "json/json.h" +#include +#include +using namespace std; + +class Component +{ +public: + Component(); + ~Component(); + + void GetQueryWord(uint32_t &m_has_gis); + const vector >& Keys(); + const vector >& AndKeys(); + const vector >& InvertKeys(); + const vector& ExtraFilterKeys(); + const vector& ExtraFilterAndKeys(); + const vector& ExtraFilterInvertKeys(); + int ParseJson(const char *sz_json, int json_len, Json::Value &recv_packet); + void InitSwitch(); + string QueryWord(); + void SetQueryWord(string query_word); + string ProbablyData(); + void SetProbablyData(string probably_data); + string Latitude(); + string Longitude(); + double Distance(); + string Data(); + string DataAnd(); + string DataInvert(); + string DataComplete(); + uint32_t JdqSwitch(); + uint32_t Appid(); + uint32_t SortType(); + uint32_t PageIndex(); + uint32_t PageSize(); + uint32_t ReturnAll(); + uint32_t CacheSwitch(); + uint32_t TopSwitch(); + uint32_t SnapshotSwitch(); + string SortField(); + string LastId(); + string LastScore(); + bool SearchAfter(); + vector& Fields(); + uint32_t TerminalTag(); + bool TerminalTagValid(); + +private: + void GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis); + void AddToFieldList(int type, vector& fields); + void GetKeyFromFieldInfo(const vector& field_info_vec, vector& key_vec); + vector Combination(vector > &dimensionalArr); + +private: + vector > keys; + vector > and_keys; + vector > invert_keys; + vector extra_filter_keys; + vector extra_filter_and_keys; + vector extra_filter_invert_keys; + + string m_Query_Word; + string m_probably_data; + string latitude; + string longitude; + string gisip; + double distance; + + string m_Data; //查询词 + string m_Data_and; // 包含该查询词 + string m_Data_invert; // 不包含该查询词 + string m_Data_complete; // 完整关键词 + uint32_t m_page_index; + uint32_t m_page_size; + uint32_t m_return_all; + uint32_t m_cache_switch; + uint32_t m_top_switch; + uint32_t m_snapshot_switch; + uint32_t m_sort_type; + uint32_t m_appid; + string m_user_id; + string m_sort_field; + string m_last_id; + string m_last_score; + bool m_search_after; + vector m_fields; + string m_default_query; + uint32_t m_jdq_switch; + uint32_t m_terminal_tag; + bool m_terminal_tag_valid; +}; +#endif \ No newline at end of file diff --git a/src/search_local/index_read/index_sync/rocksdb_direct_context.h b/src/search_local/index_read/index_sync/rocksdb_direct_context.h new file mode 100644 index 0000000..bb3fd31 --- /dev/null +++ b/src/search_local/index_read/index_sync/rocksdb_direct_context.h @@ -0,0 +1,254 @@ +/* + * ===================================================================================== + * + * Filename: rocksdb_direct_context.h + * + * Description: rocksdb direct context class definition. + * + * Version: 1.0 + * Created: 09/08/2020 + * Revision: none + * Compiler: gcc + * + * Author: zhulin, shzhulin3@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#ifndef __ROCKSDB_DIRECT_CONTEXT_H__ +#define __ROCKSDB_DIRECT_CONTEXT_H__ + +#if 1 + +#include +#include +#include +#include +#include +#include + +static const uint16_t sgMagicNum = 12345; // global magic number + +// operator must be matched with DTC with the same order +enum CondOpr +{ + eEQ, // == + eNE, // != + eLT, // < + eLE, // <= + eGT, // > + eGE // >= +}; + +#pragma pack(push, 1) +struct QueryCond +{ + uint8_t sFieldIndex; + uint8_t sCondOpr; // CondOpr + std::string sCondValue; + +private: + int binarySize() + { + static int fixHeaderLen = sizeof(sFieldIndex) + sizeof(sCondOpr) + sizeof(int)/* value len */; + return fixHeaderLen + sCondValue.length(); + } + + void serializeTo(char* &data) + { + *(uint8_t*)data = sFieldIndex; + data += sizeof(uint8_t); + + *(uint8_t*)data = sCondOpr; + data += sizeof(uint8_t); + + *(int*)data = sCondValue.length(); + data += sizeof(int); + + memmove((void*)data, (void*)sCondValue.c_str(), sCondValue.length()); + data += sCondValue.length(); + } + + void serializeFrom(const char* data, int& condLen) + { + const char *begPos = data; + + sFieldIndex = *(uint8_t*)data; + data += sizeof(uint8_t); + + sCondOpr = *(uint8_t*)data; + data += sizeof(uint8_t); + + int len = *(int*)data; + data += sizeof(int); + + sCondValue.assign(data, len); + condLen = data - begPos + len; + } + + friend class DirectRequestContext; +}; + +struct LimitCond +{ + int sLimitStart; + int sLimitStep; + LimitCond(){ + sLimitStart = 0; + sLimitStep = 10; + } + + void reset() { sLimitStart = -1, sLimitStep = -1; } +}; + +struct DirectRequestContext +{ + uint16_t sMagicNum; + uint64_t sSequenceId; + + std::vector sFieldConds; + std::vector > sOrderbyFields; + LimitCond sLimitCond; + + void reset() + { + sMagicNum = 0; + sSequenceId = 0; + sFieldConds.clear(); + sLimitCond.reset(); + } + + // binary format size for transporting in + int binarySize() + { + static int fixHeaderLen = sizeof(sMagicNum) + sizeof(sSequenceId) + sizeof(uint8_t) * 2; + + int len = fixHeaderLen; + for ( size_t idx = 0; idx < sFieldConds.size(); idx++ ) + { + len += sFieldConds[idx].binarySize(); + } + for (size_t idx = 0; idx < sOrderbyFields.size(); idx++) + { + len += (sizeof(int) + sizeof(bool)); + } + len += sizeof(LimitCond); + + return len; + } + + // before call this function, should call 'binarySize' to evaluate the size of the struct + void serializeTo(char *data, int len) + { + *(uint16_t*)data = sMagicNum; + data += sizeof(uint16_t); + + *(uint64_t*)data = sSequenceId; + data += sizeof(uint64_t); + + *(uint8_t*)data = sFieldConds.size(); + data += sizeof(uint8_t); + + for ( size_t idx = 0; idx < sFieldConds.size(); idx++ ) + { + sFieldConds[idx].serializeTo(data); + } + + *(uint8_t*)data = sOrderbyFields.size(); + data += sizeof(uint8_t); + for ( size_t idx = 0; idx < sOrderbyFields.size(); idx++ ) + { + *(int*)data = sOrderbyFields[idx].first; + data += sizeof(int); + *(bool*)data = sOrderbyFields[idx].second; + data += sizeof(bool); + } + + memmove((void*)data, (void*)&sLimitCond, sizeof(LimitCond)); + data += sizeof(LimitCond); + log_debug("serializeTo, sLimitStart: %d, sLimitStep: %d", sLimitCond.sLimitStart, sLimitCond.sLimitStep); + } + + void serializeFrom(const char *data, int dataLen) + { + sMagicNum = *(uint16_t*)data; + data += sizeof(uint16_t); + dataLen -= sizeof(uint16_t); + + sSequenceId = *(uint64_t*)data; + data += sizeof(uint64_t); + dataLen -= sizeof(uint64_t); + + uint8_t condNum = *(uint8_t*)data; + data += sizeof(uint8_t); + dataLen -= sizeof(uint8_t); + + QueryCond cond; + int condLen = 0; + for ( uint8_t idx = 0; idx < condNum; idx++ ) + { + cond.serializeFrom(data, condLen); + data += condLen; + dataLen -= condLen; + + sFieldConds.push_back(cond); + } + std::pair orPair; + uint8_t orderNum = *(uint8_t*)data; + data += sizeof(uint8_t); + dataLen -= sizeof(uint8_t); + for ( uint8_t idx = 0; idx < orderNum; idx++ ) + { + orPair.first = *(int*)data; + data += sizeof(int); + dataLen -= sizeof(int); + + orPair.second = *(bool*)data; + data += sizeof(bool); + dataLen -= sizeof(bool); + + sOrderbyFields.push_back(orPair); + } + memmove((void*)&sLimitCond, (void*)data, sizeof(LimitCond)); + dataLen -= sizeof(LimitCond); + log_debug("serializeFrom, sLimitStart: %d, sLimitStep: %d", sLimitCond.sLimitStart, sLimitCond.sLimitStep); + + assert( dataLen == 0 ); + } +}; + +struct DirectResponseContext +{ + uint16_t sMagicNum; + uint64_t sSequenceId; + int16_t sRowNums; // number of matched rows or errno + std::deque sRowValues; + + void serializeTo(std::string& data) + { + static int headerLen = sizeof(uint16_t) + sizeof(uint64_t) + sizeof(int16_t); + + data.clear(); + data = (std::string((char*)this, headerLen)); + + for ( int16_t idx = 0; idx < sRowNums; idx++ ) + { + data.append(sRowValues.front()); + sRowValues.pop_front(); + } + } + + void free() + { + sMagicNum = 0; + sSequenceId = 0; + sRowNums = -1; + sRowValues.clear(); + } +}; +#pragma pack(pop) + +#endif + +#endif // __ROCKSDB_DIRECT_CONTEXT_H__ diff --git a/src/search_local/index_read/logical_operate.cc b/src/search_local/index_read/logical_operate.cc new file mode 100644 index 0000000..5d7ec43 --- /dev/null +++ b/src/search_local/index_read/logical_operate.cc @@ -0,0 +1,341 @@ +/* + * ===================================================================================== + * + * Filename: logical_operate.h + * + * Description: logical operate class definition. + * + * Version: 1.0 + * Created: 09/08/2018 + * Revision: none + * Compiler: gcc + * + * Author: zhulin, shzhulin3@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#include "logical_operate.h" +#include "search_util.h" +#include "cachelist_unit.h" +#include "data_manager.h" +#include "json/reader.h" +#include "json/writer.h" +#include "index_tbl_op.h" +#include "index_sync/sync_index_timer.h" +#include "index_sync/sequence_search_index.h" +#include "stem.h" +#include +#include +using namespace std; + +extern SyncIndexTimer *globalSyncIndexTimer; +extern CCacheListUnit *indexcachelist; + +LogicalOperate::LogicalOperate(uint32_t a, uint32_t s, uint32_t h, uint32_t c):m_appid(a), m_sort_type(s), m_has_gis(h), m_cache_switch(c) +{ + +} + +LogicalOperate::~LogicalOperate(){ + +} + +void LogicalOperate::SetFunc(logical_func func){ + m_func = func; +} + +int LogicalOperate::Process(const vector >& keys, vector& vecs, set& highlightWord, map &ves, map &key_in_doc){ + for (size_t index = 0; index < keys.size(); index++) + { + vector doc_id_vec; + vector fieldInfos = keys[index]; + vector::iterator it; + for (it = fieldInfos.begin(); it != fieldInfos.end(); it++) { + vector doc_info; + if ((*it).segment_tag == 3) { + int ret = GetDocByShiftWord(*it, doc_info, m_appid, highlightWord); + if (ret != 0) { + doc_id_vec.clear(); + return -RT_GET_DOC_ERR; + } + sort(doc_info.begin(), doc_info.end()); + for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){ + KeyInfo info; + info.word_freq = 1; + info.field = (*it).field; + info.word = (*it).word; + ves[doc_info[doc_info_idx].doc_id].push_back(info); + } + } else if ((*it).segment_tag == 4) { + int ret = GetDocByShiftEnWord(*it, doc_info, m_appid, highlightWord); + if (ret != 0) { + doc_id_vec.clear(); + return -RT_GET_DOC_ERR; + } + sort(doc_info.begin(), doc_info.end()); + for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){ + KeyInfo info; + info.word_freq = 1; + info.field = (*it).field; + info.word = (*it).word; + ves[doc_info[doc_info_idx].doc_id].push_back(info); + } + } else if ((*it).segment_tag == 5 && (*it).word == "") { // 范围查询 + stringstream ss; + ss << m_appid; + InvertIndexEntry startEntry(ss.str(), (*it).field, (double)(*it).start); + InvertIndexEntry endEntry(ss.str(), (*it).field, (double)(*it).end); + std::vector resultEntry; + globalSyncIndexTimer->GetSearchIndex()->GetRangeIndex((*it).range_type, startEntry, endEntry, resultEntry); + std::vector::iterator iter = resultEntry.begin(); + for (; iter != resultEntry.end(); iter ++) { + IndexInfo info; + info.doc_id = (*iter)._InvertIndexDocId; + info.doc_version = (*iter)._InvertIndexDocVersion; + doc_info.push_back(info); + } + log_debug("appid: %s, field: %d, count: %d", startEntry._InvertIndexAppid.c_str(), (*it).field, (int)resultEntry.size()); + } else { + int ret = GetDocIdSetByWord(*it, doc_info); + if (ret != 0){ + return -RT_GET_DOC_ERR; + } + if (doc_info.size() == 0) + continue; + if (!m_has_gis || !isAllNumber((*it).word)) + highlightWord.insert((*it).word); + if(!m_has_gis && (m_sort_type == SORT_RELEVANCE || m_sort_type == SORT_TIMESTAMP)){ + CalculateByWord(*it, doc_info, ves, key_in_doc); + } + } + doc_id_vec = vec_union(doc_id_vec, doc_info); + } + if(index == 0){ // 第一个直接赋值给vecs,后续的依次与前面的进行逻辑运算 + vecs.assign(doc_id_vec.begin(), doc_id_vec.end()); + } else { + vecs = m_func(vecs, doc_id_vec); + } + } + return 0; +} + +int LogicalOperate::ProcessTerminal(const vector >& and_keys, const TerminalQryCond& query_cond, vector& vecs){ + if(and_keys.size() != 1){ + return 0; + } + vector field_vec = and_keys[0]; + if(field_vec.size() != 1){ + return 0; + } + FieldInfo field_info = field_vec[0]; + if(field_info.segment_tag != SEGMENT_RANGE){ + return 0; + } + + stringstream ss; + ss << m_appid; + InvertIndexEntry beginEntry(ss.str(), field_info.field, (double)field_info.start); + InvertIndexEntry endEntry(ss.str(), field_info.field, (double)field_info.end); + std::vector resultEntry; + globalSyncIndexTimer->GetSearchIndex()->GetRangeIndexInTerminal(field_info.range_type, beginEntry, endEntry, query_cond, resultEntry); + std::vector::iterator iter = resultEntry.begin(); + for (; iter != resultEntry.end(); iter ++) { + TerminalRes info; + info.doc_id = (*iter)._InvertIndexDocId; + info.score = (*iter)._InvertIndexKey; + vecs.push_back(info); + } + return 0; +} + +int LogicalOperate::ProcessComplete(const vector& complete_keys, vector& complete_vecs, vector& word_vec, map &ves, map &key_in_doc){ + vector::const_iterator iter; + for (iter = complete_keys.begin(); iter != complete_keys.end(); iter++) { + vector doc_info; + int ret = GetDocIdSetByWord(*iter, doc_info); + if (ret != 0) { + return -RT_GET_DOC_ERR; + } + + word_vec.push_back((*iter).word); + + if(m_sort_type == SORT_RELEVANCE || m_sort_type == SORT_TIMESTAMP){ + CalculateByWord(*iter, doc_info, ves, key_in_doc); + } + + if(iter == complete_keys.begin()){ + complete_vecs.assign(doc_info.begin(), doc_info.end()); + } else { + complete_vecs = vec_intersection(complete_vecs, doc_info); + } + } + return 0; +} + +void LogicalOperate::CalculateByWord(FieldInfo fieldInfo, const vector &doc_info, map &ves, map &key_in_doc) { + string doc_id; + uint32_t word_freq = 0; + uint32_t field = 0; + uint32_t created_time; + string pos_str = ""; + for (size_t i = 0; i < doc_info.size(); i++) { + doc_id = doc_info[i].doc_id; + word_freq = doc_info[i].word_freq; + field = doc_info[i].field; + created_time = doc_info[i].created_time; + pos_str = doc_info[i].pos; + vector pos_vec; + if (pos_str != "" && pos_str.size() > 2) { + pos_str = pos_str.substr(1, pos_str.size() - 2); + pos_vec = splitInt(pos_str, ","); + } + KeyInfo info; + info.word_freq = word_freq; + info.field = field; + info.word = fieldInfo.word; + info.created_time = created_time; + info.pos_vec = pos_vec; + ves[doc_id].push_back(info); + } + key_in_doc[fieldInfo.word] = doc_info.size(); +} + + +bool LogicalOperate::GetDocIndexCache(string word, uint32_t field, vector &doc_info) { + log_debug("get doc index start"); + bool res = false; + uint8_t value[MAX_VALUE_LEN] = { 0 }; + unsigned vsize = 0; + string output = ""; + string indexCache = word + "|" + ToString(field); + if (m_cache_switch == 1 && indexcachelist->in_list(indexCache.c_str(), indexCache.size(), value, vsize)) + { + log_debug("hit index cache."); + value[vsize] = '\0'; + output = (char *)value; + res = true; + } + + if (res) { + Json::Value packet; + Json::Reader r(Json::Features::strictMode()); + int ret; + ret = r.parse(output.c_str(), output.c_str() + output.size(), packet); + if (0 == ret) + { + log_error("the err json string is : %s, errmsg : %s", output.c_str(), r.getFormattedErrorMessages().c_str()); + res = false; + return res; + } + + for (uint32_t i = 0; i < packet.size(); ++i) { + IndexInfo info; + Json::Value& index_cache = packet[i]; + if (index_cache.isMember("appid") && index_cache["appid"].isUInt() && + index_cache.isMember("id") && index_cache["id"].isString() && + index_cache.isMember("version") && index_cache["version"].isUInt() && + index_cache.isMember("field") && index_cache["field"].isUInt() && + index_cache.isMember("freq") && index_cache["freq"].isUInt() && + index_cache.isMember("time") && index_cache["time"].isUInt() && + index_cache.isMember("pos") && index_cache["pos"].isString()) + { + info.appid = index_cache["appid"].asUInt(); + info.doc_id = index_cache["id"].asString(); + info.doc_version = index_cache["version"].asUInt(); + info.field = index_cache["field"].asUInt(); + info.word_freq = index_cache["freq"].asUInt(); + info.created_time = index_cache["time"].asUInt(); + info.pos = index_cache["pos"].asString(); + doc_info.push_back(info); + } + else { + log_error("parse index_cache error, no appid"); + doc_info.clear(); + res = false; + break; + } + } + } + return res; +} + +void LogicalOperate::SetDocIndexCache(const vector &doc_info, string& indexJsonStr) { + Json::Value indexJson; + Json::FastWriter writer; + for (size_t i = 0; i < doc_info.size(); i++) { + Json::Value json_tmp; + json_tmp["appid"] = doc_info[i].appid; + json_tmp["id"] = doc_info[i].doc_id; + json_tmp["version"] = doc_info[i].doc_version; + json_tmp["field"] = doc_info[i].field; + json_tmp["freq"] = doc_info[i].word_freq; + json_tmp["time"] = doc_info[i].created_time; + json_tmp["pos"] = doc_info[i].pos; + indexJson.append(json_tmp); + } + indexJsonStr = writer.write(indexJson); +} + + +int LogicalOperate::GetDocIdSetByWord(FieldInfo fieldInfo, vector &doc_info) { + bool bRet = false; + if (DataManager::Instance()->IsSensitiveWord(fieldInfo.word)) { + log_debug("%s is a sensitive word.", fieldInfo.word.c_str()); + return 0; + } + + stringstream ss_key; + ss_key << m_appid; + ss_key << "#00#"; + if(fieldInfo.segment_tag == 5){ + stringstream ss; + ss << setw(20) << setfill('0') << fieldInfo.word; + ss_key << ss.str(); + } + else if (fieldInfo.field_type == FIELD_INT || fieldInfo.field_type == FIELD_DOUBLE || fieldInfo.field_type == FIELD_LONG) { + ss_key << fieldInfo.word; + } + else if (fieldInfo.field_type == FIELD_IP) { + uint32_t word_id = GetIpNum(fieldInfo.word); + if (word_id == 0) + return 0; + ss_key << word_id; + } + else if (fieldInfo.word.find("_") != string::npos) { // 联合索引 + ss_key << fieldInfo.word; + } + else { + string word_new = stem(fieldInfo.word); + ss_key << word_new; + } + + log_debug("appid [%u], key[%s]", m_appid, ss_key.str().c_str()); + if (m_has_gis && GetDocIndexCache(ss_key.str(), fieldInfo.field, doc_info)) { + return 0; + } + + bRet = g_IndexInstance.GetDocInfo(m_appid, ss_key.str(), fieldInfo.field, doc_info); + if (false == bRet) { + log_error("GetDocInfo error."); + return -RT_DTC_ERR; + } + + if (m_cache_switch == 1 && m_has_gis == 1 && doc_info.size() > 0 && doc_info.size() <= 1000) { + string index_str; + SetDocIndexCache(doc_info, index_str); + if (index_str != "" && index_str.size() < MAX_VALUE_LEN) { + string indexCache = ss_key.str() + "|" + ToString(fieldInfo.field); + unsigned data_size = indexCache.size(); + int ret = indexcachelist->add_list(indexCache.c_str(), index_str.c_str(), data_size, index_str.size()); + if (ret != 0) { + log_error("add to index_cache_list error, ret: %d.", ret); + } + else { + log_debug("add to index_cache_list: %s.", indexCache.c_str()); + } + } + } + return 0; +} \ No newline at end of file diff --git a/src/search_local/index_read/logical_operate.h b/src/search_local/index_read/logical_operate.h new file mode 100644 index 0000000..4d97e7c --- /dev/null +++ b/src/search_local/index_read/logical_operate.h @@ -0,0 +1,51 @@ +/* + * ===================================================================================== + * + * Filename: logical_operate.h + * + * Description: logical operate class definition. + * + * Version: 1.0 + * Created: 09/08/2018 + * Revision: none + * Compiler: gcc + * + * Author: zhulin, shzhulin3@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#include "component.h" +#include +#include +using namespace std; + +typedef vector vec; +typedef vector (*logical_func)(vector &a, vector &b); + +class LogicalOperate +{ +public: + LogicalOperate(uint32_t appid, uint32_t sort_type, uint32_t has_gis, uint32_t cache_switch); + ~LogicalOperate(); + + int Process(const vector >& keys, vector& vecs, set& highlightWord, map &ves, map &key_in_doc); + int ProcessComplete(const vector& complete_keys, vector& complete_vecs, vector& word_vec, map &ves, map &key_in_doc); + void SetFunc(logical_func func); + int ProcessTerminal(const vector >& and_keys, const TerminalQryCond& query_cond, vector& vecs); + +private: + void CalculateByWord(FieldInfo fieldInfo, const vector &doc_info, map &ves, map &key_in_doc); + void SetDocIndexCache(const vector &doc_info, string& indexJsonStr); + bool GetDocIndexCache(string word, uint32_t field, vector &doc_info); + int GetDocIdSetByWord(FieldInfo fieldInfo, vector &doc_info); + +private: + uint32_t m_appid; + uint32_t m_sort_type; + uint32_t m_has_gis; + uint32_t m_cache_switch; + logical_func m_func; +}; + diff --git a/src/search_local/index_storage/rocksdb_helper/comm_main.cc b/src/search_local/index_storage/rocksdb_helper/comm_main.cc new file mode 100644 index 0000000..5d1d787 --- /dev/null +++ b/src/search_local/index_storage/rocksdb_helper/comm_main.cc @@ -0,0 +1,392 @@ +/* + * ===================================================================================== + * + * Filename: comm_main.cc + * + * Description: + * + * Version: 1.0 + * Created: 09/08/2020 10:02:05 PM + * Revision: none + * Compiler: gcc + * + * Author: Norton, yangshuang68@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "comm_process.h" +#include +#include +#include +#include +#include +#include + +const char service_file[] = "./helper-service.so"; +const char create_handle_name[] = "create_process"; +const char progname[] = "custom-helper"; +const char usage_argv[] = "machId addr [port]"; +char cacheFile[256] = CACHE_CONF_NAME; +char tableFile[256] = TABLE_CONF_NAME; + +static CreateHandle CreateHelper = NULL; +static CommHelper *helperProc; +static unsigned int procTimeout; + +class HelperMain +{ +public: + HelperMain(CommHelper *helper) : h(helper){}; + + void attach(DTCTask *task) { h->attach((void *)task); } + void init_title(int group, int role) { h->init_title(group, role); } + void set_title(const char *status) { h->SetTitle(status); } + const char *Name() { return h->Name(); } + int pre_init(int gid, int r) + { + if (dbConfig->machineCnt <= gid) + { + log_error("parse config error, machineCnt[%d] <= GroupID[%d]", dbConfig->machineCnt, gid); + return (-1); + } + h->GroupID = gid; + h->Role = r; + h->_dbconfig = dbConfig; + h->_tdef = gTableDef[0]; + h->_config = gConfig; + h->_server_string = dbConfig->mach[gid].role[r].addr; + h->logapi.init_target(h->_server_string); + return 0; + } + +private: + CommHelper *h; +}; + +static int load_service(const char *dll_file) +{ + void *dll; + + dll = dlopen(dll_file, RTLD_NOW | RTLD_GLOBAL); + if (dll == (void *)NULL) + { + log_crit("dlopen(%s) error: %s", dll_file, dlerror()); + return -1; + } + + CreateHelper = (CreateHandle)dlsym(dll, create_handle_name); + if (CreateHelper == NULL) + { + log_crit("function[%s] not found", create_handle_name); + return -2; + } + + return 0; +} + +static int sync_decode(DTCTask *task, int netfd, CommHelper *helperProc) +{ + SimpleReceiver receiver(netfd); + int code; + do + { + code = task->Decode(receiver); + if (code == DecodeFatalError) + { + if (errno != 0) + log_notice("decode fatal error, fd=%d, %m", netfd); + return -1; + } + if (code == DecodeDataError) + { + if (task->result_code() == 0 || task->result_code() == -EC_EXTRA_SECTION_DATA) // -EC_EXTRA_SECTION_DATA verify package + return 0; + log_notice("decode error, fd=%d, %d", netfd, task->result_code()); + return -1; + } + HelperMain(helperProc).set_title("Receiving..."); + } while (!stop && code != DecodeDone); + + if (task->result_code() < 0) + { + log_notice("register result, fd=%d, %d", netfd, task->result_code()); + return -1; + } + return 0; +} + +static int sync_send(Packet *reply, int netfd) +{ + int code; + do + { + code = reply->Send(netfd); + if (code == SendResultError) + { + log_notice("send error, fd=%d, %m", netfd); + return -1; + } + } while (!stop && code != SendResultDone); + + return 0; +} + +static void alarm_handler(int signo) +{ + if (background == 0 && getppid() == 1) + exit(0); + alarm(10); +} + +static int accept_connection(int fd) +{ + HelperMain(helperProc).set_title("listener"); + signal(SIGALRM, alarm_handler); + while (!stop) + { + alarm(10); + int newfd; + if ((newfd = accept(fd, NULL, 0)) >= 0) + { + alarm(0); + return newfd; + } + if (newfd < 0 && errno == EINVAL) + { + if (getppid() == (pid_t)1) + { // Ѿ˳ + log_error("dtc father process not exist. helper[%d] exit now.", getpid()); + exit(0); + } + usleep(10000); + } + } + exit(0); +} + +static void proc_timeout_handler(int signo) +{ + log_error("comm-helper process timeout(more than %u seconds), helper[pid: %d] exit now.", procTimeout, getpid()); + exit(-1); +} + +struct THelperProcParameter +{ + int netfd; + int gid; + int role; +}; + +static int helper_proc_run(struct THelperProcParameter *args) +{ + // close listen fd + close(0); + open("/dev/null", O_RDONLY); + + HelperMain(helperProc).set_title("Initializing..."); + + if (procTimeout > 0) + signal(SIGALRM, proc_timeout_handler); + + alarm(procTimeout); + if (helperProc->Init() != 0) + { + log_error("%s", "helper process init failed"); + exit(-1); + } + + alarm(0); + + unsigned int timeout; + + while (!stop) + { + HelperMain(helperProc).set_title("Waiting..."); + DTCTask *task = new DTCTask(gTableDef[0]); + if (sync_decode(task, args->netfd, helperProc) < 0) + { + delete task; + break; + } + + if (task->result_code() == 0) + { + switch (task->request_code()) + { + case DRequest::Insert: + case DRequest::Update: + case DRequest::Delete: + case DRequest::Replace: + timeout = 2 * procTimeout; + default: + timeout = procTimeout; + } + HelperMain(helperProc).attach(task); + alarm(timeout); + helperProc->Execute(); + alarm(0); + } + + HelperMain(helperProc).set_title("Sending..."); + Packet *reply = new Packet; + reply->encode_result(task); + + delete task; + if (sync_send(reply, args->netfd) < 0) + { + delete reply; + break; + } + delete reply; + } + close(args->netfd); + HelperMain(helperProc).set_title("Exiting..."); + + delete helperProc; + daemon_cleanup(); +#if MEMCHECK + log_info("%s v%s: stopped", progname, version); + dump_non_delete(); + log_debug("memory allocated %lu virtual %lu", count_alloc_size(), count_virtual_size()); +#endif + exit(0); + return 0; +} + +int main(int argc, char **argv) +{ + init_proc_title(argc, argv); + if (dtc_daemon_init(argc, argv) < 0) + return -1; + + argc -= optind; + argv += optind; + + struct THelperProcParameter helperArgs = {0, 0, 0}; + char *addr = NULL; + + if (argc > 0) + { + char *p; + helperArgs.gid = strtol(argv[0], &p, 0); + if (*p == '\0' || *p == MACHINEROLESTRING[0]) + helperArgs.role = 0; + else if (*p == MACHINEROLESTRING[1]) + helperArgs.role = 1; + else + { + log_error("Bad machine id: %s", argv[0]); + return -1; + } + } + + if (argc != 2 && argc != 3) + { + show_usage(); + return -1; + } + + int backlog = gConfig->get_int_val("cache", "MaxListenCount", 256); + int helperTimeout = gConfig->get_int_val("cache", "HelperTimeout", 30); + if (helperTimeout > 1) + procTimeout = helperTimeout - 1; + else + procTimeout = 0; + addr = argv[1]; + + // load dll file + const char *file = getenv("HELPER_SERVICE_FILE"); + if (file == NULL || file[0] == 0) + file = service_file; + if (load_service(file) != 0) + return -1; + + helperProc = CreateHelper(); + if (helperProc == NULL) + { + log_crit("create helper error"); + return -1; + } + if (HelperMain(helperProc).pre_init(helperArgs.gid, helperArgs.role) < 0) + { + log_error("%s", "helper prepare init failed"); + exit(-1); + } + if (helperProc->global_init() != 0) + { + log_crit("helper gobal-init error"); + return -1; + } + + int fd = -1; + if (!strcmp(addr, "-")) + fd = 0; + else + { + if (strcasecmp(gConfig->get_str_val("cache", "CacheShmKey") ?: "", "none") != 0) + { + log_warning("standalone %s need CacheShmKey set to NONE", progname); + return -1; + } + + SocketAddress sockaddr; + const char *err = sockaddr.set_address(addr, argc == 2 ? NULL : argv[2]); + if (err) + { + log_warning("host %s port %s: %s", addr, argc == 2 ? "NULL" : argv[2], err); + return -1; + } + if (sockaddr.socket_type() != SOCK_STREAM) + { + log_warning("standalone %s don't support UDP protocol", progname); + return -1; + } + fd = sock_bind(&sockaddr, backlog); + if (fd < 0) + return -1; + } + + daemon_start(); + +#if HAS_LOGAPI + helperProc->logapi.Init( + gConfig->get_int_val("LogApi", "MessageId", 0), + gConfig->get_int_val("LogApi", "CallerId", 0), + gConfig->get_int_val("LogApi", "TargetId", 0), + gConfig->get_int_val("LogApi", "InterfaceId", 0)); +#endif + + HelperMain(helperProc).init_title(helperArgs.gid, helperArgs.role); + if (procTimeout > 1) + helperProc->set_proc_timeout(procTimeout - 1); + while (!stop) + { + helperArgs.netfd = accept_connection(fd); + + watch_dog_fork(HelperMain(helperProc).Name(), (int (*)(void *))helper_proc_run, (void *)&helperArgs); + + close(helperArgs.netfd); + } + + if (fd > 0 && addr && addr[0] == '/') + unlink(addr); + return 0; +} diff --git a/src/search_local/index_storage/rocksdb_helper/rocksdb_direct_context.h b/src/search_local/index_storage/rocksdb_helper/rocksdb_direct_context.h new file mode 100644 index 0000000..d1dae51 --- /dev/null +++ b/src/search_local/index_storage/rocksdb_helper/rocksdb_direct_context.h @@ -0,0 +1,260 @@ + +/* + * ===================================================================================== + * + * Filename: rocksdb_direct_context.h + * + * Description: + * + * Version: 1.0 + * Created: 09/08/2020 10:02:05 PM + * Revision: none + * Compiler: gcc + * + * Author: zhuyao, zhuyao28@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ +#ifndef __ROCKSDB_DIRECT_CONTEXT_H__ +#define __ROCKSDB_DIRECT_CONTEXT_H__ + +#if 1 + +#include +#include +#include +#include +#include +#include + +static const uint16_t sgMagicNum = 12345; // global magic number + +// operator must be matched with DTC with the same order +enum class CondOpr : uint8_t +{ + eEQ, // == 0 + eNE, // != 1 + eLT, // < 2 + eLE, // <= 3 + eGT, // > 4 + eGE // >= 5 +}; + +bool operator==(const CondOpr lc, const CondOpr rc) +{ + return (int)lc == (int)rc; +} + +#pragma pack(push, 1) +struct QueryCond +{ + uint8_t sFieldIndex; + uint8_t sCondOpr; // CondOpr + // uint8_t sCondValueLen; + // char sCondValue[]; + std::string sCondValue; + +private: + int binary_size() + { + static int fixHeaderLen = sizeof(sFieldIndex) + sizeof(sCondOpr) + sizeof(int) /* value len */; + return fixHeaderLen + sCondValue.length(); + } + + void serialize_to(char *&data) + { + *(uint8_t *)data = sFieldIndex; + data += sizeof(uint8_t); + + *(uint8_t *)data = sCondOpr; + data += sizeof(uint8_t); + + *(int *)data = sCondValue.length(); + data += sizeof(int); + + memmove((void *)data, (void *)sCondValue.c_str(), sCondValue.length()); + data += sCondValue.length(); + } + + void serialize_from(const char *data, int &condLen) + { + const char *begPos = data; + + sFieldIndex = *(uint8_t *)data; + data += sizeof(uint8_t); + + sCondOpr = *(uint8_t *)data; + data += sizeof(uint8_t); + + int len = *(int *)data; + data += sizeof(int); + + sCondValue.assign(data, len); + condLen = data - begPos + len; + } + + friend class DirectRequestContext; +}; + +struct LimitCond +{ + int sLimitStart = -1; + int sLimitStep = -1; + + void reset() { sLimitStart = -1, sLimitStep = -1; } +}; + +struct DirectRequestContext +{ + uint16_t sMagicNum; + uint64_t sSequenceId; + // uint8_t sCondValueNum; + // char sContextValue[]; + std::vector sFieldConds; + std::vector> sOrderbyFields; + LimitCond sLimitCond; + + void reset() + { + sMagicNum = 0; + sSequenceId = 0; + sFieldConds.clear(); + sLimitCond.reset(); + } + + // binary format size for transporting in + int binary_size() + { + static int fixHeaderLen = sizeof(sMagicNum) + sizeof(sSequenceId) + sizeof(uint8_t) * 2; + + int len = fixHeaderLen; + for (size_t idx = 0; idx < sFieldConds.size(); idx++) + { + len += sFieldConds[idx].binary_size(); + } + + for (size_t idx = 0; idx < sOrderbyFields.size(); idx++) + { + len += (sizeof(int) + sizeof(bool)); + } + len += sizeof(LimitCond); + + return len; + } + + // before call this function, should call 'binary_size' to evaluate the size of the struct + void serialize_to(char *data, int len) + { + *(uint16_t *)data = sMagicNum; + data += sizeof(uint16_t); + + *(uint64_t *)data = sSequenceId; + data += sizeof(uint64_t); + + *(uint8_t *)data = sFieldConds.size(); + data += sizeof(uint8_t); + for (size_t idx = 0; idx < sFieldConds.size(); idx++) + { + sFieldConds[idx].serialize_to(data); + } + + *(uint8_t *)data = sOrderbyFields.size(); + data += sizeof(uint8_t); + for (size_t idx = 0; idx < sOrderbyFields.size(); idx++) + { + *(int *)data = sOrderbyFields[idx].first; + data += sizeof(int); + + *(bool *)data = sOrderbyFields[idx].second; + data += sizeof(bool); + } + + memmove((void *)data, (void *)&sLimitCond, sizeof(LimitCond)); + data += sizeof(LimitCond); + } + + void serialize_from(const char *data, int dataLen) + { + sMagicNum = *(uint16_t *)data; + data += sizeof(uint16_t); + dataLen -= sizeof(uint16_t); + + sSequenceId = *(uint64_t *)data; + data += sizeof(uint64_t); + dataLen -= sizeof(uint64_t); + + uint8_t condNum = *(uint8_t *)data; + data += sizeof(uint8_t); + dataLen -= sizeof(uint8_t); + + QueryCond cond; + int condLen = 0; + for (uint8_t idx = 0; idx < condNum; idx++) + { + cond.serialize_from(data, condLen); + data += condLen; + dataLen -= condLen; + + sFieldConds.push_back(cond); + } + + std::pair orPair; + uint8_t orderNum = *(uint8_t *)data; + data += sizeof(uint8_t); + dataLen -= sizeof(uint8_t); + for (uint8_t idx = 0; idx < orderNum; idx++) + { + orPair.first = *(int *)data; + data += sizeof(int); + dataLen -= sizeof(int); + + orPair.second = *(bool *)data; + data += sizeof(bool); + dataLen -= sizeof(bool); + + sOrderbyFields.push_back(orPair); + } + + memmove((void *)&sLimitCond, (void *)data, sizeof(LimitCond)); + dataLen -= sizeof(LimitCond); + + assert(dataLen == 0); + } +}; + +struct DirectResponseContext +{ + uint16_t sMagicNum; + uint64_t sSequenceId; + int16_t sRowNums; // number of matched rows or errno + std::deque sRowValues; + // char* sRowValues; + + void serialize_to(std::string &data) + { + static int headerLen = sizeof(uint16_t) + sizeof(uint64_t) + sizeof(int16_t); + + data.clear(); + data = std::move(std::string((char *)this, headerLen)); + + for (size_t idx = 0; idx < sRowNums; idx++) + { + data.append(sRowValues.front()); + sRowValues.pop_front(); + } + } + + void free() + { + sMagicNum = 0; + sSequenceId = 0; + sRowNums = -1; + sRowValues.clear(); + } +}; +#pragma pack(pop) + +#endif + +#endif // __ROCKSDB_DIRECT_CONTEXT_H__ diff --git a/src/search_local/index_write/add_request_proc.cc b/src/search_local/index_write/add_request_proc.cc new file mode 100644 index 0000000..b9817e5 --- /dev/null +++ b/src/search_local/index_write/add_request_proc.cc @@ -0,0 +1,529 @@ +/* + * ===================================================================================== + * + * Filename: add_request_proc.cc + * + * Description: AddReqProc class definition. + * + * Version: 1.0 + * Created: 09/08/2020 10:02:05 PM + * Revision: none + * Compiler: gcc + * + * Author: shrewdlin, linjinming@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#include "add_request_proc.h" +#include "index_tbl_op.h" +#include "geohash.h" +#include "split_manager.h" +#include +#include +#include +#include + +AddReqProc::AddReqProc(){ +} + +AddReqProc::AddReqProc(const Json::Value& jf, InsertParam& insert_param){ + doc_version = insert_param.doc_version; + trans_version = insert_param.trans_version; + app_id = insert_param.appid; + doc_id = insert_param.doc_id; + json_field = jf; +} + +AddReqProc::~AddReqProc(){ +} + +void AddReqProc::do_stat_word_freq(vector > &strss, map &word_map, string extend) { + string word; + uint32_t id = 0; + ostringstream oss; + vector >::iterator iters = strss.begin(); + uint32_t index = 0; + + for(;iters != strss.end(); iters++){ + index++; + vector::iterator iter = iters->begin(); + + log_debug("start do_stat_word_freq, appid = %u\n",app_id); + for (; iter != iters->end(); iter++) { + + word = *iter; + if (!SplitManager::Instance()->wordValid(word, app_id, id)){ + continue; + } + if (word_map.find(word) == word_map.end()) { + item it; + it.doc_id = doc_id; + it.freq = 1; + it.extend = extend; + it.indexs.push_back(index); + word_map.insert(make_pair(word, it)); + } + else { + word_map[word].freq++; + word_map[word].indexs.push_back(index); + } + + oss << (*iter) << "|"; + } + } + log_debug("split: %s",oss.str().c_str()); +} + +void AddReqProc::do_stat_word_freq(vector &strss, map &word_map) { + string word; + vector::iterator iters = strss.begin(); + uint32_t index = 0; + + for (; iters != strss.end(); iters++) { + index++; + word = *iters; + if (word_map.find(word) == word_map.end()) { + item it; + it.doc_id = doc_id; + it.freq = 1; + it.indexs.push_back(index); + word_map.insert(make_pair(word, it)); + } + else { + word_map[word].freq++; + word_map[word].indexs.push_back(index); + } + } +} + +int AddReqProc::deal_index_tag(struct table_info *tbinfo, string field_name){ + int ret =0; + map word_map; + vector > split_content; + switch(tbinfo->field_type){ + case FIELD_STRING: + case FIELD_TEXT: + if(json_field[field_name].isString()){ + if (tbinfo->segment_tag == SEGMENT_NGRAM) { // NGram split mode + vector ngram_content = SplitManager::Instance()->split(json_field[field_name].asString()); + do_stat_word_freq(ngram_content, word_map); + } + else if (tbinfo->segment_tag == SEGMENT_CHINESE || tbinfo->segment_tag == SEGMENT_ENGLISH) { // use intelligent_info + string str = json_field[field_name].asString(); + // segment_tag为3对应的字段内容必须为全中文,为4对应的的字段不能包含中文 + if (tbinfo->segment_tag == SEGMENT_CHINESE && allChinese(str) == false) { + log_error("segment_tag is 3, the content[%s] must be Chinese.", str.c_str()); + return RT_ERROR_FIELD_FORMAT; + } + if (tbinfo->segment_tag == SEGMENT_ENGLISH && noChinese(str) == false) { + log_error("segment_tag is 4, the content[%s] can not contain Chinese.", str.c_str()); + return RT_ERROR_FIELD_FORMAT; + } + item it; + it.doc_id = doc_id; + it.freq = 1; + if(tbinfo->segment_feature == SEGMENT_FEATURE_SNAPSHOT){ + Json::FastWriter ex_writer; + it.extend = ex_writer.write(snapshot_content); + } + word_map.insert(make_pair(str, it)); + vector info; + bool flag = false; + get_intelligent(str, info, flag); + if (flag) { + stringstream ss; + ss << app_id << "#" << tbinfo->field_value; + ret = g_hanpinIndexInstance.do_insert_intelligent(ss.str(), doc_id, str, info, doc_version); + if(0 != ret){ + roll_back(); + return ret; + } + intelligent_keys.push_back(ss.str()); + } + } + else { + split_content = SplitManager::Instance()->split(json_field[field_name].asString(), app_id); + string extend = ""; + if(tbinfo->segment_feature == SEGMENT_FEATURE_SNAPSHOT){ + Json::FastWriter ex_writer; + extend = ex_writer.write(snapshot_content); + } + do_stat_word_freq(split_content, word_map, extend); + split_content.clear(); + } + ret = g_IndexInstance.do_insert_index(word_map, app_id, doc_version, tbinfo->field_value, docid_index_map); + if (0 != ret) { + roll_back(); + return ret; + } + word_map.clear(); + }else{ + log_error("field type error, not FIELD_STRING."); + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_INT: + if(json_field[field_name].isInt()){ + int ret; + struct item it; + it.doc_id = doc_id; + it.freq = 0; + string key = ""; + if(tbinfo->segment_tag == SEGMENT_RANGE){ // 范围查的字段将key补全到20位 + stringstream ss; + ss << setw(20) << setfill('0') << json_field[field_name].asInt(); + key = gen_dtc_key_string(app_id, "00", ss.str()); + } else { + key = gen_dtc_key_string(app_id, "00", (uint32_t)json_field[field_name].asInt()); + } + ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); + if(ret != 0){ + roll_back(); + return RT_ERROR_INSERT_INDEX_DTC; + } + }else{ + log_error("field type error, not FIELD_INT."); + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_LONG: + if(json_field[field_name].isInt64()){ + struct item it; + it.doc_id = doc_id; + it.freq = 0; + string key = gen_dtc_key_string(app_id, "00", (int64_t)json_field[field_name].asInt64()); + int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); + if(0 != ret){ + roll_back(); + log_error("insert_index_dtc error, appid[%d], key[%s]", app_id, key.c_str()); + return RT_ERROR_INSERT_INDEX_DTC; + } + } else { + log_error("field type error, not FIELD_LONG."); + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_DOUBLE: + if(json_field[field_name].isDouble()){ + struct item it; + it.doc_id = doc_id; + it.freq = 0; + string key = gen_dtc_key_string(app_id, "00", (double)json_field[field_name].asDouble()); + int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); + if(0 != ret){ + roll_back(); + log_error("insert_index_dtc error, appid[%d], key[%s]", app_id, key.c_str()); + return RT_ERROR_INSERT_INDEX_DTC; + } + } else { + log_error("field type error, not FIELD_DOUBLE."); + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_IP: + uint32_t s; + int ret; + if(json_field[field_name].isString()){ + ret = inet_pton(AF_INET, json_field[field_name].asString().c_str(), (void *)&s); + if(ret == 0){ + log_error("ip format is error\n"); + return RT_ERROR_FIELD_FORMAT; + } + struct item it; + it.doc_id = doc_id; + it.freq = 0; + string key = gen_dtc_key_string(app_id, "00", ntohl(s)); + ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); + if(ret != 0){ + roll_back(); + return RT_ERROR_INSERT_INDEX_DTC; + } + }else{ + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_LNG: + if(json_field[field_name].isString()){ + lng = json_field[field_name].asString(); + }else{ + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_LAT: + if(json_field[field_name].isString()){ + lat = json_field[field_name].asString(); + }else{ + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_LNG_ARRAY: + if(json_field[field_name].isArray()){ + Json::Value lngs = json_field[field_name]; + for (uint32_t lng_idx = 0; lng_idx < lngs.size(); ++lng_idx) { + if (lngs[lng_idx].isString()){ + lng_arr.push_back(lngs[lng_idx].asString()); + } else { + log_error("longitude must be string"); + return RT_ERROR_FIELD_FORMAT; + } + } + }else{ + log_error("FIELD_LNG_ARRAY must be array"); + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_LAT_ARRAY: + if(json_field[field_name].isArray()){ + Json::Value lats = json_field[field_name]; + for (uint32_t lat_idx = 0; lat_idx < lats.size(); ++lat_idx) { + if (lats[lat_idx].isString()){ + lat_arr.push_back(lats[lat_idx].asString()); + } else { + log_error("latitude must be string"); + return RT_ERROR_FIELD_FORMAT; + } + } + }else{ + log_error("FIELD_LAT_ARRAY must be array"); + return RT_ERROR_FIELD_FORMAT; + } + break; + case FIELD_WKT: + if(json_field[field_name].isString()){ + string str = json_field[field_name].asString(); + str = delPrefix(str); + vector str_vec = splitEx(str, ","); + for(uint32_t str_vec_idx = 0; str_vec_idx < str_vec.size(); str_vec_idx++){ + string wkt_str = trim(str_vec[str_vec_idx]); + vector wkt_vec = splitEx(wkt_str, " "); + if(wkt_vec.size() == 2){ + lng_arr.push_back(wkt_vec[0]); + lat_arr.push_back(wkt_vec[1]); + } + } + } else { + log_error("FIELD_WKT must be string"); + return RT_ERROR_FIELD_FORMAT; + } + break; + default: + break; + } + return 0; +} + +int AddReqProc::do_insert_index(UserTableContent& content_fields){ + int ret = 0; + Json::Value::Members member = json_field.getMemberNames(); + Json::Value::Members::iterator iter = member.begin(); + for(; iter != member.end(); ++iter) + { + string field_name = *iter; + struct table_info *tbinfo = NULL; + tbinfo = SplitManager::Instance()->get_table_info(app_id, field_name); + if(tbinfo == NULL){ + continue; + } + if(tbinfo->snapshot_tag == 1){ //snapshot + if(tbinfo->field_type == 1 && json_field[field_name].isInt()){ + snapshot_content[field_name] = json_field[field_name].asInt(); + }else if(tbinfo->field_type > 1 && json_field[field_name].isString()){ + snapshot_content[field_name] = json_field[field_name].asString(); + }else if(tbinfo->field_type > 1 && json_field[field_name].isDouble()){ + snapshot_content[field_name] = json_field[field_name].asDouble(); + }else if(tbinfo->field_type > 1 && json_field[field_name].isInt64()){ + snapshot_content[field_name] = json_field[field_name].asInt64(); + }else if(tbinfo->field_type > 1 && json_field[field_name].isArray()){ + snapshot_content[field_name] = json_field[field_name]; + } + } + } + for(iter = member.begin(); iter != member.end(); ++iter) + { + string field_name = *iter; + struct table_info *tbinfo = NULL; + tbinfo = SplitManager::Instance()->get_table_info(app_id, field_name); + if(tbinfo == NULL){ + continue; + } + if(tbinfo->index_tag == 1){ + ret = deal_index_tag(tbinfo, field_name); + if(0 != ret){ + log_error("deal index tag process error, ret: %d", ret); + roll_back(); + return ret; + } + } + } + if(lng.length() != 0 && lat.length() != 0){ + struct table_info *tbinfo = NULL; + tbinfo = SplitManager::Instance()->get_table_info(app_id, "gis"); + if(tbinfo == NULL){ + roll_back(); + return RT_NO_GIS_DEFINE; + } + + string gisid = encode(atof(lat.c_str()), atof(lng.c_str()), 6); + log_debug("gis code = %s",gisid.c_str()); + int ret; + uint64_t id = 0; + struct item it; + it.doc_id = doc_id; + it.freq = 0; + Json::FastWriter gis_writer; + it.extend = gis_writer.write(snapshot_content); + string key = gen_dtc_key_string(app_id, "00", gisid); + ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); + if(ret != 0){ + roll_back(); + return RT_ERROR_INSERT_INDEX_DTC; + } + log_debug("id = %llu,doc_vesion = %d,docid = %s\n",(long long unsigned int)id,doc_version,it.doc_id.c_str()); + } + log_debug("lng_arr size: %d, lat_arr size: %d", (int)lng_arr.size(), (int)lat_arr.size()); + if(lng_arr.size() > 0 && lat_arr.size() > 0){ + if(lng_arr.size() != lat_arr.size()){ + log_error("lng_arr size not equal with lat_arr size"); + return RT_ERROR_FIELD_FORMAT; + } + set gis_set; + for(uint32_t arr_idx = 0; arr_idx < lng_arr.size(); arr_idx++){ + string tmp_lng = lng_arr[arr_idx]; + string tmp_lat = lat_arr[arr_idx]; + struct table_info *tbinfo = NULL; + tbinfo = SplitManager::Instance()->get_table_info(app_id, "gis"); + if(tbinfo == NULL){ + roll_back(); + log_error("gis field not defined"); + return RT_NO_GIS_DEFINE; + } + string gisid = encode(atof(tmp_lat.c_str()), atof(tmp_lng.c_str()), 6); + if(gis_set.find(gisid) != gis_set.end()){ + continue; + } + gis_set.insert(gisid); + struct item it; + it.doc_id = doc_id; + it.freq = 0; + Json::FastWriter gis_writer; + it.extend = gis_writer.write(snapshot_content); + string key = gen_dtc_key_string(app_id, "00", gisid); + int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map); + if(ret != 0){ + roll_back(); + return RT_ERROR_INSERT_INDEX_DTC; + } + log_debug("gis code = %s,doc_vesion = %d,docid = %s\n",gisid.c_str(),doc_version,it.doc_id.c_str()); + } + } + + vector union_key_vec; + SplitManager::Instance()->getUnionKeyField(app_id, union_key_vec); + vector::iterator union_key_iter = union_key_vec.begin(); + for(; union_key_iter != union_key_vec.end(); union_key_iter++){ + string union_key = *union_key_iter; + vector union_field_vec = splitInt(union_key, ","); + vector::iterator union_field_iter = union_field_vec.begin(); + vector > keys_vvec; + for(; union_field_iter != union_field_vec.end(); union_field_iter++){ + int union_field_value = *union_field_iter; + if(union_field_value >= (int)docid_index_map.size()){ + log_error("appid[%d] field[%d] is invalid", app_id, *union_field_iter); + break; + } + vector key_vec; + if(!docid_index_map[union_field_value].isArray()){ + log_debug("doc_id[%s] union_field_value[%d] has no keys", doc_id.c_str(), union_field_value); + break; + } + for (int key_index = 0; key_index < (int)docid_index_map[union_field_value].size(); key_index++){ + if(docid_index_map[union_field_value][key_index].isString()){ + string union_index_key = docid_index_map[union_field_value][key_index].asString(); + if(union_index_key.size() > 9){ // 倒排key的格式为:10061#00#折扣,这里只取第二个#后面的内容 + key_vec.push_back(union_index_key.substr(9)); + } + } + } + keys_vvec.push_back(key_vec); + } + if(keys_vvec.size() != union_field_vec.size()){ + log_debug("keys_vvec.size not equal union_field_vec.size"); + break; + } + vector union_keys = combination(keys_vvec); + for(int m = 0 ; m < (int)union_keys.size(); m++){ + ret = g_IndexInstance.insert_union_index_dtc(union_keys[m], doc_id, app_id, doc_version); + if(ret != 0){ + log_error("insert union key[%s] error", union_keys[m].c_str()); + } + } + } + + Json::FastWriter writer; + content_fields.content = writer.write(snapshot_content); + Json::FastWriter doc_index_writer; + string doc_index_map_string = doc_index_writer.write(docid_index_map); + if(doc_version != 1){//need update + map > index_res; + g_IndexInstance.GetIndexData(gen_dtc_key_string(content_fields.appid, "20", doc_id), doc_version - 1, index_res); + map >::iterator map_iter = index_res.begin(); + for(; map_iter != index_res.end(); map_iter++){ + uint32_t field = map_iter->first; + vector words = map_iter->second; + for(int i = 0; i < (int)words.size(); i++){ + DeleteTask::GetInstance().RegisterInfo(words[i], doc_id, doc_version - 1, field); + } + } + + int affected_rows = 0; + ret = g_IndexInstance.update_sanpshot_dtc(content_fields, doc_version, trans_version, affected_rows); + if(ret != 0 || affected_rows == 0){ + log_error("update_sanpshot_dtc error, roll back, ret: %d, affected_rows: %d.", ret, affected_rows); + roll_back(); + return RT_ERROR_UPDATE_SNAPSHOT; + } + g_IndexInstance.update_docid_index_dtc(doc_index_map_string, doc_id, app_id, doc_version); + }else{ + int affected_rows = 0; + ret = g_IndexInstance.update_sanpshot_dtc(content_fields, doc_version, trans_version, affected_rows); + if(ret != 0 || affected_rows == 0){ + log_error("update_sanpshot_dtc error, roll back, ret: %d, affected_rows: %d.", ret, affected_rows); + roll_back(); + return RT_ERROR_UPDATE_SNAPSHOT; + } + g_IndexInstance.insert_docid_index_dtc(doc_index_map_string, doc_id, app_id, doc_version); + } + return 0; +} + +int AddReqProc::roll_back(){ + // 删除hanpin_index + for(int i = 0; i < (int)intelligent_keys.size(); i++){ + g_hanpinIndexInstance.delete_intelligent(intelligent_keys[i], doc_id, trans_version); + } + + // 删除keyword_index + if(docid_index_map.isArray()){ + for(int i = 0;i < (int)docid_index_map.size();i++){ + Json::Value info = docid_index_map[i]; + if(info.isArray()){ + for(int j = 0;j < (int)info.size();j++){ + if(info[j].isString()){ + string key = info[j].asString(); + g_IndexInstance.delete_index(key, doc_id, trans_version, i); + } + } + } + } + } + // 如果trans_version=1,删除快照,否则更新快照的trans_version=trans_version-1 + Json::Value res; + if(trans_version == 1){ + g_IndexInstance.delete_snapshot_dtc(doc_id, app_id, res); + } else { + g_IndexInstance.update_sanpshot_dtc(app_id, doc_id, trans_version); + } + return 0; +} \ No newline at end of file diff --git a/src/search_local/index_write/add_request_proc.h b/src/search_local/index_write/add_request_proc.h new file mode 100644 index 0000000..68512e7 --- /dev/null +++ b/src/search_local/index_write/add_request_proc.h @@ -0,0 +1,58 @@ +/* + * ===================================================================================== + * + * Filename: add_request_proc.h + * + * Description: AddReqProc class definition. + * + * Version: 1.0 + * Created: 09/08/2020 10:02:05 PM + * Revision: none + * Compiler: gcc + * + * Author: shrewdlin, linjinming@jd.com + * Company: JD.com, Inc. + * + * ===================================================================================== + */ + +#ifndef ADD_REQUEST_PROC_H +#define ADD_REQUEST_PROC_H + +#include "log.h" +#include "json/json.h" +#include "comm.h" + +class UserTableContent; +class SplitManager; +class AddReqProc +{ +public: + AddReqProc(); + AddReqProc(const Json::Value& jf, InsertParam& insert_param); + ~AddReqProc(); + + int do_insert_index(UserTableContent& content_fields); + +private: + void do_stat_word_freq(vector > &strss, map &word_map, string extend); + void do_stat_word_freq(vector &strss, map &word_map); + int deal_index_tag(struct table_info *tbinfo, string field_name); + int roll_back(); + +private: + Json::Value json_field; + uint32_t app_id; + uint32_t doc_version; + uint32_t trans_version; + string doc_id; + string lng; + string lat; + vector lng_arr; + vector lat_arr; + vector intelligent_keys; + Json::Value snapshot_content; + Json::Value docid_index_map; +}; + +#endif \ No newline at end of file