add travis

This commit is contained in:
shzhulin3 2021-09-17 10:57:34 +08:00
parent 3786f0822a
commit 6ca560169a
11 changed files with 2770 additions and 0 deletions

34
.travis.yml Normal file
View File

@ -0,0 +1,34 @@
# Travis CI pipeline: builds the project with CMake + make on Linux and, when the
# build succeeds, packages the search_agent binary into a Docker image and pushes it.
# NOTE(review): the nesting indentation of this file is not visible in this view —
# confirm the YAML structure against the real file.
compiler:
- g++
os:
- linux
addons:
apt:
packages:
# NOTE(review): confirm an apt package with this exact name exists on the build image.
- g++-4.8.5
env:
# Shell snippet that assigns CC and CXX; it is eval'd in before_install below.
- ISEARCH_EVAL="CC=gcc-4.8.5 && CXX=g++-4.8.5"
before_install:
- eval "${ISEARCH_EVAL}"
install:
# Print the selected toolchain and CMake versions into the build log.
- echo ${CC}
- ${CC} --version
- echo ${CXX}
- ${CXX} --version
- cmake --version
# NOTE(review): no -y flag is passed here — confirm apt-get does not wait for confirmation in CI.
- sudo apt-get install snappy libsnappy-dev zlib1g zlib1g-dev bzip2 liblz4-dev libasan0 openssl libmxml-dev
script:
# In-source CMake configure followed by the build.
- cmake .
- make
after_success:
# Stage the built binary and its config next to the agent Dockerfile.
- cp src/search_agent/bin/search_agent dockerfiles/agent/
- cp resource/search_agent/conf/sa.conf dockerfiles/agent/
- cd dockerfiles/agent
# The password is piped on stdin so it does not appear on the docker command line.
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
- docker build -t $DOCKER_USERNAME/search_agent:latest .
- docker push $DOCKER_USERNAME/search_agent:latest

View File

@ -0,0 +1,12 @@
# Runtime image for search_agent: creates the install tree, copies in the
# prebuilt binary and its config, and starts the agent.
FROM centos:centos7.2.1511
# Install root inside the image; used by the RUN/COPY steps below.
ARG basepath=/usr/local/isearch/search_agent
RUN mkdir -p $basepath/bin
RUN mkdir -p $basepath/conf
RUN mkdir -p $basepath/log
# Both files are expected in the build context (the CI job copies them there).
COPY search_agent $basepath/bin/search_agent
COPY sa.conf $basepath/conf/sa.conf
# The path is spelled out instead of using $basepath; it must match the ARG default above.
# NOTE(review): confirm what -d and -v 3 mean for search_agent; if -d daemonizes,
# the container's main process may exit immediately.
CMD /usr/local/isearch/search_agent/bin/search_agent -d -c /usr/local/isearch/search_agent/conf/sa.conf -v 3

View File

@ -0,0 +1,725 @@
/*
* =====================================================================================
*
* Filename: component.h
*
* Description: component class definition.
*
* Version: 1.0
* Created: 09/08/2019
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#include "component.h"
#include "split_manager.h"
#include "db_manager.h"
#include "utf8_str.h"
#include <sstream>
using namespace std;
// Creates a request component with default paging / sorting / switch state.
// The default query string and the JDQ switch are read from the global
// search configuration; everything else starts from fixed defaults.
Component::Component()
    : distance(0),
      m_page_index(0),
      m_page_size(0),
      m_return_all(0),
      m_cache_switch(0),
      m_top_switch(0),
      m_snapshot_switch(0),
      m_sort_type(SORT_RELEVANCE),
      m_appid(10001),
      m_user_id("0"),
      m_last_id(""),
      m_last_score(""),
      m_search_after(false),
      m_terminal_tag(0),
      m_terminal_tag_valid(false)
{
    SGlobalConfig &global_cfg = SearchConf::Instance()->GetGlobalConfig();
    m_default_query = global_cfg.sDefaultQuery;
    m_jdq_switch = global_cfg.iJdqSwitch;
}
// Nothing to release explicitly: all members clean up after themselves.
Component::~Component()
{
}
int Component::ParseJson(const char *sz_json, int json_len, Json::Value &recv_packet)
{
Json::Reader r(Json::Features::strictMode());
int ret;
ret = r.parse(sz_json, sz_json + json_len, recv_packet);
if (0 == ret)
{
log_error("the err json string is : %s", sz_json);
log_error("parse json error , errmsg : %s", r.getFormattedErrorMessages().c_str());
return -RT_PARSE_JSON_ERR;
}
if (recv_packet.isMember("appid") && recv_packet["appid"].isUInt())
{
m_appid = recv_packet["appid"].asUInt();
}
else {
m_appid = 10001;
}
if (recv_packet.isMember("userid") && recv_packet["userid"].isString())
{
m_user_id = recv_packet["userid"].asString();
}
else {
m_user_id = "0";
}
if (recv_packet.isMember("key") && recv_packet["key"].isString())
{
m_Data = recv_packet["key"].asString();
}
else {
m_Data = "";
}
if (recv_packet.isMember("key_and") && recv_packet["key_and"].isString())
{
m_Data_and = recv_packet["key_and"].asString();
}
else {
m_Data_and = "";
}
if (recv_packet.isMember("key_invert") && recv_packet["key_invert"].isString())
{
m_Data_invert = recv_packet["key_invert"].asString();
}
else {
m_Data_invert = "";
}
if (recv_packet.isMember("key_complete") && recv_packet["key_complete"].isString())
{
m_Data_complete = recv_packet["key_complete"].asString();
}
else {
m_Data_complete = "";
}
if (recv_packet.isMember("page_index") && recv_packet["page_index"].isString())
{
m_page_index = atoi(recv_packet["page_index"].asString().c_str());
}
else {
m_page_index = 1 ;
}
if (recv_packet.isMember("page_size") && recv_packet["page_size"].isString())
{
m_page_size = atoi(recv_packet["page_size"].asString().c_str());
}
else {
m_page_size = 10;
}
if(recv_packet.isMember("sort_type") && recv_packet["sort_type"].isString())
{
m_sort_type = atoi(recv_packet["sort_type"].asString().c_str());
}
else {
m_sort_type = SORT_RELEVANCE;
}
if(recv_packet.isMember("sort_field") && recv_packet["sort_field"].isString())
{
m_sort_field = recv_packet["sort_field"].asString();
}
else {
m_sort_field = "";
}
if (recv_packet.isMember("return_all") && recv_packet["return_all"].isString())
{
m_return_all = atoi(recv_packet["return_all"].asString().c_str());
}
else {
m_return_all = 0;
}
if(recv_packet.isMember("fields") && recv_packet["fields"].isString())
{
string fields = recv_packet["fields"].asString();
m_fields = splitEx(fields, ",");
}
if (recv_packet.isMember("terminal_tag") && recv_packet["terminal_tag"].isString())
{
m_terminal_tag = atoi(recv_packet["terminal_tag"].asString().c_str());
}
else {
m_terminal_tag = 0;
}
if(m_terminal_tag == 1){
if(m_Data_and == "" || m_Data != "" || m_Data_invert != ""){
log_error("terminal_tag is true, only key_and is available.");
return -RT_PARSE_JSON_ERR;
}
}
if(recv_packet.isMember("last_id") && recv_packet["last_id"].isString())
{
m_last_id = recv_packet["last_id"].asString();
}
else {
m_last_id = "";
}
bool score_flag = true;
if (recv_packet.isMember("last_score") && recv_packet["last_score"].isString())
{
m_last_score = recv_packet["last_score"].asString();
}
else {
score_flag = false;
m_last_score = "0";
}
if(m_last_id != "" && score_flag == true){
m_search_after = true;
}
if(m_search_after == true && m_sort_type != SORT_FIELD_DESC && m_sort_type != SORT_FIELD_ASC){
log_error("in search_after mode, sort_type must be SORT_FIELD_DESC or SORT_FIELD_ASC.");
return -RT_PARSE_JSON_ERR;
}
if ("" == m_Data && "" == m_Data_and && "" == m_Data_complete) {
m_Data = m_default_query;
}
log_debug("parse success, m_Data: %s, m_Data_and: %s, m_Data_invert: %s, m_page_index: %u, m_return_all: %u",
m_Data.c_str(), m_Data_and.c_str(), m_Data_invert.c_str(), m_page_index, m_return_all);
return 0;
}
// Loads the per-application cache / top / snapshot switches for m_appid.
// When no AppInfo is found for the appid the current values are kept.
void Component::InitSwitch()
{
    AppInfo app_info;
    if (!SearchConf::Instance()->GetAppInfo(m_appid, app_info)) {
        return;
    }
    m_cache_switch = app_info.cache_switch;
    m_top_switch = app_info.top_switch;
    m_snapshot_switch = app_info.snapshot_switch;
}
// Breaks the three parsed query strings (key, key_and, key_invert) into search
// terms, in that order. m_has_gis is forwarded to GetFieldWords, which sets it
// to 1 when a geo lookup produced codes for the "gis" field.
void Component::GetQueryWord(uint32_t &m_has_gis)
{
    GetFieldWords(MAINKEY, m_Data, m_appid, m_has_gis);
    GetFieldWords(ANDKEY, m_Data_and, m_appid, m_has_gis);
    GetFieldWords(INVERTKEY, m_Data_invert, m_appid, m_has_gis);
}
void Component::GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis){
if (dataStr == "")
return ;
string latitude_tmp = "";
string longitude_tmp = "";
string gisip_tmp = "";
string field_Data = "";
string primary_Data = "";
vector<FieldInfo> joinFieldInfos;
int i = dataStr.find(":");
if (i == -1) {
primary_Data = dataStr;
} else {
int j = dataStr.substr(0, i).rfind(" ");
if (j == -1) {
field_Data = dataStr;
primary_Data = "";
} else {
primary_Data = dataStr.substr(0, j);
field_Data = dataStr.substr(j+1);
}
}
if (type == 0) {
m_Query_Word = primary_Data;
}
if (primary_Data.length() > MAX_SEARCH_LEN) { // 超长进行截断
primary_Data = primary_Data.substr(0, MAX_SEARCH_LEN);
}
string probably_key = "";
bool is_correct = false;
if(primary_Data != "" && primary_Data.length() <= SINGLE_WORD_LEN) // 判断输入的词语是否正确,如果超过一定长度,则认为是多个词组成
{
JudgeWord(appid, primary_Data, is_correct, probably_key);
m_probably_data = probably_key;
}
vector<FieldInfo> primaryInfo;
FieldInfo pInfo;
string split_data;
if (is_correct == true) {
pInfo.field = INT_MAX;
pInfo.word = primary_Data;
primaryInfo.push_back(pInfo);
DataManager::Instance()->GetSynonymByKey(primary_Data, primaryInfo);
}
else if (probably_key != "") {
pInfo.word = probably_key;
primaryInfo.push_back(pInfo);
DataManager::Instance()->GetSynonymByKey(probably_key, primaryInfo);
}
else if (primary_Data != ""){
split_data = SplitManager::Instance()->split(primary_Data, appid);
log_debug("split_data: %s", split_data.c_str());
vector<string> split_datas = splitEx(split_data, "|");
for(size_t i = 0; i < split_datas.size(); i++) //是否有重复的同义词存在?
{
pInfo.field = INT_MAX;
pInfo.word = split_datas[i];
primaryInfo.push_back(pInfo);
DataManager::Instance()->GetSynonymByKey(split_datas[i], primaryInfo);
}
}
AddToFieldList(type, primaryInfo);
vector<string> gisCode;
vector<string> vec = splitEx(field_Data, " ");
vector<string>::iterator iter;
map<uint32_t, vector<FieldInfo> > field_keys_map;
uint32_t range_query = 0;
vector<string> lng_arr;
vector<string> lat_arr;
for (iter = vec.begin(); iter < vec.end(); iter++)
{
vector<FieldInfo> fieldInfos;
if ((*iter)[0] == '\0')
continue;
vector<string> tmp = splitEx(*iter, ":");
if (tmp.size() != 2)
continue;
uint32_t segment_tag = 0;
FieldInfo fieldInfo;
string fieldname = tmp[0];
uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid, fieldname, fieldInfo);
if(field != 0 && fieldInfo.index_tag == 0){
ExtraFilterKey extra_filter_key;
extra_filter_key.field_name = fieldname;
extra_filter_key.field_value = tmp[1];
extra_filter_key.field_type = fieldInfo.field_type;
if(type == 0){
extra_filter_keys.push_back(extra_filter_key);
} else if (type == 1) {
extra_filter_and_keys.push_back(extra_filter_key);
} else if (type == 2) {
extra_filter_invert_keys.push_back(extra_filter_key);
}
continue;
}
if (field != 0 && segment_tag == 1)
{
string split_data = SplitManager::Instance()->split(tmp[1], appid);
log_debug("split_data: %s", split_data.c_str());
vector<string> split_datas = splitEx(split_data, "|");
for(size_t index = 0; index < split_datas.size(); index++)
{
FieldInfo info;
info.field = fieldInfo.field;
info.field_type = fieldInfo.field_type;
info.word = split_datas[index];
info.segment_tag = fieldInfo.segment_tag;
fieldInfos.push_back(info);
}
}
else if (field != 0 && segment_tag == 5) {
range_query++;
string str = tmp[1];
str.erase(0, str.find_first_not_of(" "));
str.erase(str.find_last_not_of(" ") + 1);
if (str.size() == 0) {
log_error("field[%s] content is null", fieldname.c_str());
continue;
}
if (str[0] == '[') { // 范围查询
int l = str.find(",");
if (l == -1 || (str[str.size() - 1] != ']' && str[str.size() - 1] != ')')) {
log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str());
continue;
}
istringstream iss(str.substr(1, l).c_str());
iss >> fieldInfo.start;
string end_str = str.substr(l + 1, str.size() - l - 2);
end_str.erase(0, end_str.find_first_not_of(" "));
istringstream end_iss(end_str);
end_iss >> fieldInfo.end;
if (str[str.size() - 1] == ']') {
fieldInfo.range_type = RANGE_GELE;
}
else {
if (end_str.size() == 0) {
fieldInfo.range_type = RANGE_GE;
}
else {
fieldInfo.range_type = RANGE_GELT;
}
}
fieldInfos.push_back(fieldInfo);
}
else if (str[0] == '(') {
int l = str.find(",");
if (l == -1 || (str[str.size() - 1] != ']' && str[str.size() - 1] != ')')) {
log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str());
continue;
}
string start_str = str.substr(1, l).c_str();
string end_str = str.substr(l + 1, str.size() - l - 2);
start_str.erase(0, start_str.find_first_not_of(" "));
end_str.erase(0, end_str.find_first_not_of(" "));
istringstream start_iss(start_str);
start_iss >> fieldInfo.start;
istringstream end_iss(end_str);
end_iss >> fieldInfo.end;
if (str[str.size() - 1] == ']') {
if (start_str.size() == 0) {
fieldInfo.range_type = RANGE_LE;
}
else {
fieldInfo.range_type = RANGE_GTLE;
}
}
else {
if (start_str.size() != 0 && end_str.size() != 0) {
fieldInfo.range_type = RANGE_GTLT;
}
else if (start_str.size() == 0 && end_str.size() != 0) {
fieldInfo.range_type = RANGE_LT;
}
else if (start_str.size() != 0 && end_str.size() == 0) {
fieldInfo.range_type = RANGE_GT;
}
else {
log_error("field[%s] content[%s] invalid", fieldname.c_str(), str.c_str());
continue;
}
}
fieldInfos.push_back(fieldInfo);
}
else {
fieldInfo.word = tmp[1];
fieldInfos.push_back(fieldInfo);
}
log_debug("range_type: %d, start: %u, end: %u, segment_tag: %d, word: %s", fieldInfo.range_type, fieldInfo.start, fieldInfo.end, fieldInfo.segment_tag, fieldInfo.word.c_str());
}
else if (field != 0)
{
fieldInfo.word = tmp[1];
fieldInfos.push_back(fieldInfo);
}
else if (field == 0)
{
if (fieldInfo.field_type == 5) {
longitude_tmp = tmp[1];
longitude = longitude_tmp;
} else if (fieldInfo.field_type == 6) {
latitude_tmp = tmp[1];
latitude = latitude_tmp;
} else if (fieldInfo.field_type == 8) {
distance = strToDouble(tmp[1]);
} else if (fieldInfo.field_type == 7) {
gisip_tmp = tmp[1];
gisip = gisip_tmp;
} else if (fieldInfo.field_type == FIELD_WKT) {
string str = tmp[1];
str = delPrefix(str);
vector<string> str_vec = splitEx(str, ",");
for(uint32_t str_vec_idx = 0; str_vec_idx < str_vec.size(); str_vec_idx++){
string wkt_str = trim(str_vec[str_vec_idx]);
vector<string> wkt_vec = splitEx(wkt_str, "-");
if(wkt_vec.size() == 2){
lng_arr.push_back(wkt_vec[0]);
lat_arr.push_back(wkt_vec[1]);
}
}
}
}
if (fieldInfos.size() != 0) {
field_keys_map.insert(make_pair(fieldInfo.field, fieldInfos));
}
}
double distance_tmp = 2; // 不指定distance时最多返回2km内的数据
if(distance > 1e-6 && distance_tmp > distance + 1e-6){ // distance大于0小于2时取distance的值
distance_tmp = distance;
}
GetGisCode(longitude_tmp, latitude_tmp, gisip_tmp, distance_tmp, gisCode);
log_debug("lng_arr size: %d, lat_arr size: %d", (int)lng_arr.size(), (int)lat_arr.size());
if (gisCode.size() == 0 && lng_arr.size() > 0){
GetGisCode(lng_arr, lat_arr, gisCode);
}
if(gisCode.size() > 0){
vector<FieldInfo> fieldInfos;
uint32_t segment_tag = 0;
FieldInfo fieldInfo;
uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid, "gis", fieldInfo);
if (field != 0 && segment_tag == 0) {
m_has_gis = 1;
for (size_t index = 0; index < gisCode.size(); index++) {
FieldInfo info;
info.field = fieldInfo.field;
info.field_type = fieldInfo.field_type;
info.segment_tag = fieldInfo.segment_tag;
info.word = gisCode[index];
fieldInfos.push_back(info);
}
}
if (fieldInfos.size() != 0) {
field_keys_map.insert(make_pair(fieldInfo.field, fieldInfos));
}
}
//如果key_and查询的field匹配到联合索引则将查询词拼接起来作为新的查询词
if(type == 1){
vector<string> union_key_vec;
DBManager::Instance()->GetUnionKeyField(appid, union_key_vec);
vector<string>::iterator union_key_iter = union_key_vec.begin();
for(; union_key_iter != union_key_vec.end(); union_key_iter++){
string union_key = *union_key_iter;
vector<int> union_field_vec = splitInt(union_key, ",");
vector<int>::iterator union_field_iter = union_field_vec.begin();
bool hit_union_key = true;
for(; union_field_iter != union_field_vec.end(); union_field_iter++){
if(field_keys_map.find(*union_field_iter) == field_keys_map.end()){
hit_union_key = false;
break;
}
}
if(hit_union_key == true){
vector<vector<string> > keys_vvec;
vector<FieldInfo> unionFieldInfos;
for(union_field_iter = union_field_vec.begin(); union_field_iter != union_field_vec.end(); union_field_iter++){
vector<FieldInfo> field_info_vec = field_keys_map.at(*union_field_iter);
vector<string> key_vec;
GetKeyFromFieldInfo(field_info_vec, key_vec);
keys_vvec.push_back(key_vec);
field_keys_map.erase(*union_field_iter); // 命中union_key的需要从field_keys_map中删除
}
vector<string> union_keys = Combination(keys_vvec);
for(int m = 0 ; m < (int)union_keys.size(); m++){
FieldInfo info;
info.field = 0;
info.field_type = FIELD_STRING;
info.segment_tag = 1;
info.word = union_keys[m];
unionFieldInfos.push_back(info);
}
AddToFieldList(type, unionFieldInfos);
log_debug("hit union key index.");
break;
}
}
map<uint32_t, vector<FieldInfo> >::iterator field_key_map_iter = field_keys_map.begin();
for(; field_key_map_iter != field_keys_map.end(); field_key_map_iter++){
AddToFieldList(type, field_key_map_iter->second);
}
} else {
map<uint32_t, vector<FieldInfo> >::iterator field_key_map_iter = field_keys_map.begin();
for(; field_key_map_iter != field_keys_map.end(); field_key_map_iter++){
AddToFieldList(type, field_key_map_iter->second);
}
}
if(type == 1){ // terminal_tag为1时key_and中必须只带有一个范围查询
if(m_terminal_tag == 1 && range_query == 1 && and_keys.size() == 1){
m_terminal_tag_valid = true;
}
}
return ;
}
// Appends a non-empty term group to the list selected by `type`:
// 0 -> keys, 1 -> and_keys, 2 -> invert_keys. Any other type is ignored.
void Component::AddToFieldList(int type, vector<FieldInfo>& fields)
{
    if (fields.empty()) {
        return;
    }
    switch (type) {
    case 0:
        keys.push_back(fields);
        break;
    case 1:
        and_keys.push_back(fields);
        break;
    case 2:
        invert_keys.push_back(fields);
        break;
    default:
        break;
    }
}
// Read-only views of the term groups and extra filter keys collected by
// GetFieldWords, one accessor per query kind (main / and / invert).
const vector<vector<FieldInfo> >& Component::Keys()
{
    return keys;
}

const vector<vector<FieldInfo> >& Component::AndKeys()
{
    return and_keys;
}

const vector<vector<FieldInfo> >& Component::InvertKeys()
{
    return invert_keys;
}

const vector<ExtraFilterKey>& Component::ExtraFilterKeys()
{
    return extra_filter_keys;
}

const vector<ExtraFilterKey>& Component::ExtraFilterAndKeys()
{
    return extra_filter_and_keys;
}

const vector<ExtraFilterKey>& Component::ExtraFilterInvertKeys()
{
    return extra_filter_invert_keys;
}
// ---- Query text -------------------------------------------------------------
string Component::QueryWord() { return m_Query_Word; }

void Component::SetQueryWord(string query_word) { m_Query_Word = query_word; }

string Component::ProbablyData() { return m_probably_data; }

void Component::SetProbablyData(string probably_data) { m_probably_data = probably_data; }

// ---- Geo parameters extracted from the query --------------------------------
string Component::Latitude() { return latitude; }

string Component::Longitude() { return longitude; }

double Component::Distance() { return distance; }

// ---- Raw request fields and configuration -----------------------------------
string Component::Data() { return m_Data; }

uint32_t Component::JdqSwitch() { return m_jdq_switch; }

uint32_t Component::Appid() { return m_appid; }

string Component::DataAnd() { return m_Data_and; }

string Component::DataInvert() { return m_Data_invert; }

string Component::DataComplete() { return m_Data_complete; }

// ---- Sorting, paging and per-app switches -----------------------------------
uint32_t Component::SortType() { return m_sort_type; }

uint32_t Component::PageIndex() { return m_page_index; }

uint32_t Component::PageSize() { return m_page_size; }

uint32_t Component::ReturnAll() { return m_return_all; }

uint32_t Component::CacheSwitch() { return m_cache_switch; }

uint32_t Component::TopSwitch() { return m_top_switch; }

uint32_t Component::SnapshotSwitch() { return m_snapshot_switch; }

string Component::SortField() { return m_sort_field; }

// ---- Search-after cursor ----------------------------------------------------
string Component::LastId() { return m_last_id; }

string Component::LastScore() { return m_last_score; }

bool Component::SearchAfter() { return m_search_after; }

// Returns a mutable reference: callers can modify the requested field list.
vector<string>& Component::Fields() { return m_fields; }

// ---- Terminal mode ----------------------------------------------------------
uint32_t Component::TerminalTag() { return m_terminal_tag; }

bool Component::TerminalTagValid() { return m_terminal_tag_valid; }
// Appends the `word` of every FieldInfo in field_info_vec to key_vec, keeping
// the input order. Existing contents of key_vec are preserved.
void Component::GetKeyFromFieldInfo(const vector<FieldInfo>& field_info_vec, vector<string>& key_vec)
{
    for (size_t idx = 0; idx < field_info_vec.size(); ++idx) {
        key_vec.push_back(field_info_vec[idx].word);
    }
}
/*
** Builds every combination that picks exactly one string from each inner
** vector, joined with '_' in dimension order.
**   input : [[a],[b1,b2],[c1,c2,c3]]
**   output: [a_b1_c1,a_b1_c2,a_b1_c3,a_b2_c1,a_b2_c2,a_b2_c3]
**
** A single dimension is returned unchanged. An empty outer vector yields an
** empty result (indexing element 0 of an empty vector would be undefined
** behaviour). With two or more dimensions, any empty dimension makes the
** result empty.
*/
vector<string> Component::Combination(vector<vector<string> > &dimensionalArr){
    if (dimensionalArr.empty()) {
        return vector<string>();
    }
    // Fold the dimensions left to right: `combined` always holds the
    // combinations of the dimensions processed so far.
    vector<string> combined = dimensionalArr[0];
    for (size_t dim = 1; dim < dimensionalArr.size(); ++dim) {
        const vector<string> &choices = dimensionalArr[dim];
        vector<string> next;
        next.reserve(combined.size() * choices.size());
        for (size_t i = 0; i < combined.size(); ++i) {
            for (size_t j = 0; j < choices.size(); ++j) {
                next.push_back(combined[i] + "_" + choices[j]);
            }
        }
        combined.swap(next);
    }
    return combined;
}

View File

@ -0,0 +1,114 @@
/*
* =====================================================================================
*
* Filename: component.h
*
* Description: component class definition.
*
* Version: 1.0
* Created: 09/08/2019
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#ifndef __COMPONENT_H__
#define __COMPONENT_H__
#include "comm.h"
#include "json/json.h"
#include <string>
#include <vector>
using namespace std;
// Holds one parsed search request: the raw request fields read by ParseJson,
// the per-application switches loaded by InitSwitch, and the term lists that
// GetQueryWord derives from the query strings.
class Component
{
public:
Component();
~Component();
// Splits key / key_and / key_invert into terms; m_has_gis is set to 1 when
// geo codes were produced for the "gis" field.
void GetQueryWord(uint32_t &m_has_gis);
// Term groups and non-indexed filter keys collected by GetQueryWord.
const vector<vector<FieldInfo> >& Keys();
const vector<vector<FieldInfo> >& AndKeys();
const vector<vector<FieldInfo> >& InvertKeys();
const vector<ExtraFilterKey>& ExtraFilterKeys();
const vector<ExtraFilterKey>& ExtraFilterAndKeys();
const vector<ExtraFilterKey>& ExtraFilterInvertKeys();
// Parses json_len bytes of sz_json into recv_packet and loads the request
// fields; returns 0 on success or -RT_PARSE_JSON_ERR.
int ParseJson(const char *sz_json, int json_len, Json::Value &recv_packet);
// Loads cache / top / snapshot switches for the current appid.
void InitSwitch();
// Plain accessors for the state described on the data members below.
string QueryWord();
void SetQueryWord(string query_word);
string ProbablyData();
void SetProbablyData(string probably_data);
string Latitude();
string Longitude();
double Distance();
string Data();
string DataAnd();
string DataInvert();
string DataComplete();
uint32_t JdqSwitch();
uint32_t Appid();
uint32_t SortType();
uint32_t PageIndex();
uint32_t PageSize();
uint32_t ReturnAll();
uint32_t CacheSwitch();
uint32_t TopSwitch();
uint32_t SnapshotSwitch();
string SortField();
string LastId();
string LastScore();
bool SearchAfter();
// Returns a mutable reference to the requested field list.
vector<string>& Fields();
uint32_t TerminalTag();
bool TerminalTagValid();
private:
// Parses one query string; `type` selects which of the key lists is filled.
void GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis);
// Appends a non-empty term group to keys / and_keys / invert_keys by type (0 / 1 / 2).
void AddToFieldList(int type, vector<FieldInfo>& fields);
// Appends the word of each FieldInfo to key_vec.
void GetKeyFromFieldInfo(const vector<FieldInfo>& field_info_vec, vector<string>& key_vec);
// All one-per-dimension combinations of the inner vectors, joined with '_'.
vector<string> Combination(vector<vector<string> > &dimensionalArr);
private:
// Term groups per query kind; each inner vector is one group of alternatives.
vector<vector<FieldInfo> > keys;
vector<vector<FieldInfo> > and_keys;
vector<vector<FieldInfo> > invert_keys;
// field:value pairs on fields whose index_tag is 0.
vector<ExtraFilterKey> extra_filter_keys;
vector<ExtraFilterKey> extra_filter_and_keys;
vector<ExtraFilterKey> extra_filter_invert_keys;
string m_Query_Word; // free-text part of the main query
string m_probably_data; // suggested word produced while checking the free text
// Geo inputs taken from field:value pairs of the query.
string latitude;
string longitude;
string gisip;
double distance;
string m_Data; // query keywords (JSON "key")
string m_Data_and; // keywords that must be contained (JSON "key_and")
string m_Data_invert; // keywords that must not be contained (JSON "key_invert")
string m_Data_complete; // complete keyword (JSON "key_complete")
uint32_t m_page_index;
uint32_t m_page_size;
uint32_t m_return_all;
// Per-application switches loaded by InitSwitch.
uint32_t m_cache_switch;
uint32_t m_top_switch;
uint32_t m_snapshot_switch;
uint32_t m_sort_type;
uint32_t m_appid;
string m_user_id;
string m_sort_field;
// Search-after cursor: both last_id and last_score must be supplied.
string m_last_id;
string m_last_score;
bool m_search_after;
vector<string> m_fields; // requested fields (JSON "fields", comma separated)
string m_default_query; // used when key, key_and and key_complete are all empty
uint32_t m_jdq_switch;
uint32_t m_terminal_tag;
bool m_terminal_tag_valid; // set when terminal mode has exactly one range query in key_and
};
#endif

View File

@ -0,0 +1,254 @@
/*
* =====================================================================================
*
* Filename: rocksdb_direct_context.h
*
* Description: rocksdb direct context class definition.
*
* Version: 1.0
* Created: 09/08/2020
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#ifndef __ROCKSDB_DIRECT_CONTEXT_H__
#define __ROCKSDB_DIRECT_CONTEXT_H__
#if 1
#include <stdint.h>
#include <string>
#include <vector>
#include <deque>
#include <assert.h>
#include <string.h>
// Magic number for the direct request / response headers (sMagicNum).
static const uint16_t sgMagicNum = 12345; // global magic number
// Comparison operators for a QueryCond, stored on the wire as a uint8_t.
// operator must be matched with DTC with the same order
enum CondOpr
{
eEQ, // ==
eNE, // !=
eLT, // <
eLE, // <=
eGT, // >
eGE // >=
};
#pragma pack(push, 1)
// One field condition of a direct query: field index, comparison operator and
// the value to compare against.
//
// Wire format (native byte order):
//   uint8_t field index | uint8_t operator | int value length | value bytes
//
// The (de)serialization helpers are private and used by DirectRequestContext.
struct QueryCond
{
    uint8_t sFieldIndex;
    uint8_t sCondOpr; // CondOpr
    std::string sCondValue;

private:
    // Number of bytes serializeTo() writes for this condition.
    int binarySize() const
    {
        const int fixHeaderLen = sizeof(sFieldIndex) + sizeof(sCondOpr) + sizeof(int)/* value len */;
        return fixHeaderLen + static_cast<int>(sCondValue.length());
    }

    // Writes this condition at `data` and advances `data` past the written
    // bytes. The caller must provide at least binarySize() bytes.
    void serializeTo(char* &data) const
    {
        *data = static_cast<char>(sFieldIndex);
        data += sizeof(uint8_t);
        *data = static_cast<char>(sCondOpr);
        data += sizeof(uint8_t);

        // memcpy instead of a pointer-cast store: the length field is not
        // guaranteed to be int-aligned inside the buffer.
        const int valueLen = static_cast<int>(sCondValue.length());
        memcpy(data, &valueLen, sizeof(valueLen));
        data += sizeof(valueLen);
        memcpy(data, sCondValue.data(), sCondValue.length());
        data += sCondValue.length();
    }

    // Reads one condition starting at `data`; condLen receives the number of
    // bytes consumed. No bounds checking is possible here: the caller must
    // ensure the buffer holds a complete, well-formed condition.
    void serializeFrom(const char* data, int& condLen)
    {
        const char *begPos = data;

        sFieldIndex = static_cast<uint8_t>(*data);
        data += sizeof(uint8_t);
        sCondOpr = static_cast<uint8_t>(*data);
        data += sizeof(uint8_t);

        int len = 0;
        memcpy(&len, data, sizeof(len)); // unaligned-safe load of the length
        data += sizeof(int);
        sCondValue.assign(data, len);
        condLen = data - begPos + len;
    }

    friend class DirectRequestContext;
};
// Paging window of a direct query: start offset and step (row count).
// Defaults to the first 10 rows; reset() marks both values as unset (-1).
struct LimitCond
{
    int sLimitStart;
    int sLimitStep;

    LimitCond()
        : sLimitStart(0),
          sLimitStep(10)
    {
    }

    void reset()
    {
        sLimitStart = -1;
        sLimitStep = -1;
    }
};
// A direct query request: header, field conditions, order-by list and limit.
//
// Wire format (native byte order, no padding):
//   uint16_t magic | uint64_t sequence id
//   uint8_t  condition count | conditions (see QueryCond)
//   uint8_t  order-by count  | (int field, bool asc) per entry
//   LimitCond (raw bytes)
//
// Both counts are single bytes, so more than 255 conditions or order-by
// entries cannot be represented.
struct DirectRequestContext
{
    uint16_t sMagicNum;
    uint64_t sSequenceId;
    std::vector<QueryCond> sFieldConds;
    std::vector<std::pair<int, bool/* asc or not*/> > sOrderbyFields;
    LimitCond sLimitCond;

    // Returns the context to an empty state so it can be reused.
    void reset()
    {
        sMagicNum = 0;
        sSequenceId = 0;
        sFieldConds.clear();
        // Also drop the order-by list; otherwise a reused context keeps the
        // previous request's ordering.
        sOrderbyFields.clear();
        sLimitCond.reset();
    }

    // binary format size for transporting in
    int binarySize()
    {
        const int fixHeaderLen = sizeof(sMagicNum) + sizeof(sSequenceId) + sizeof(uint8_t) * 2;

        int len = fixHeaderLen;
        for ( size_t idx = 0; idx < sFieldConds.size(); idx++ )
        {
            len += sFieldConds[idx].binarySize();
        }
        len += static_cast<int>(sOrderbyFields.size() * (sizeof(int) + sizeof(bool)));
        len += sizeof(LimitCond);

        return len;
    }

    // before call this function, should call 'binarySize' to evaluate the size of the struct
    // `len` is not checked here: `data` must hold at least binarySize() bytes.
    void serializeTo(char *data, int len)
    {
        // memcpy is used for every multi-byte field because the fields are
        // not naturally aligned inside the buffer.
        memcpy(data, &sMagicNum, sizeof(sMagicNum));
        data += sizeof(uint16_t);

        memcpy(data, &sSequenceId, sizeof(sSequenceId));
        data += sizeof(uint64_t);

        *data = static_cast<char>(static_cast<uint8_t>(sFieldConds.size()));
        data += sizeof(uint8_t);
        for ( size_t idx = 0; idx < sFieldConds.size(); idx++ )
        {
            sFieldConds[idx].serializeTo(data);
        }

        *data = static_cast<char>(static_cast<uint8_t>(sOrderbyFields.size()));
        data += sizeof(uint8_t);
        for ( size_t idx = 0; idx < sOrderbyFields.size(); idx++ )
        {
            const int orderField = sOrderbyFields[idx].first;
            memcpy(data, &orderField, sizeof(orderField));
            data += sizeof(int);

            const bool orderAsc = sOrderbyFields[idx].second;
            memcpy(data, &orderAsc, sizeof(orderAsc));
            data += sizeof(bool);
        }

        memcpy(data, &sLimitCond, sizeof(LimitCond));
        data += sizeof(LimitCond);
        log_debug("serializeTo, sLimitStart: %d, sLimitStep: %d", sLimitCond.sLimitStart, sLimitCond.sLimitStep);
    }

    // Decodes a request from `data`. Conditions and order-by entries are
    // appended to the existing vectors, so call reset() first when reusing a
    // context. The only validation is the final assert that exactly dataLen
    // bytes were consumed; the caller must pass a well-formed buffer.
    void serializeFrom(const char *data, int dataLen)
    {
        memcpy(&sMagicNum, data, sizeof(sMagicNum));
        data += sizeof(uint16_t);
        dataLen -= sizeof(uint16_t);

        memcpy(&sSequenceId, data, sizeof(sSequenceId));
        data += sizeof(uint64_t);
        dataLen -= sizeof(uint64_t);

        uint8_t condNum = static_cast<uint8_t>(*data);
        data += sizeof(uint8_t);
        dataLen -= sizeof(uint8_t);

        QueryCond cond;
        int condLen = 0;
        for ( uint8_t idx = 0; idx < condNum; idx++ )
        {
            cond.serializeFrom(data, condLen);
            data += condLen;
            dataLen -= condLen;

            sFieldConds.push_back(cond);
        }

        std::pair<int, bool> orPair;
        uint8_t orderNum = static_cast<uint8_t>(*data);
        data += sizeof(uint8_t);
        dataLen -= sizeof(uint8_t);
        for ( uint8_t idx = 0; idx < orderNum; idx++ )
        {
            memcpy(&orPair.first, data, sizeof(int));
            data += sizeof(int);
            dataLen -= sizeof(int);

            memcpy(&orPair.second, data, sizeof(bool));
            data += sizeof(bool);
            dataLen -= sizeof(bool);

            sOrderbyFields.push_back(orPair);
        }

        memcpy(&sLimitCond, data, sizeof(LimitCond));
        dataLen -= sizeof(LimitCond);

        log_debug("serializeFrom, sLimitStart: %d, sLimitStep: %d", sLimitCond.sLimitStart, sLimitCond.sLimitStep);
        assert( dataLen == 0 );
    }
};
// Response to a direct query: a fixed header followed by the row payloads.
//
// Wire format (native byte order, no padding):
//   uint16_t magic | uint64_t sequence id | int16_t row count or errno
//   row values, concatenated
struct DirectResponseContext
{
    uint16_t sMagicNum;
    uint64_t sSequenceId;
    int16_t sRowNums; // number of matched rows or errno
    std::deque<std::string> sRowValues;

    // Serializes the response into `data` (previous contents are discarded).
    // Rows are consumed: every row that is written is removed from sRowValues.
    // A non-positive sRowNums produces a header-only message.
    void serializeTo(std::string& data)
    {
        data.clear();
        // Build the header field by field so the output does not depend on
        // the in-memory layout / packing of this struct.
        data.append(reinterpret_cast<const char*>(&sMagicNum), sizeof(sMagicNum));
        data.append(reinterpret_cast<const char*>(&sSequenceId), sizeof(sSequenceId));
        data.append(reinterpret_cast<const char*>(&sRowNums), sizeof(sRowNums));

        // Stop when the queue runs dry even if sRowNums claims more rows;
        // front()/pop_front() on an empty deque is undefined behaviour.
        for ( int16_t idx = 0; idx < sRowNums && !sRowValues.empty(); idx++ )
        {
            data.append(sRowValues.front());
            sRowValues.pop_front();
        }
    }

    // Clears the response; sRowNums is set to -1.
    void free()
    {
        sMagicNum = 0;
        sSequenceId = 0;
        sRowNums = -1;
        sRowValues.clear();
    }
};
#pragma pack(pop)
#endif
#endif // __ROCKSDB_DIRECT_CONTEXT_H__

View File

@ -0,0 +1,341 @@
/*
* =====================================================================================
*
* Filename: logical_operate.h
*
* Description: logical operate class definition.
*
* Version: 1.0
* Created: 09/08/2018
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#include "logical_operate.h"
#include "search_util.h"
#include "cachelist_unit.h"
#include "data_manager.h"
#include "json/reader.h"
#include "json/writer.h"
#include "index_tbl_op.h"
#include "index_sync/sync_index_timer.h"
#include "index_sync/sequence_search_index.h"
#include "stem.h"
#include <sstream>
#include <iomanip>
using namespace std;
extern SyncIndexTimer *globalSyncIndexTimer;
extern CCacheListUnit *indexcachelist;
// a: application id, s: sort type, h: non-zero when the query has geo terms,
// c: index cache switch. The combining function is supplied later via SetFunc.
LogicalOperate::LogicalOperate(uint32_t a, uint32_t s, uint32_t h, uint32_t c)
    : m_appid(a),
      m_sort_type(s),
      m_has_gis(h),
      m_cache_switch(c)
{
}
// No explicit cleanup is performed.
LogicalOperate::~LogicalOperate()
{
}
// Sets the function Process() uses to combine each term group's documents
// with the result accumulated from the previous groups.
void LogicalOperate::SetFunc(logical_func func)
{
    m_func = func;
}
int LogicalOperate::Process(const vector<vector<FieldInfo> >& keys, vector<IndexInfo>& vecs, set<string>& highlightWord, map<string, vec> &ves, map<string, uint32_t> &key_in_doc){
for (size_t index = 0; index < keys.size(); index++)
{
vector<IndexInfo> doc_id_vec;
vector<FieldInfo> fieldInfos = keys[index];
vector<FieldInfo>::iterator it;
for (it = fieldInfos.begin(); it != fieldInfos.end(); it++) {
vector<IndexInfo> doc_info;
if ((*it).segment_tag == 3) {
int ret = GetDocByShiftWord(*it, doc_info, m_appid, highlightWord);
if (ret != 0) {
doc_id_vec.clear();
return -RT_GET_DOC_ERR;
}
sort(doc_info.begin(), doc_info.end());
for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){
KeyInfo info;
info.word_freq = 1;
info.field = (*it).field;
info.word = (*it).word;
ves[doc_info[doc_info_idx].doc_id].push_back(info);
}
} else if ((*it).segment_tag == 4) {
int ret = GetDocByShiftEnWord(*it, doc_info, m_appid, highlightWord);
if (ret != 0) {
doc_id_vec.clear();
return -RT_GET_DOC_ERR;
}
sort(doc_info.begin(), doc_info.end());
for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){
KeyInfo info;
info.word_freq = 1;
info.field = (*it).field;
info.word = (*it).word;
ves[doc_info[doc_info_idx].doc_id].push_back(info);
}
} else if ((*it).segment_tag == 5 && (*it).word == "") { // 范围查询
stringstream ss;
ss << m_appid;
InvertIndexEntry startEntry(ss.str(), (*it).field, (double)(*it).start);
InvertIndexEntry endEntry(ss.str(), (*it).field, (double)(*it).end);
std::vector<InvertIndexEntry> resultEntry;
globalSyncIndexTimer->GetSearchIndex()->GetRangeIndex((*it).range_type, startEntry, endEntry, resultEntry);
std::vector<InvertIndexEntry>::iterator iter = resultEntry.begin();
for (; iter != resultEntry.end(); iter ++) {
IndexInfo info;
info.doc_id = (*iter)._InvertIndexDocId;
info.doc_version = (*iter)._InvertIndexDocVersion;
doc_info.push_back(info);
}
log_debug("appid: %s, field: %d, count: %d", startEntry._InvertIndexAppid.c_str(), (*it).field, (int)resultEntry.size());
} else {
int ret = GetDocIdSetByWord(*it, doc_info);
if (ret != 0){
return -RT_GET_DOC_ERR;
}
if (doc_info.size() == 0)
continue;
if (!m_has_gis || !isAllNumber((*it).word))
highlightWord.insert((*it).word);
if(!m_has_gis && (m_sort_type == SORT_RELEVANCE || m_sort_type == SORT_TIMESTAMP)){
CalculateByWord(*it, doc_info, ves, key_in_doc);
}
}
doc_id_vec = vec_union(doc_id_vec, doc_info);
}
if(index == 0){ // 第一个直接赋值给vecs后续的依次与前面的进行逻辑运算
vecs.assign(doc_id_vec.begin(), doc_id_vec.end());
} else {
vecs = m_func(vecs, doc_id_vec);
}
}
return 0;
}
int LogicalOperate::ProcessTerminal(const vector<vector<FieldInfo> >& and_keys, const TerminalQryCond& query_cond, vector<TerminalRes>& vecs){
if(and_keys.size() != 1){
return 0;
}
vector<FieldInfo> field_vec = and_keys[0];
if(field_vec.size() != 1){
return 0;
}
FieldInfo field_info = field_vec[0];
if(field_info.segment_tag != SEGMENT_RANGE){
return 0;
}
stringstream ss;
ss << m_appid;
InvertIndexEntry beginEntry(ss.str(), field_info.field, (double)field_info.start);
InvertIndexEntry endEntry(ss.str(), field_info.field, (double)field_info.end);
std::vector<InvertIndexEntry> resultEntry;
globalSyncIndexTimer->GetSearchIndex()->GetRangeIndexInTerminal(field_info.range_type, beginEntry, endEntry, query_cond, resultEntry);
std::vector<InvertIndexEntry>::iterator iter = resultEntry.begin();
for (; iter != resultEntry.end(); iter ++) {
TerminalRes info;
info.doc_id = (*iter)._InvertIndexDocId;
info.score = (*iter)._InvertIndexKey;
vecs.push_back(info);
}
return 0;
}
int LogicalOperate::ProcessComplete(const vector<FieldInfo>& complete_keys, vector<IndexInfo>& complete_vecs, vector<string>& word_vec, map<string, vec> &ves, map<string, uint32_t> &key_in_doc){
vector<FieldInfo>::const_iterator iter;
for (iter = complete_keys.begin(); iter != complete_keys.end(); iter++) {
vector<IndexInfo> doc_info;
int ret = GetDocIdSetByWord(*iter, doc_info);
if (ret != 0) {
return -RT_GET_DOC_ERR;
}
word_vec.push_back((*iter).word);
if(m_sort_type == SORT_RELEVANCE || m_sort_type == SORT_TIMESTAMP){
CalculateByWord(*iter, doc_info, ves, key_in_doc);
}
if(iter == complete_keys.begin()){
complete_vecs.assign(doc_info.begin(), doc_info.end());
} else {
complete_vecs = vec_intersection(complete_vecs, doc_info);
}
}
return 0;
}
/*
 * Records, for every document in doc_info, the details of this keyword's hit
 * (frequency, field, creation time, positions) under ves[doc_id], and stores
 * the number of matching documents in key_in_doc[word].
 *
 * The position string is only parsed when it is longer than two characters:
 * its first and last characters are stripped and the remainder is split on
 * "," into integers.
 */
void LogicalOperate::CalculateByWord(FieldInfo fieldInfo, const vector<IndexInfo> &doc_info, map<string, vec> &ves, map<string, uint32_t> &key_in_doc) {
    vector<IndexInfo>::const_iterator doc = doc_info.begin();
    for (; doc != doc_info.end(); ++doc) {
        const string doc_id = doc->doc_id;
        const uint32_t word_freq = doc->word_freq;
        const uint32_t field = doc->field;
        const uint32_t created_time = doc->created_time;

        vector<int> pos_vec;
        string pos_str = doc->pos;
        if (pos_str.size() > 2) {
            // Drop the enclosing delimiter characters before splitting.
            pos_str = pos_str.substr(1, pos_str.size() - 2);
            pos_vec = splitInt(pos_str, ",");
        }

        KeyInfo info;
        info.word_freq = word_freq;
        info.field = field;
        info.word = fieldInfo.word;
        info.created_time = created_time;
        info.pos_vec = pos_vec;
        ves[doc_id].push_back(info);
    }
    key_in_doc[fieldInfo.word] = doc_info.size();
}
/*
 * Tries to satisfy an index lookup from the in-process index cache.
 *
 * The cache key is "<word>|<field>". On a hit the cached value is parsed as a
 * JSON array whose elements carry appid/id/version/field/freq/time/pos, and
 * one IndexInfo per element is appended to doc_info.
 *
 * Returns true only when the cache is enabled (m_cache_switch == 1), the key
 * is present and every element is well formed. On a malformed element
 * doc_info is cleared and false is returned so the caller falls back to the
 * backing store.
 */
bool LogicalOperate::GetDocIndexCache(string word, uint32_t field, vector<IndexInfo> &doc_info) {
    log_debug("get doc index start");
    uint8_t value[MAX_VALUE_LEN] = { 0 };
    unsigned vsize = 0;
    string indexCache = word + "|" + ToString(field);
    if (m_cache_switch != 1 || !indexcachelist->in_list(indexCache.c_str(), indexCache.size(), value, vsize)) {
        return false;
    }
    log_debug("hit index cache.");

    // The terminator is written at value[vsize]; a value that fills the whole
    // buffer would put it one past the end, so treat it as a cache miss.
    if (vsize >= sizeof(value)) {
        log_error("index cache value too large, key: %s, size: %u", indexCache.c_str(), vsize);
        return false;
    }
    value[vsize] = '\0';
    string output = (char *)value;

    Json::Value packet;
    Json::Reader r(Json::Features::strictMode());
    if (!r.parse(output.c_str(), output.c_str() + output.size(), packet)) {
        log_error("the err json string is : %s, errmsg : %s", output.c_str(), r.getFormattedErrorMessages().c_str());
        return false;
    }

    for (uint32_t i = 0; i < packet.size(); ++i) {
        Json::Value& index_cache = packet[i];
        const bool well_formed =
            index_cache.isMember("appid") && index_cache["appid"].isUInt() &&
            index_cache.isMember("id") && index_cache["id"].isString() &&
            index_cache.isMember("version") && index_cache["version"].isUInt() &&
            index_cache.isMember("field") && index_cache["field"].isUInt() &&
            index_cache.isMember("freq") && index_cache["freq"].isUInt() &&
            index_cache.isMember("time") && index_cache["time"].isUInt() &&
            index_cache.isMember("pos") && index_cache["pos"].isString();
        if (!well_formed) {
            // Discard the partial result so the caller never sees a truncated list.
            log_error("parse index_cache error, no appid");
            doc_info.clear();
            return false;
        }
        IndexInfo info;
        info.appid = index_cache["appid"].asUInt();
        info.doc_id = index_cache["id"].asString();
        info.doc_version = index_cache["version"].asUInt();
        info.field = index_cache["field"].asUInt();
        info.word_freq = index_cache["freq"].asUInt();
        info.created_time = index_cache["time"].asUInt();
        info.pos = index_cache["pos"].asString();
        doc_info.push_back(info);
    }
    return true;
}
void LogicalOperate::SetDocIndexCache(const vector<IndexInfo> &doc_info, string& indexJsonStr) {
Json::Value indexJson;
Json::FastWriter writer;
for (size_t i = 0; i < doc_info.size(); i++) {
Json::Value json_tmp;
json_tmp["appid"] = doc_info[i].appid;
json_tmp["id"] = doc_info[i].doc_id;
json_tmp["version"] = doc_info[i].doc_version;
json_tmp["field"] = doc_info[i].field;
json_tmp["freq"] = doc_info[i].word_freq;
json_tmp["time"] = doc_info[i].created_time;
json_tmp["pos"] = doc_info[i].pos;
indexJson.append(json_tmp);
}
indexJsonStr = writer.write(indexJson);
}
/*
 * Fetches the documents indexed under one keyword into doc_info.
 *
 * The index key is "<appid>#00#<term>", where <term> depends on the field:
 *   - segment_tag 5: the word left-padded with '0' to 20 characters
 *     (NOTE(review): 5 is presumably SEGMENT_RANGE - confirm);
 *   - FIELD_IP: the numeric value from GetIpNum(); a value of 0 yields an
 *     empty result;
 *   - FIELD_INT / FIELD_DOUBLE / FIELD_LONG, or a word containing "_"
 *     (union index): the word as is;
 *   - anything else: the stemmed word.
 *
 * Sensitive words return an empty result. When m_has_gis is set the index
 * cache is consulted first; results of 1..1000 documents are written back to
 * the cache when caching is enabled and the serialised form fits.
 *
 * Returns 0 on success (including the empty cases), -RT_DTC_ERR when
 * GetDocInfo() fails.
 */
int LogicalOperate::GetDocIdSetByWord(FieldInfo fieldInfo, vector<IndexInfo> &doc_info) {
    if (DataManager::Instance()->IsSensitiveWord(fieldInfo.word)) {
        log_debug("%s is a sensitive word.", fieldInfo.word.c_str());
        return 0;
    }

    stringstream ss_key;
    ss_key << m_appid << "#00#";

    const bool is_numeric = fieldInfo.field_type == FIELD_INT
        || fieldInfo.field_type == FIELD_DOUBLE
        || fieldInfo.field_type == FIELD_LONG;
    if (fieldInfo.segment_tag == 5) {
        stringstream padded;
        padded << setw(20) << setfill('0') << fieldInfo.word;
        ss_key << padded.str();
    }
    else if (fieldInfo.field_type == FIELD_IP) {
        uint32_t ip_num = GetIpNum(fieldInfo.word);
        if (ip_num == 0)
            return 0;
        ss_key << ip_num;
    }
    else if (is_numeric || fieldInfo.word.find("_") != string::npos) {
        // Numeric fields and union-index keys are used verbatim.
        ss_key << fieldInfo.word;
    }
    else {
        string stemmed = stem(fieldInfo.word);
        ss_key << stemmed;
    }

    const string index_key = ss_key.str();
    log_debug("appid [%u], key[%s]", m_appid, index_key.c_str());

    if (m_has_gis && GetDocIndexCache(index_key, fieldInfo.field, doc_info)) {
        return 0;
    }

    if (!g_IndexInstance.GetDocInfo(m_appid, index_key, fieldInfo.field, doc_info)) {
        log_error("GetDocInfo error.");
        return -RT_DTC_ERR;
    }

    const bool cacheable = m_cache_switch == 1 && m_has_gis == 1
        && doc_info.size() > 0 && doc_info.size() <= 1000;
    if (!cacheable) {
        return 0;
    }

    string index_str;
    SetDocIndexCache(doc_info, index_str);
    if (index_str.empty() || !(index_str.size() < MAX_VALUE_LEN)) {
        return 0;
    }

    string indexCache = index_key + "|" + ToString(fieldInfo.field);
    unsigned data_size = indexCache.size();
    int ret = indexcachelist->add_list(indexCache.c_str(), index_str.c_str(), data_size, index_str.size());
    if (ret == 0) {
        log_debug("add to index_cache_list: %s.", indexCache.c_str());
    }
    else {
        log_error("add to index_cache_list error, ret: %d.", ret);
    }
    return 0;
}

View File

@ -0,0 +1,51 @@
/*
* =====================================================================================
*
* Filename: logical_operate.h
*
* Description: logical operate class definition.
*
* Version: 1.0
* Created: 09/08/2018
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
// This header defines a class and typedefs, so it must not be processed twice
// in one translation unit.
#pragma once
#include "component.h"
#include <map>
#include <set>
using namespace std;
// Keyword-hit details collected for one document (used as the mapped type of
// the doc_id -> hits maps passed to LogicalOperate).
typedef vector<KeyInfo> vec;
// Set operation applied to two document lists; LogicalOperate::SetFunc() takes
// one of these.
typedef vector<IndexInfo> (*logical_func)(vector<IndexInfo> &a, vector<IndexInfo> &b);
// Resolves query keys to document lists for one application and combines the
// lists. Only the declaration lives here; the member functions are defined
// out of line.
class LogicalOperate
{
public:
// Parameters mirror the m_* members below (definition not in this header).
LogicalOperate(uint32_t appid, uint32_t sort_type, uint32_t has_gis, uint32_t cache_switch);
~LogicalOperate();
// Looks up every key group in `keys` and combines the per-group results into `vecs`.
// Returns 0 on success or a negative RT_* code.
int Process(const vector<vector<FieldInfo> >& keys, vector<IndexInfo>& vecs, set<string>& highlightWord, map<string, vec> &ves, map<string, uint32_t> &key_in_doc);
// Intersects the document lists of all `complete_keys` into `complete_vecs`.
int ProcessComplete(const vector<FieldInfo>& complete_keys, vector<IndexInfo>& complete_vecs, vector<string>& word_vec, map<string, vec> &ves, map<string, uint32_t> &key_in_doc);
// Presumably stores `func` in m_func (definition not visible here) - confirm.
void SetFunc(logical_func func);
// Range lookup for a single SEGMENT_RANGE field; results are appended to `vecs`.
int ProcessTerminal(const vector<vector<FieldInfo> >& and_keys, const TerminalQryCond& query_cond, vector<TerminalRes>& vecs);
private:
// Records per-document hit details for one keyword into `ves` / `key_in_doc`.
void CalculateByWord(FieldInfo fieldInfo, const vector<IndexInfo> &doc_info, map<string, vec> &ves, map<string, uint32_t> &key_in_doc);
// Serialises `doc_info` to the JSON text stored in the index cache.
void SetDocIndexCache(const vector<IndexInfo> &doc_info, string& indexJsonStr);
// Reads `doc_info` back from the index cache; returns true on a usable hit.
bool GetDocIndexCache(string word, uint32_t field, vector<IndexInfo> &doc_info);
// Fetches the documents indexed under one keyword (cache first, then the index store).
int GetDocIdSetByWord(FieldInfo fieldInfo, vector<IndexInfo> &doc_info);
private:
uint32_t m_appid;        // application id, used as the index-key prefix
uint32_t m_sort_type;    // compared against SORT_RELEVANCE / SORT_TIMESTAMP
uint32_t m_has_gis;      // non-zero enables the index-cache path
uint32_t m_cache_switch; // 1 enables reads/writes of the index cache
logical_func m_func;     // operation used to combine key-group results
};

View File

@ -0,0 +1,392 @@
/*
* =====================================================================================
*
* Filename: comm_main.cc
*
* Description:
*
* Version: 1.0
* Created: 09/08/2020 10:02:05 PM
* Revision: none
* Compiler: gcc
*
* Author: Norton, yangshuang68@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sched.h>
#include <dlfcn.h>
#include <dtc_global.h>
#include <version.h>
#include <proc_title.h>
#include <log.h>
#include <config.h>
#include <dbconfig.h>
#include "comm_process.h"
#include <daemon.h>
#include <net_addr.h>
#include <listener.h>
#include <unix_socket.h>
#include <watchdog_listener.h>
#include <task_base.h>
// Default plug-in path; main() uses it when HELPER_SERVICE_FILE is unset or empty.
const char service_file[] = "./helper-service.so";
// Symbol that load_service() resolves inside the plug-in.
const char create_handle_name[] = "create_process";
// Program name used in log messages.
const char progname[] = "custom-helper";
// Positional-argument summary; not referenced in this file - presumably read by show_usage().
const char usage_argv[] = "machId addr [port]";
// Configuration file paths, initialised from project macros; not referenced in this
// file - presumably consumed by the daemon start-up code (TODO confirm).
char cacheFile[256] = CACHE_CONF_NAME;
char tableFile[256] = TABLE_CONF_NAME;
// Factory function resolved from the plug-in by load_service().
static CreateHandle CreateHelper = NULL;
// Helper instance created in main() through CreateHelper().
static CommHelper *helperProc;
// Alarm timeout in seconds derived from the HelperTimeout setting (0 = no alarm).
static unsigned int procTimeout;
class HelperMain
{
public:
HelperMain(CommHelper *helper) : h(helper){};
void attach(DTCTask *task) { h->attach((void *)task); }
void init_title(int group, int role) { h->init_title(group, role); }
void set_title(const char *status) { h->SetTitle(status); }
const char *Name() { return h->Name(); }
int pre_init(int gid, int r)
{
if (dbConfig->machineCnt <= gid)
{
log_error("parse config error, machineCnt[%d] <= GroupID[%d]", dbConfig->machineCnt, gid);
return (-1);
}
h->GroupID = gid;
h->Role = r;
h->_dbconfig = dbConfig;
h->_tdef = gTableDef[0];
h->_config = gConfig;
h->_server_string = dbConfig->mach[gid].role[r].addr;
h->logapi.init_target(h->_server_string);
return 0;
}
private:
CommHelper *h;
};
/*
 * Loads the helper plug-in and resolves its factory function into CreateHelper.
 *
 * Returns 0 on success, -1 when the shared object cannot be opened, -2 when
 * it does not export the symbol named by create_handle_name.
 */
static int load_service(const char *dll_file)
{
    void *handle = dlopen(dll_file, RTLD_NOW | RTLD_GLOBAL);
    if (!handle)
    {
        log_crit("dlopen(%s) error: %s", dll_file, dlerror());
        return -1;
    }

    CreateHelper = (CreateHandle)dlsym(handle, create_handle_name);
    if (!CreateHelper)
    {
        log_crit("function[%s] not found", create_handle_name);
        return -2;
    }
    return 0;
}
/*
 * Reads one request from netfd into `task`, looping until the decoder reports
 * DecodeDone or the global stop flag is set.
 *
 * Returns 0 when a request (or an acceptable verify package) was read, -1 on
 * a fatal/data decode error or when the decoded task carries a negative
 * result code.
 */
static int sync_decode(DTCTask *task, int netfd, CommHelper *helperProc)
{
    SimpleReceiver receiver(netfd);

    for (;;)
    {
        const int code = task->Decode(receiver);

        if (code == DecodeFatalError)
        {
            if (errno != 0)
                log_notice("decode fatal error, fd=%d, %m", netfd);
            return -1;
        }

        if (code == DecodeDataError)
        {
            // -EC_EXTRA_SECTION_DATA marks a verify package and is not an error.
            if (task->result_code() == 0 || task->result_code() == -EC_EXTRA_SECTION_DATA)
                return 0;
            log_notice("decode error, fd=%d, %d", netfd, task->result_code());
            return -1;
        }

        HelperMain(helperProc).set_title("Receiving...");

        if (stop || code == DecodeDone)
            break;
    }

    if (task->result_code() < 0)
    {
        log_notice("register result, fd=%d, %d", netfd, task->result_code());
        return -1;
    }
    return 0;
}
/*
 * Writes `reply` to netfd, retrying until the packet reports SendResultDone
 * or the global stop flag is set.
 *
 * Returns 0 on completion (or stop), -1 on a send error.
 */
static int sync_send(Packet *reply, int netfd)
{
    for (;;)
    {
        const int code = reply->Send(netfd);
        if (code == SendResultError)
        {
            log_notice("send error, fd=%d, %m", netfd);
            return -1;
        }
        if (stop || code == SendResultDone)
            return 0;
    }
}
/*
 * SIGALRM handler used while waiting in accept_connection(): exits when the
 * process runs in the foreground and has been re-parented to pid 1,
 * otherwise re-arms a 10 second alarm.
 *
 * NOTE(review): exit() is not on the POSIX async-signal-safe list; kept as in
 * the original.
 */
static void alarm_handler(int signo)
{
    const bool in_foreground = (background == 0);
    if (in_foreground && getppid() == 1)
    {
        exit(0);
    }
    alarm(10);
}
/*
 * Blocks until a client connects on listening socket `fd` and returns the
 * connected descriptor.
 *
 * A 10 second alarm (alarm_handler) is armed before each accept() and
 * cancelled once a connection arrives. When accept() fails with EINVAL the
 * function exits the process if the parent is gone (ppid == 1), otherwise it
 * sleeps 10 ms and retries. The process also exits when the stop flag is set.
 */
static int accept_connection(int fd)
{
    HelperMain(helperProc).set_title("listener");
    signal(SIGALRM, alarm_handler);

    while (!stop)
    {
        alarm(10);
        const int newfd = accept(fd, NULL, 0);
        if (newfd >= 0)
        {
            alarm(0);
            return newfd;
        }

        if (errno != EINVAL)
            continue;

        if (getppid() == (pid_t)1)
        { // the parent process has already exited
            log_error("dtc father process not exist. helper[%d] exit now.", getpid());
            exit(0);
        }
        usleep(10000);
    }
    exit(0);
}
/*
 * SIGALRM handler installed by helper_proc_run(): logs that the helper
 * exceeded procTimeout seconds and terminates the process with status -1.
 */
static void proc_timeout_handler(int signo)
{
    const pid_t self_pid = getpid();
    log_error("comm-helper process timeout(more than %u seconds), helper[pid: %d] exit now.", procTimeout, self_pid);
    exit(-1);
}
// Arguments handed to helper_proc_run() through watch_dog_fork().
// main() initialises this positionally ({0, 0, 0}), so the field order matters.
struct THelperProcParameter
{
int netfd; // connected client socket returned by accept_connection()
int gid;   // machine group id parsed from argv[0]
int role;  // machine role (0 or 1) parsed from the suffix of argv[0]
};
/*
 * Body of a forked helper process: initialises the helper, then serves
 * requests from args->netfd until the stop flag is set or the connection
 * fails, and finally exits the process (it never returns to the caller).
 *
 * Each request is decoded, executed under a SIGALRM watchdog and answered
 * with an encoded result. Write requests (Insert/Update/Delete/Replace) get
 * twice the normal watchdog timeout.
 */
static int helper_proc_run(struct THelperProcParameter *args)
{
    // close listen fd
    close(0);
    open("/dev/null", O_RDONLY);

    HelperMain(helperProc).set_title("Initializing...");

    if (procTimeout > 0)
        signal(SIGALRM, proc_timeout_handler);

    // Guard initialisation with the watchdog as well.
    alarm(procTimeout);
    if (helperProc->Init() != 0)
    {
        log_error("%s", "helper process init failed");
        exit(-1);
    }
    alarm(0);

    while (!stop)
    {
        HelperMain(helperProc).set_title("Waiting...");

        DTCTask *task = new DTCTask(gTableDef[0]);
        if (sync_decode(task, args->netfd, helperProc) < 0)
        {
            delete task;
            break;
        }

        if (task->result_code() == 0)
        {
            unsigned int timeout = procTimeout;
            switch (task->request_code())
            {
            case DRequest::Insert:
            case DRequest::Update:
            case DRequest::Delete:
            case DRequest::Replace:
                // Writes are allowed twice as long. The original fell through
                // to the default label here, which discarded the doubling.
                timeout = 2 * procTimeout;
                break;
            default:
                break;
            }

            HelperMain(helperProc).attach(task);
            alarm(timeout);
            helperProc->Execute();
            alarm(0);
        }

        HelperMain(helperProc).set_title("Sending...");

        Packet *reply = new Packet;
        reply->encode_result(task);
        delete task;

        if (sync_send(reply, args->netfd) < 0)
        {
            delete reply;
            break;
        }
        delete reply;
    }

    close(args->netfd);
    HelperMain(helperProc).set_title("Exiting...");
    delete helperProc;
    daemon_cleanup();
#if MEMCHECK
    log_info("%s v%s: stopped", progname, version);
    dump_non_delete();
    log_debug("memory allocated %lu virtual %lu", count_alloc_size(), count_virtual_size());
#endif
    exit(0);
    return 0;
}
// Entry point of the stand-alone helper.
// Usage after the daemon options: machId addr [port]
//   machId - group id, optionally followed by a role character from MACHINEROLESTRING
//   addr   - listen address, or "-" to use the already-open descriptor 0
// Loads the helper plug-in, binds/inherits the listening socket and forks one
// helper process per accepted connection until the stop flag is set.
// Returns 0 on normal shutdown, -1 on any start-up failure.
int main(int argc, char **argv)
{
init_proc_title(argc, argv);
if (dtc_daemon_init(argc, argv) < 0)
return -1;
// Skip the options consumed so far; argv[0] is now the machine id.
argc -= optind;
argv += optind;
struct THelperProcParameter helperArgs = {0, 0, 0};
char *addr = NULL;
if (argc > 0)
{
// Parse "<gid>[role-char]": no suffix or the first role character means role 0,
// the second role character means role 1.
char *p;
helperArgs.gid = strtol(argv[0], &p, 0);
if (*p == '\0' || *p == MACHINEROLESTRING[0])
helperArgs.role = 0;
else if (*p == MACHINEROLESTRING[1])
helperArgs.role = 1;
else
{
log_error("Bad machine id: %s", argv[0]);
return -1;
}
}
if (argc != 2 && argc != 3)
{
show_usage();
return -1;
}
int backlog = gConfig->get_int_val("cache", "MaxListenCount", 256);
int helperTimeout = gConfig->get_int_val("cache", "HelperTimeout", 30);
// The watchdog fires one second before the configured helper timeout; values <= 1 disable it.
if (helperTimeout > 1)
procTimeout = helperTimeout - 1;
else
procTimeout = 0;
addr = argv[1];
// load dll file
const char *file = getenv("HELPER_SERVICE_FILE");
if (file == NULL || file[0] == 0)
file = service_file;
if (load_service(file) != 0)
return -1;
helperProc = CreateHelper();
if (helperProc == NULL)
{
log_crit("create helper error");
return -1;
}
if (HelperMain(helperProc).pre_init(helperArgs.gid, helperArgs.role) < 0)
{
log_error("%s", "helper prepare init failed");
exit(-1);
}
if (helperProc->global_init() != 0)
{
log_crit("helper gobal-init error");
return -1;
}
int fd = -1;
// "-" means the listening socket was inherited as descriptor 0.
if (!strcmp(addr, "-"))
fd = 0;
else
{
// Stand-alone mode: only allowed when CacheShmKey is configured as "none" (case-insensitive).
if (strcasecmp(gConfig->get_str_val("cache", "CacheShmKey") ?: "", "none") != 0)
{
log_warning("standalone %s need CacheShmKey set to NONE", progname);
return -1;
}
SocketAddress sockaddr;
const char *err = sockaddr.set_address(addr, argc == 2 ? NULL : argv[2]);
if (err)
{
log_warning("host %s port %s: %s", addr, argc == 2 ? "NULL" : argv[2], err);
return -1;
}
if (sockaddr.socket_type() != SOCK_STREAM)
{
log_warning("standalone %s don't support UDP protocol", progname);
return -1;
}
fd = sock_bind(&sockaddr, backlog);
if (fd < 0)
return -1;
}
daemon_start();
#if HAS_LOGAPI
helperProc->logapi.Init(
gConfig->get_int_val("LogApi", "MessageId", 0),
gConfig->get_int_val("LogApi", "CallerId", 0),
gConfig->get_int_val("LogApi", "TargetId", 0),
gConfig->get_int_val("LogApi", "InterfaceId", 0));
#endif
HelperMain(helperProc).init_title(helperArgs.gid, helperArgs.role);
// The helper's own timeout is set one second below the watchdog timeout.
if (procTimeout > 1)
helperProc->set_proc_timeout(procTimeout - 1);
// Accept loop: each connection is served by a forked helper_proc_run();
// this process closes its copy of the descriptor afterwards.
while (!stop)
{
helperArgs.netfd = accept_connection(fd);
watch_dog_fork(HelperMain(helperProc).Name(), (int (*)(void *))helper_proc_run, (void *)&helperArgs);
close(helperArgs.netfd);
}
// Remove the socket file when we bound a filesystem (absolute-path) address ourselves.
if (fd > 0 && addr && addr[0] == '/')
unlink(addr);
return 0;
}

View File

@ -0,0 +1,260 @@
/*
* =====================================================================================
*
* Filename: rocksdb_direct_context.h
*
* Description:
*
* Version: 1.0
* Created: 09/08/2020 10:02:05 PM
* Revision: none
* Compiler: gcc
*
* Author: zhuyao, zhuyao28@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#ifndef __ROCKSDB_DIRECT_CONTEXT_H__
#define __ROCKSDB_DIRECT_CONTEXT_H__
#if 1
#include <stdint.h>
#include <string>
#include <vector>
#include <deque>
#include <assert.h>
#include <string.h>
// Global magic number. The request/response contexts below carry a sMagicNum
// field - presumably compared against this value by the code that uses them
// (not shown in this header).
static const uint16_t sgMagicNum = 12345; // global magic number
// Comparison operators usable in a query condition.
// The enumerators must stay in the same order as DTC's, because the numeric
// value is what travels on the wire (see QueryCond::sCondOpr).
enum class CondOpr : uint8_t
{
    eEQ, // == 0
    eNE, // != 1
    eLT, // < 2
    eLE, // <= 3
    eGT, // > 4
    eGE  // >= 5
};

// Equality on the underlying value.
// Declared inline because this header defines it: without inline, every
// translation unit that includes the header emits its own definition and the
// link fails with a multiple-definition error.
inline bool operator==(const CondOpr lc, const CondOpr rc)
{
    return static_cast<uint8_t>(lc) == static_cast<uint8_t>(rc);
}
#pragma pack(push, 1)
// One field condition of a direct query: <field> <operator> <value>.
// Wire layout: [field index: 1 byte][operator: 1 byte][value length: int][value bytes].
// The (de)serialisation helpers are private and reserved for DirectRequestContext.
struct QueryCond
{
    uint8_t sFieldIndex;    // index of the field the condition applies to
    uint8_t sCondOpr;       // CondOpr value stored as a raw byte
    std::string sCondValue; // operand, copied verbatim to/from the wire

private:
    // Number of bytes serialize_to() writes for this condition.
    int binary_size()
    {
        static int fixHeaderLen = sizeof(sFieldIndex) + sizeof(sCondOpr) + sizeof(int) /* value len */;
        return fixHeaderLen + sCondValue.length();
    }

    // Writes the condition at `data` and advances `data` past it.
    // memcpy is used for every multi-byte field because offsets inside the
    // buffer are not aligned.
    void serialize_to(char *&data)
    {
        memcpy(data, &sFieldIndex, sizeof(uint8_t));
        data += sizeof(uint8_t);

        memcpy(data, &sCondOpr, sizeof(uint8_t));
        data += sizeof(uint8_t);

        const int valueLen = sCondValue.length();
        memcpy(data, &valueLen, sizeof(int));
        data += sizeof(int);

        memcpy(data, sCondValue.data(), sCondValue.length());
        data += sCondValue.length();
    }

    // Reads one condition starting at `data`; `condLen` receives the number
    // of bytes consumed.
    // NOTE(review): the length field is trusted as-is; the caller supplies no
    // bound to validate it against.
    void serialize_from(const char *data, int &condLen)
    {
        const char *begPos = data;

        memcpy(&sFieldIndex, data, sizeof(uint8_t));
        data += sizeof(uint8_t);

        memcpy(&sCondOpr, data, sizeof(uint8_t));
        data += sizeof(uint8_t);

        int len = 0;
        memcpy(&len, data, sizeof(int));
        data += sizeof(int);

        sCondValue.assign(data, len);
        condLen = data - begPos + len;
    }

    friend struct DirectRequestContext;
};

// LIMIT clause of a direct query; -1 in both fields is the value reset() restores.
struct LimitCond
{
    int sLimitStart = -1;
    int sLimitStep = -1;

    void reset()
    {
        sLimitStart = -1;
        sLimitStep = -1;
    }
};

// A direct query request.
// Wire layout: [magic: 2][sequence id: 8][cond count: 1][conds...]
//              [order-by count: 1][(field: int, asc: bool)...][LimitCond].
struct DirectRequestContext
{
    uint16_t sMagicNum;
    uint64_t sSequenceId;
    std::vector<QueryCond> sFieldConds;
    std::vector<std::pair<int, bool /* asc or not*/>> sOrderbyFields;
    LimitCond sLimitCond;

    // Returns the object to its empty state so it can be reused.
    // The order-by list is cleared as well: serialize_from() appends to it,
    // so leaving it populated would carry clauses over into the next request.
    void reset()
    {
        sMagicNum = 0;
        sSequenceId = 0;
        sFieldConds.clear();
        sOrderbyFields.clear();
        sLimitCond.reset();
    }

    // binary format size for transporting in
    int binary_size()
    {
        static int fixHeaderLen = sizeof(sMagicNum) + sizeof(sSequenceId) + sizeof(uint8_t) * 2;
        int len = fixHeaderLen;
        for (size_t idx = 0; idx < sFieldConds.size(); idx++)
        {
            len += sFieldConds[idx].binary_size();
        }
        len += sOrderbyFields.size() * (sizeof(int) + sizeof(bool));
        len += sizeof(LimitCond);
        return len;
    }

    // Serialises the request into `data`.
    // Call binary_size() first and pass a buffer at least that large; `len`
    // is not consulted. The two counts are stored in a single byte each.
    void serialize_to(char *data, int len)
    {
        // Copy packed members to locals first so memcpy never sees the
        // address of a misaligned member.
        const uint16_t magic = sMagicNum;
        memcpy(data, &magic, sizeof(uint16_t));
        data += sizeof(uint16_t);

        const uint64_t seq = sSequenceId;
        memcpy(data, &seq, sizeof(uint64_t));
        data += sizeof(uint64_t);

        const uint8_t condNum = static_cast<uint8_t>(sFieldConds.size());
        memcpy(data, &condNum, sizeof(uint8_t));
        data += sizeof(uint8_t);
        for (size_t idx = 0; idx < sFieldConds.size(); idx++)
        {
            sFieldConds[idx].serialize_to(data);
        }

        const uint8_t orderNum = static_cast<uint8_t>(sOrderbyFields.size());
        memcpy(data, &orderNum, sizeof(uint8_t));
        data += sizeof(uint8_t);
        for (size_t idx = 0; idx < sOrderbyFields.size(); idx++)
        {
            const int field = sOrderbyFields[idx].first;
            memcpy(data, &field, sizeof(int));
            data += sizeof(int);

            const bool asc = sOrderbyFields[idx].second;
            memcpy(data, &asc, sizeof(bool));
            data += sizeof(bool);
        }

        memcpy(data, &sLimitCond, sizeof(LimitCond));
        data += sizeof(LimitCond);
    }

    // Parses a request from `data`; `dataLen` must be exactly the serialised
    // size (checked by an assert at the end). Conditions and order-by clauses
    // are appended to the existing vectors, so call reset() before reuse.
    void serialize_from(const char *data, int dataLen)
    {
        uint16_t magic = 0;
        memcpy(&magic, data, sizeof(uint16_t));
        sMagicNum = magic;
        data += sizeof(uint16_t);
        dataLen -= sizeof(uint16_t);

        uint64_t seq = 0;
        memcpy(&seq, data, sizeof(uint64_t));
        sSequenceId = seq;
        data += sizeof(uint64_t);
        dataLen -= sizeof(uint64_t);

        uint8_t condNum = 0;
        memcpy(&condNum, data, sizeof(uint8_t));
        data += sizeof(uint8_t);
        dataLen -= sizeof(uint8_t);

        QueryCond cond;
        int condLen = 0;
        for (uint8_t idx = 0; idx < condNum; idx++)
        {
            cond.serialize_from(data, condLen);
            data += condLen;
            dataLen -= condLen;
            sFieldConds.push_back(cond);
        }

        uint8_t orderNum = 0;
        memcpy(&orderNum, data, sizeof(uint8_t));
        data += sizeof(uint8_t);
        dataLen -= sizeof(uint8_t);

        std::pair<int, bool> orPair;
        for (uint8_t idx = 0; idx < orderNum; idx++)
        {
            int field = 0;
            memcpy(&field, data, sizeof(int));
            orPair.first = field;
            data += sizeof(int);
            dataLen -= sizeof(int);

            bool asc = false;
            memcpy(&asc, data, sizeof(bool));
            orPair.second = asc;
            data += sizeof(bool);
            dataLen -= sizeof(bool);

            sOrderbyFields.push_back(orPair);
        }

        memcpy(&sLimitCond, data, sizeof(LimitCond));
        dataLen -= sizeof(LimitCond);
        assert(dataLen == 0);
    }
};
// A direct query response.
// Wire layout: [magic: 2][sequence id: 8][row count / error: 2][row values...].
struct DirectResponseContext
{
    uint16_t sMagicNum;
    uint64_t sSequenceId;
    int16_t sRowNums; // number of matched rows or errno
    std::deque<std::string> sRowValues;

    // Serialises the response into `data` (previous content is discarded).
    // Up to sRowNums queued row values are appended after the 12-byte header
    // and removed from sRowValues.
    //
    // The header is written field by field, so the output does not depend on
    // the struct's packing. The row loop uses a signed counter and stops when
    // the queue is empty: sRowNums may hold an error value (free() sets it to
    // -1), which the original unsigned comparison turned into a huge count
    // and then popped from an empty deque.
    void serialize_to(std::string &data)
    {
        const uint16_t magic = sMagicNum;
        const uint64_t seq = sSequenceId;
        const int16_t rows = sRowNums;

        data.clear();
        data.append(reinterpret_cast<const char *>(&magic), sizeof(magic));
        data.append(reinterpret_cast<const char *>(&seq), sizeof(seq));
        data.append(reinterpret_cast<const char *>(&rows), sizeof(rows));

        for (int idx = 0; idx < rows && !sRowValues.empty(); idx++)
        {
            data.append(sRowValues.front());
            sRowValues.pop_front();
        }
    }

    // Resets the response; sRowNums becomes -1 and queued rows are dropped.
    void free()
    {
        sMagicNum = 0;
        sSequenceId = 0;
        sRowNums = -1;
        sRowValues.clear();
    }
};
#pragma pack(pop)
#endif
#endif // __ROCKSDB_DIRECT_CONTEXT_H__

View File

@ -0,0 +1,529 @@
/*
* =====================================================================================
*
* Filename: add_request_proc.cc
*
* Description: AddReqProc class definition.
*
* Version: 1.0
* Created: 09/08/2020 10:02:05 PM
* Revision: none
* Compiler: gcc
*
* Author: shrewdlin, linjinming@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#include "add_request_proc.h"
#include "index_tbl_op.h"
#include "geohash.h"
#include "split_manager.h"
#include <sstream>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <iomanip>
// Default constructor: performs no work of its own; members are left to
// their own default initialisation.
AddReqProc::AddReqProc(){
}
/*
 * Builds a processor for one document insert.
 *   jf           - the document's fields as JSON (copied)
 *   insert_param - supplies the application id, document id and the
 *                  document / transaction versions
 */
AddReqProc::AddReqProc(const Json::Value& jf, InsertParam& insert_param){
    // Identity of the document being indexed.
    this->app_id = insert_param.appid;
    this->doc_id = insert_param.doc_id;
    // Versions attached to the index rows.
    this->doc_version = insert_param.doc_version;
    this->trans_version = insert_param.trans_version;
    // Field content to index.
    this->json_field = jf;
}
// Destructor: nothing to release explicitly.
AddReqProc::~AddReqProc(){
}
/*
 * Counts word occurrences over a segmented text.
 *
 * strss is a list of word groups. Every word accepted by
 * SplitManager::wordValid() is added to word_map: a first occurrence creates
 * an item (freq 1, this document's id, the given `extend` payload), later
 * occurrences bump freq. Each occurrence also records the 1-based number of
 * the group it came from in item.indexs. Accepted words are logged as a
 * "|"-separated list.
 */
void AddReqProc::do_stat_word_freq(vector<vector<string> > &strss, map<string, item> &word_map, string extend) {
    uint32_t word_id = 0;
    ostringstream accepted;

    for (size_t group = 0; group < strss.size(); ++group) {
        const uint32_t index = (uint32_t)(group + 1);
        vector<string> &words = strss[group];
        log_debug("start do_stat_word_freq, appid = %u\n",app_id);

        for (size_t w = 0; w < words.size(); ++w) {
            string word = words[w];
            if (!SplitManager::Instance()->wordValid(word, app_id, word_id)){
                continue;
            }

            map<string, item>::iterator hit = word_map.find(word);
            if (hit != word_map.end()) {
                hit->second.freq++;
                hit->second.indexs.push_back(index);
            }
            else {
                item fresh;
                fresh.doc_id = doc_id;
                fresh.freq = 1;
                fresh.extend = extend;
                fresh.indexs.push_back(index);
                word_map.insert(make_pair(word, fresh));
            }
            accepted << words[w] << "|";
        }
    }
    log_debug("split: %s",accepted.str().c_str());
}
/*
 * Counts word occurrences over a flat word list (no validity filtering).
 *
 * A first occurrence creates an item (freq 1, this document's id); later
 * occurrences bump freq. Every occurrence records its 1-based position in
 * strss in item.indexs.
 */
void AddReqProc::do_stat_word_freq(vector<string> &strss, map<string, item> &word_map) {
    for (size_t pos = 0; pos < strss.size(); ++pos) {
        const uint32_t index = (uint32_t)(pos + 1);
        const string word = strss[pos];

        map<string, item>::iterator hit = word_map.find(word);
        if (hit != word_map.end()) {
            hit->second.freq++;
            hit->second.indexs.push_back(index);
            continue;
        }

        item fresh;
        fresh.doc_id = doc_id;
        fresh.freq = 1;
        fresh.indexs.push_back(index);
        word_map.insert(make_pair(word, fresh));
    }
}
// Indexes one field of the document according to its configured type.
//   tbinfo     - table definition of the field (type, segment mode, field id)
//   field_name - key of the field inside json_field
// String/text fields are segmented and written as word -> document rows;
// numeric and IP fields are written as a single key; longitude/latitude/WKT
// fields are only collected into lng/lat/lng_arr/lat_arr here.
// Returns 0 on success, RT_ERROR_FIELD_FORMAT when the JSON value has the wrong
// type or content, or the error of the failing insert. roll_back() is called
// before returning from a failed insert.
int AddReqProc::deal_index_tag(struct table_info *tbinfo, string field_name){
int ret =0;
map<string, item> word_map;
vector<vector<string> > split_content;
switch(tbinfo->field_type){
// Text fields: build word_map with the configured segmentation, then insert it.
case FIELD_STRING:
case FIELD_TEXT:
if(json_field[field_name].isString()){
if (tbinfo->segment_tag == SEGMENT_NGRAM) { // NGram split mode
vector<string> ngram_content = SplitManager::Instance()->split(json_field[field_name].asString());
do_stat_word_freq(ngram_content, word_map);
}
else if (tbinfo->segment_tag == SEGMENT_CHINESE || tbinfo->segment_tag == SEGMENT_ENGLISH) { // use intelligent_info
string str = json_field[field_name].asString();
// Content of a field with segment_tag 3 must be entirely Chinese; a field with segment_tag 4 must not contain Chinese.
if (tbinfo->segment_tag == SEGMENT_CHINESE && allChinese(str) == false) {
log_error("segment_tag is 3, the content[%s] must be Chinese.", str.c_str());
return RT_ERROR_FIELD_FORMAT;
}
if (tbinfo->segment_tag == SEGMENT_ENGLISH && noChinese(str) == false) {
log_error("segment_tag is 4, the content[%s] can not contain Chinese.", str.c_str());
return RT_ERROR_FIELD_FORMAT;
}
// The whole string is indexed as a single word.
item it;
it.doc_id = doc_id;
it.freq = 1;
if(tbinfo->segment_feature == SEGMENT_FEATURE_SNAPSHOT){
Json::FastWriter ex_writer;
it.extend = ex_writer.write(snapshot_content);
}
word_map.insert(make_pair(str, it));
vector<IntelligentInfo> info;
bool flag = false;
get_intelligent(str, info, flag);
// When get_intelligent() sets the flag, also insert the intelligent rows
// under the key "<appid>#<field id>" and remember the key in intelligent_keys.
if (flag) {
stringstream ss;
ss << app_id << "#" << tbinfo->field_value;
ret = g_hanpinIndexInstance.do_insert_intelligent(ss.str(), doc_id, str, info, doc_version);
if(0 != ret){
roll_back();
return ret;
}
intelligent_keys.push_back(ss.str());
}
}
else {
// Default segmentation; the snapshot (if enabled) is attached to every word.
split_content = SplitManager::Instance()->split(json_field[field_name].asString(), app_id);
string extend = "";
if(tbinfo->segment_feature == SEGMENT_FEATURE_SNAPSHOT){
Json::FastWriter ex_writer;
extend = ex_writer.write(snapshot_content);
}
do_stat_word_freq(split_content, word_map, extend);
split_content.clear();
}
ret = g_IndexInstance.do_insert_index(word_map, app_id, doc_version, tbinfo->field_value, docid_index_map);
if (0 != ret) {
roll_back();
return ret;
}
word_map.clear();
}else{
log_error("field type error, not FIELD_STRING.");
return RT_ERROR_FIELD_FORMAT;
}
break;
// Integer field: one index row keyed by the value.
case FIELD_INT:
if(json_field[field_name].isInt()){
// Note: this `ret` shadows the function-level one.
int ret;
struct item it;
it.doc_id = doc_id;
it.freq = 0;
string key = "";
if(tbinfo->segment_tag == SEGMENT_RANGE){ // for range-query fields, pad the key to 20 digits
stringstream ss;
ss << setw(20) << setfill('0') << json_field[field_name].asInt();
key = gen_dtc_key_string(app_id, "00", ss.str());
} else {
key = gen_dtc_key_string(app_id, "00", (uint32_t)json_field[field_name].asInt());
}
ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map);
if(ret != 0){
roll_back();
return RT_ERROR_INSERT_INDEX_DTC;
}
}else{
log_error("field type error, not FIELD_INT.");
return RT_ERROR_FIELD_FORMAT;
}
break;
// 64-bit integer field: one index row keyed by the value.
case FIELD_LONG:
if(json_field[field_name].isInt64()){
struct item it;
it.doc_id = doc_id;
it.freq = 0;
string key = gen_dtc_key_string(app_id, "00", (int64_t)json_field[field_name].asInt64());
int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map);
if(0 != ret){
roll_back();
log_error("insert_index_dtc error, appid[%d], key[%s]", app_id, key.c_str());
return RT_ERROR_INSERT_INDEX_DTC;
}
} else {
log_error("field type error, not FIELD_LONG.");
return RT_ERROR_FIELD_FORMAT;
}
break;
// Floating-point field: one index row keyed by the value.
case FIELD_DOUBLE:
if(json_field[field_name].isDouble()){
struct item it;
it.doc_id = doc_id;
it.freq = 0;
string key = gen_dtc_key_string(app_id, "00", (double)json_field[field_name].asDouble());
int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map);
if(0 != ret){
roll_back();
log_error("insert_index_dtc error, appid[%d], key[%s]", app_id, key.c_str());
return RT_ERROR_INSERT_INDEX_DTC;
}
} else {
log_error("field type error, not FIELD_DOUBLE.");
return RT_ERROR_FIELD_FORMAT;
}
break;
// IPv4 field: parsed with inet_pton and keyed by the host-order 32-bit value.
case FIELD_IP:
// These two declarations live in the switch scope; `ret` shadows the
// function-level variable for the remaining cases.
uint32_t s;
int ret;
if(json_field[field_name].isString()){
ret = inet_pton(AF_INET, json_field[field_name].asString().c_str(), (void *)&s);
if(ret == 0){
log_error("ip format is error\n");
return RT_ERROR_FIELD_FORMAT;
}
struct item it;
it.doc_id = doc_id;
it.freq = 0;
string key = gen_dtc_key_string(app_id, "00", ntohl(s));
ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map);
if(ret != 0){
roll_back();
return RT_ERROR_INSERT_INDEX_DTC;
}
}else{
return RT_ERROR_FIELD_FORMAT;
}
break;
// Single longitude / latitude: only remembered here, no index row is written in this function.
case FIELD_LNG:
if(json_field[field_name].isString()){
lng = json_field[field_name].asString();
}else{
return RT_ERROR_FIELD_FORMAT;
}
break;
case FIELD_LAT:
if(json_field[field_name].isString()){
lat = json_field[field_name].asString();
}else{
return RT_ERROR_FIELD_FORMAT;
}
break;
// Longitude / latitude arrays: every element must be a string; collected into lng_arr / lat_arr.
case FIELD_LNG_ARRAY:
if(json_field[field_name].isArray()){
Json::Value lngs = json_field[field_name];
for (uint32_t lng_idx = 0; lng_idx < lngs.size(); ++lng_idx) {
if (lngs[lng_idx].isString()){
lng_arr.push_back(lngs[lng_idx].asString());
} else {
log_error("longitude must be string");
return RT_ERROR_FIELD_FORMAT;
}
}
}else{
log_error("FIELD_LNG_ARRAY must be array");
return RT_ERROR_FIELD_FORMAT;
}
break;
case FIELD_LAT_ARRAY:
if(json_field[field_name].isArray()){
Json::Value lats = json_field[field_name];
for (uint32_t lat_idx = 0; lat_idx < lats.size(); ++lat_idx) {
if (lats[lat_idx].isString()){
lat_arr.push_back(lats[lat_idx].asString());
} else {
log_error("latitude must be string");
return RT_ERROR_FIELD_FORMAT;
}
}
}else{
log_error("FIELD_LAT_ARRAY must be array");
return RT_ERROR_FIELD_FORMAT;
}
break;
// WKT text: after delPrefix(), split on "," into points and each point on " ";
// points that do not split into exactly two parts are skipped silently.
case FIELD_WKT:
if(json_field[field_name].isString()){
string str = json_field[field_name].asString();
str = delPrefix(str);
vector<string> str_vec = splitEx(str, ",");
for(uint32_t str_vec_idx = 0; str_vec_idx < str_vec.size(); str_vec_idx++){
string wkt_str = trim(str_vec[str_vec_idx]);
vector<string> wkt_vec = splitEx(wkt_str, " ");
if(wkt_vec.size() == 2){
lng_arr.push_back(wkt_vec[0]);
lat_arr.push_back(wkt_vec[1]);
}
}
} else {
log_error("FIELD_WKT must be string");
return RT_ERROR_FIELD_FORMAT;
}
break;
// Any other field type is ignored.
default:
break;
}
return 0;
}
int AddReqProc::do_insert_index(UserTableContent& content_fields){
int ret = 0;
Json::Value::Members member = json_field.getMemberNames();
Json::Value::Members::iterator iter = member.begin();
for(; iter != member.end(); ++iter)
{
string field_name = *iter;
struct table_info *tbinfo = NULL;
tbinfo = SplitManager::Instance()->get_table_info(app_id, field_name);
if(tbinfo == NULL){
continue;
}
if(tbinfo->snapshot_tag == 1){ //snapshot
if(tbinfo->field_type == 1 && json_field[field_name].isInt()){
snapshot_content[field_name] = json_field[field_name].asInt();
}else if(tbinfo->field_type > 1 && json_field[field_name].isString()){
snapshot_content[field_name] = json_field[field_name].asString();
}else if(tbinfo->field_type > 1 && json_field[field_name].isDouble()){
snapshot_content[field_name] = json_field[field_name].asDouble();
}else if(tbinfo->field_type > 1 && json_field[field_name].isInt64()){
snapshot_content[field_name] = json_field[field_name].asInt64();
}else if(tbinfo->field_type > 1 && json_field[field_name].isArray()){
snapshot_content[field_name] = json_field[field_name];
}
}
}
for(iter = member.begin(); iter != member.end(); ++iter)
{
string field_name = *iter;
struct table_info *tbinfo = NULL;
tbinfo = SplitManager::Instance()->get_table_info(app_id, field_name);
if(tbinfo == NULL){
continue;
}
if(tbinfo->index_tag == 1){
ret = deal_index_tag(tbinfo, field_name);
if(0 != ret){
log_error("deal index tag process error, ret: %d", ret);
roll_back();
return ret;
}
}
}
if(lng.length() != 0 && lat.length() != 0){
struct table_info *tbinfo = NULL;
tbinfo = SplitManager::Instance()->get_table_info(app_id, "gis");
if(tbinfo == NULL){
roll_back();
return RT_NO_GIS_DEFINE;
}
string gisid = encode(atof(lat.c_str()), atof(lng.c_str()), 6);
log_debug("gis code = %s",gisid.c_str());
int ret;
uint64_t id = 0;
struct item it;
it.doc_id = doc_id;
it.freq = 0;
Json::FastWriter gis_writer;
it.extend = gis_writer.write(snapshot_content);
string key = gen_dtc_key_string(app_id, "00", gisid);
ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map);
if(ret != 0){
roll_back();
return RT_ERROR_INSERT_INDEX_DTC;
}
log_debug("id = %llu,doc_vesion = %d,docid = %s\n",(long long unsigned int)id,doc_version,it.doc_id.c_str());
}
log_debug("lng_arr size: %d, lat_arr size: %d", (int)lng_arr.size(), (int)lat_arr.size());
if(lng_arr.size() > 0 && lat_arr.size() > 0){
if(lng_arr.size() != lat_arr.size()){
log_error("lng_arr size not equal with lat_arr size");
return RT_ERROR_FIELD_FORMAT;
}
set<string> gis_set;
for(uint32_t arr_idx = 0; arr_idx < lng_arr.size(); arr_idx++){
string tmp_lng = lng_arr[arr_idx];
string tmp_lat = lat_arr[arr_idx];
struct table_info *tbinfo = NULL;
tbinfo = SplitManager::Instance()->get_table_info(app_id, "gis");
if(tbinfo == NULL){
roll_back();
log_error("gis field not defined");
return RT_NO_GIS_DEFINE;
}
string gisid = encode(atof(tmp_lat.c_str()), atof(tmp_lng.c_str()), 6);
if(gis_set.find(gisid) != gis_set.end()){
continue;
}
gis_set.insert(gisid);
struct item it;
it.doc_id = doc_id;
it.freq = 0;
Json::FastWriter gis_writer;
it.extend = gis_writer.write(snapshot_content);
string key = gen_dtc_key_string(app_id, "00", gisid);
int ret = g_IndexInstance.insert_index_dtc(key, it, tbinfo->field_value, doc_version, docid_index_map);
if(ret != 0){
roll_back();
return RT_ERROR_INSERT_INDEX_DTC;
}
log_debug("gis code = %s,doc_vesion = %d,docid = %s\n",gisid.c_str(),doc_version,it.doc_id.c_str());
}
}
vector<string> union_key_vec;
SplitManager::Instance()->getUnionKeyField(app_id, union_key_vec);
vector<string>::iterator union_key_iter = union_key_vec.begin();
for(; union_key_iter != union_key_vec.end(); union_key_iter++){
string union_key = *union_key_iter;
vector<int> union_field_vec = splitInt(union_key, ",");
vector<int>::iterator union_field_iter = union_field_vec.begin();
vector<vector<string> > keys_vvec;
for(; union_field_iter != union_field_vec.end(); union_field_iter++){
int union_field_value = *union_field_iter;
if(union_field_value >= (int)docid_index_map.size()){
log_error("appid[%d] field[%d] is invalid", app_id, *union_field_iter);
break;
}
vector<string> key_vec;
if(!docid_index_map[union_field_value].isArray()){
log_debug("doc_id[%s] union_field_value[%d] has no keys", doc_id.c_str(), union_field_value);
break;
}
for (int key_index = 0; key_index < (int)docid_index_map[union_field_value].size(); key_index++){
if(docid_index_map[union_field_value][key_index].isString()){
string union_index_key = docid_index_map[union_field_value][key_index].asString();
if(union_index_key.size() > 9){ // 倒排key的格式为10061#00#折扣,这里只取第二个#后面的内容
key_vec.push_back(union_index_key.substr(9));
}
}
}
keys_vvec.push_back(key_vec);
}
if(keys_vvec.size() != union_field_vec.size()){
log_debug("keys_vvec.size not equal union_field_vec.size");
break;
}
vector<string> union_keys = combination(keys_vvec);
for(int m = 0 ; m < (int)union_keys.size(); m++){
ret = g_IndexInstance.insert_union_index_dtc(union_keys[m], doc_id, app_id, doc_version);
if(ret != 0){
log_error("insert union key[%s] error", union_keys[m].c_str());
}
}
}
Json::FastWriter writer;
content_fields.content = writer.write(snapshot_content);
Json::FastWriter doc_index_writer;
string doc_index_map_string = doc_index_writer.write(docid_index_map);
if(doc_version != 1){//need update
map<uint32_t, vector<string> > index_res;
g_IndexInstance.GetIndexData(gen_dtc_key_string(content_fields.appid, "20", doc_id), doc_version - 1, index_res);
map<uint32_t, vector<string> >::iterator map_iter = index_res.begin();
for(; map_iter != index_res.end(); map_iter++){
uint32_t field = map_iter->first;
vector<string> words = map_iter->second;
for(int i = 0; i < (int)words.size(); i++){
DeleteTask::GetInstance().RegisterInfo(words[i], doc_id, doc_version - 1, field);
}
}
int affected_rows = 0;
ret = g_IndexInstance.update_sanpshot_dtc(content_fields, doc_version, trans_version, affected_rows);
if(ret != 0 || affected_rows == 0){
log_error("update_sanpshot_dtc error, roll back, ret: %d, affected_rows: %d.", ret, affected_rows);
roll_back();
return RT_ERROR_UPDATE_SNAPSHOT;
}
g_IndexInstance.update_docid_index_dtc(doc_index_map_string, doc_id, app_id, doc_version);
}else{
int affected_rows = 0;
ret = g_IndexInstance.update_sanpshot_dtc(content_fields, doc_version, trans_version, affected_rows);
if(ret != 0 || affected_rows == 0){
log_error("update_sanpshot_dtc error, roll back, ret: %d, affected_rows: %d.", ret, affected_rows);
roll_back();
return RT_ERROR_UPDATE_SNAPSHOT;
}
g_IndexInstance.insert_docid_index_dtc(doc_index_map_string, doc_id, app_id, doc_version);
}
return 0;
}
int AddReqProc::roll_back(){
// 删除hanpin_index
for(int i = 0; i < (int)intelligent_keys.size(); i++){
g_hanpinIndexInstance.delete_intelligent(intelligent_keys[i], doc_id, trans_version);
}
// 删除keyword_index
if(docid_index_map.isArray()){
for(int i = 0;i < (int)docid_index_map.size();i++){
Json::Value info = docid_index_map[i];
if(info.isArray()){
for(int j = 0;j < (int)info.size();j++){
if(info[j].isString()){
string key = info[j].asString();
g_IndexInstance.delete_index(key, doc_id, trans_version, i);
}
}
}
}
}
// 如果trans_version=1删除快照否则更新快照的trans_version=trans_version-1
Json::Value res;
if(trans_version == 1){
g_IndexInstance.delete_snapshot_dtc(doc_id, app_id, res);
} else {
g_IndexInstance.update_sanpshot_dtc(app_id, doc_id, trans_version);
}
return 0;
}

View File

@ -0,0 +1,58 @@
/*
* =====================================================================================
*
* Filename: add_request_proc.h
*
* Description: AddReqProc class definition.
*
* Version: 1.0
* Created: 09/08/2020 10:02:05 PM
* Revision: none
* Compiler: gcc
*
* Author: shrewdlin, linjinming@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#ifndef ADD_REQUEST_PROC_H
#define ADD_REQUEST_PROC_H
#include <stdint.h>
#include <map>
#include <string>
#include <vector>
#include "log.h"
#include "json/json.h"
#include "comm.h"
class UserTableContent;
class SplitManager;
class AddReqProc
{
public:
AddReqProc();
AddReqProc(const Json::Value& jf, InsertParam& insert_param);
~AddReqProc();
int do_insert_index(UserTableContent& content_fields);
private:
void do_stat_word_freq(vector<vector<string> > &strss, map<string, item> &word_map, string extend);
void do_stat_word_freq(vector<string> &strss, map<string, item> &word_map);
int deal_index_tag(struct table_info *tbinfo, string field_name);
int roll_back();
private:
Json::Value json_field;
uint32_t app_id;
uint32_t doc_version;
uint32_t trans_version;
string doc_id;
string lng;
string lat;
vector<string> lng_arr;
vector<string> lat_arr;
vector<string> intelligent_keys;
Json::Value snapshot_content;
Json::Value docid_index_map;
};
#endif