add MatchQueryProcess

This commit is contained in:
shzhulin3 2021-05-17 10:52:53 +08:00
parent 466408ca54
commit 784e39b5e3
11 changed files with 1071 additions and 552 deletions

View File

@ -9,6 +9,7 @@ AUX_SOURCE_DIRECTORY(. main)
AUX_SOURCE_DIRECTORY(./index_sync index_sync) AUX_SOURCE_DIRECTORY(./index_sync index_sync)
AUX_SOURCE_DIRECTORY(./utils utils) AUX_SOURCE_DIRECTORY(./utils utils)
AUX_SOURCE_DIRECTORY(./query query) AUX_SOURCE_DIRECTORY(./query query)
AUX_SOURCE_DIRECTORY(./process process)
LINK_DIRECTORIES( LINK_DIRECTORIES(
${PROJECT_SOURCE_DIR}/../../comm ${PROJECT_SOURCE_DIR}/../../comm
@ -16,7 +17,7 @@ ${PROJECT_SOURCE_DIR}/../../3rdlib/jsoncpp/lib
${PROJECT_SOURCE_DIR}/../../comm/stat ${PROJECT_SOURCE_DIR}/../../comm/stat
) )
ADD_EXECUTABLE(index_read ${main} ${index_sync} ${utils} ${query}) ADD_EXECUTABLE(index_read ${main} ${index_sync} ${utils} ${query} ${process})
target_include_directories(index_read PUBLIC target_include_directories(index_read PUBLIC
../../3rdlib/jsoncpp/include ../../3rdlib/jsoncpp/include

View File

@ -749,6 +749,10 @@ bool Component::TerminalTagValid(){
return m_terminal_tag_valid; return m_terminal_tag_valid;
} }
Json::Value& Component::GetQuery(){
return m_query;
}
void Component::GetKeyFromFieldInfo(const vector<FieldInfo>& field_info_vec, vector<string>& key_vec){ void Component::GetKeyFromFieldInfo(const vector<FieldInfo>& field_info_vec, vector<string>& key_vec){
vector<FieldInfo>::const_iterator iter = field_info_vec.begin(); vector<FieldInfo>::const_iterator iter = field_info_vec.begin();
for(; iter != field_info_vec.end(); iter++){ for(; iter != field_info_vec.end(); iter++){

View File

@ -67,8 +67,8 @@ public:
vector<string>& Fields(); vector<string>& Fields();
uint32_t TerminalTag(); uint32_t TerminalTag();
bool TerminalTagValid(); bool TerminalTagValid();
Json::Value& GetQuery();
private:
void GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis); void GetFieldWords(int type, string dataStr, uint32_t appid, uint32_t &m_has_gis);
void AddToFieldList(int type, vector<FieldInfo>& fields); void AddToFieldList(int type, vector<FieldInfo>& fields);
void GetKeyFromFieldInfo(const vector<FieldInfo>& field_info_vec, vector<string>& key_vec); void GetKeyFromFieldInfo(const vector<FieldInfo>& field_info_vec, vector<string>& key_vec);

View File

@ -16,6 +16,9 @@
* ===================================================================================== * =====================================================================================
*/ */
#ifndef LOGICAL_OP_H
#define LOGICAL_OP_H
#include "component.h" #include "component.h"
#include <map> #include <map>
#include <set> #include <set>
@ -35,7 +38,6 @@ public:
void SetFunc(logical_func func); void SetFunc(logical_func func);
int ProcessTerminal(const vector<vector<FieldInfo> >& and_keys, const TerminalQryCond& query_cond, vector<TerminalRes>& vecs); int ProcessTerminal(const vector<vector<FieldInfo> >& and_keys, const TerminalQryCond& query_cond, vector<TerminalRes>& vecs);
private:
void CalculateByWord(FieldInfo fieldInfo, const vector<IndexInfo> &doc_info, map<string, vec> &ves, map<string, uint32_t> &key_in_doc); void CalculateByWord(FieldInfo fieldInfo, const vector<IndexInfo> &doc_info, map<string, vec> &ves, map<string, uint32_t> &key_in_doc);
void SetDocIndexCache(const vector<IndexInfo> &doc_info, string& indexJsonStr); void SetDocIndexCache(const vector<IndexInfo> &doc_info, string& indexJsonStr);
bool GetDocIndexCache(string word, uint32_t field, vector<IndexInfo> &doc_info); bool GetDocIndexCache(string word, uint32_t field, vector<IndexInfo> &doc_info);
@ -49,3 +51,4 @@ private:
logical_func m_func; logical_func m_func;
}; };
#endif

View File

@ -0,0 +1,329 @@
#include "match_query_process.h"
#include "math.h"
#include "../order_op.h"
#define DOC_CNT 10000
MatchQueryProcess::MatchQueryProcess(uint32_t appid, Json::Value& value, Component* component)
:QueryProcess(appid, value, component){
appid_ = component_->Appid();
sort_type_ = component_->SortType();
sort_field_ = component_->SortField();
has_gis_ = false;
}
MatchQueryProcess::~MatchQueryProcess(){
}
int MatchQueryProcess::ParseContent(){
return ParseContent(ORKEY);
}
int MatchQueryProcess::ParseContent(uint32_t type){
vector<FieldInfo> fieldInfos;
Json::Value::Members member = value_.getMemberNames();
Json::Value::Members::iterator iter = member.begin();
string fieldname;
Json::Value field_value;
if(iter != member.end()){ // 一个match下只对应一个字段
fieldname = *iter;
field_value = value_[fieldname];
} else {
log_error("MatchQueryProcess error, value is null");
return -RT_PARSE_CONTENT_ERROR;
}
uint32_t segment_tag = 0;
FieldInfo fieldInfo;
uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid_, fieldname, fieldInfo);
if (field != 0 && segment_tag == 1)
{
string split_data = SplitManager::Instance()->split(field_value.asString(), appid_);
log_debug("split_data: %s", split_data.c_str());
vector<string> split_datas = splitEx(split_data, "|");
for(size_t index = 0; index < split_datas.size(); index++)
{
FieldInfo info;
info.field = fieldInfo.field;
info.field_type = fieldInfo.field_type;
info.word = split_datas[index];
info.segment_tag = fieldInfo.segment_tag;
fieldInfos.push_back(info);
}
}
else if (field != 0)
{
fieldInfo.word = field_value.asString();
fieldInfos.push_back(fieldInfo);
}
component_->AddToFieldList(type, fieldInfos);
return 0;
}
int MatchQueryProcess::GetValidDoc(){
doc_manager_ = new DocManager(component_);
logical_operate_ = new LogicalOperate(appid_, sort_type_, has_gis_, component_->CacheSwitch());
for (size_t index = 0; index < component_->Keys().size(); index++)
{
vector<IndexInfo> doc_id_vec;
vector<FieldInfo> fieldInfos = component_->Keys()[index];
vector<FieldInfo>::iterator it;
for (it = fieldInfos.begin(); it != fieldInfos.end(); it++) {
vector<IndexInfo> doc_info;
if ((*it).segment_tag == 3) {
int ret = GetDocByShiftWord(*it, doc_info, appid_, highlightWord_);
if (ret != 0) {
doc_id_vec.clear();
return -RT_GET_DOC_ERR;
}
sort(doc_info.begin(), doc_info.end());
for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){
KeyInfo info;
info.word_freq = 1;
info.field = (*it).field;
info.word = (*it).word;
doc_info_map_[doc_info[doc_info_idx].doc_id].push_back(info);
}
} else if ((*it).segment_tag == 4) {
int ret = GetDocByShiftEnWord(*it, doc_info, appid_, highlightWord_);
if (ret != 0) {
doc_id_vec.clear();
return -RT_GET_DOC_ERR;
}
sort(doc_info.begin(), doc_info.end());
for (size_t doc_info_idx = 0; doc_info_idx < doc_info.size(); doc_info_idx++){
KeyInfo info;
info.word_freq = 1;
info.field = (*it).field;
info.word = (*it).word;
doc_info_map_[doc_info[doc_info_idx].doc_id].push_back(info);
}
} else {
int ret = logical_operate_->GetDocIdSetByWord(*it, doc_info);
if (ret != 0){
return -RT_GET_DOC_ERR;
}
if (doc_info.size() == 0)
continue;
if (!isAllNumber((*it).word))
highlightWord_.insert((*it).word);
if(sort_type_ == SORT_RELEVANCE){
logical_operate_->CalculateByWord(*it, doc_info, doc_info_map_, key_in_doc_);
}
}
doc_id_vec = vec_union(doc_id_vec, doc_info);
}
if(index == 0){ // 第一个直接赋值给vecs后续的依次与前面的进行逻辑运算
doc_vec_.assign(doc_id_vec.begin(), doc_id_vec.end());
} else {
doc_vec_ = vec_union(doc_vec_, doc_id_vec);
}
}
bool bRet = doc_manager_->GetDocContent(has_gis_, doc_vec_, valid_docs_, distances_);
if (false == bRet) {
log_error("GetDocContent error.");
return -RT_DTC_ERR;
}
return 0;
}
int MatchQueryProcess::GetScoreAndSort(){
// BM25 algorithm
uint32_t doc_cnt = DOC_CNT;
double k1 = 1.2;
double k2 = 200;
double K = 1.65;
string doc_id;
string keyword;
uint32_t word_freq = 0;
uint32_t field = 0;
if(sort_type_ == SORT_RELEVANCE || sort_type_ == SORT_TIMESTAMP){
map<string, vec>::iterator ves_iter = doc_info_map_.begin();
for (; ves_iter != doc_info_map_.end(); ves_iter++) {
double score = 0;
uint32_t key_docs = 0;
doc_id = ves_iter->first;
vector<KeyInfo> &key_info = ves_iter->second;
if(valid_docs_.find(doc_id) == valid_docs_.end()){
continue;
}
set<string> word_set;
map<string, vector<int> > pos_map;
map<string, vector<int> > title_pos_map;
for (uint32_t i = 0; i < key_info.size(); i++) {
keyword = key_info[i].word;
if (word_set.find(keyword) == word_set.end()) {
word_set.insert(keyword);
}
word_freq = key_info[i].word_freq;
field = key_info[i].field;
if (field == LOCATE_ANY) {
pos_map[keyword] = key_info[i].pos_vec;
}
if (field == LOCATE_TITLE) {
title_pos_map[keyword] = key_info[i].pos_vec;
}
key_docs = key_in_doc_[keyword];
score += log((doc_cnt - key_docs + 0.5) / (key_docs + 0.5)) * ((k1 + 1)*word_freq) / (K + word_freq) * (k2 + 1) * 1 / (k2 + 1);
}
/*if (!complete_keys.empty()) { // 完全匹配
if (word_set.size() != word_vec.size()) { // 文章中出现的词语数量与输入的不一致,则不满足完全匹配
continue;
}
else { // 在标题和正文中都不连续出现,则不满足
if (CheckWordContinus(word_vec, pos_map) == false && CheckWordContinus(word_vec, title_pos_map) == false) {
continue;
}
}
}*/
skipList_.InsertNode(score, doc_id.c_str());
}
} else {
set<string>::iterator set_iter = valid_docs_.begin();
for(; set_iter != valid_docs_.end(); set_iter++){
doc_id = *set_iter;
if (sort_type_ == SORT_FIELD_ASC || sort_type_ == SORT_FIELD_DESC){
doc_manager_->GetScoreMap(doc_id, sort_type_, sort_field_, sort_field_type_, appid_);
} else {
skipList_.InsertNode(1, doc_id.c_str());
}
}
}
return 0;
}
void MatchQueryProcess::TaskEnd(){
Json::FastWriter writer;
Json::Value response;
response["code"] = 0;
int sequence = -1;
int rank = 0;
int page_size = component_->PageSize();
int limit_start = page_size * (component_->PageIndex()-1);
int limit_end = page_size * (component_->PageIndex()-1) + page_size - 1;
log_debug("search result begin.");
if((sort_type_ == SORT_FIELD_DESC || sort_type_ == SORT_FIELD_ASC) && skipList_.GetSize() == 0){
OrderOpCond order_op_cond;
order_op_cond.last_id = component_->LastId();
order_op_cond.limit_start = limit_start;
order_op_cond.count = page_size;
order_op_cond.has_extra_filter = false;
if(component_->ExtraFilterKeys().size() != 0 || component_->ExtraFilterAndKeys().size() != 0 || component_->ExtraFilterInvertKeys().size() != 0){
order_op_cond.has_extra_filter = true;
}
if(sort_field_type_ == FIELDTYPE_INT){
rank += doc_manager_->ScoreIntMap().size();
COrderOp<int> orderOp(FIELDTYPE_INT, component_->SearchAfter(), sort_type_);
orderOp.Process(doc_manager_->ScoreIntMap(), atoi(component_->LastScore().c_str()), order_op_cond, response, doc_manager_);
} else if(sort_field_type_ == FIELDTYPE_DOUBLE) {
rank += doc_manager_->ScoreDoubleMap().size();
COrderOp<double> orderOp(FIELDTYPE_DOUBLE, component_->SearchAfter(), sort_type_);
orderOp.Process(doc_manager_->ScoreDoubleMap(), atof(component_->LastScore().c_str()), order_op_cond, response, doc_manager_);
} else {
rank += doc_manager_->ScoreStrMap().size();
COrderOp<string> orderOp(FIELDTYPE_STRING, component_->SearchAfter(), sort_type_);
orderOp.Process(doc_manager_->ScoreStrMap(), component_->LastScore(), order_op_cond, response, doc_manager_);
}
} else if (has_gis_ || sort_type_ == SORT_FIELD_ASC) {
log_debug("m_has_gis or SORT_FIELD_ASC, size:%d ", skipList_.GetSize());
SkipListNode *tmp = skipList_.GetHeader()->level[0].forward;
while (tmp->level[0].forward != NULL) {
// 通过extra_filter_keys进行额外过滤针对区分度不高的字段
if(doc_manager_->CheckDocByExtraFilterKey(tmp->value) == false){
log_debug("CheckDocByExtraFilterKey failed, %s", tmp->value);
tmp = tmp->level[0].forward;
continue;
}
sequence++;
rank++;
if(component_->ReturnAll() == 0){
if (sequence < limit_start || sequence > limit_end) {
tmp = tmp->level[0].forward;
continue;
}
}
Json::Value doc_info;
doc_info["doc_id"] = Json::Value(tmp->value);
doc_info["score"] = Json::Value(tmp->key);
response["result"].append(doc_info);
tmp = tmp->level[0].forward;
}
} else {
SkipListNode *tmp = skipList_.GetFooter()->backward;
while(tmp->backward != NULL) {
if(doc_manager_->CheckDocByExtraFilterKey(tmp->value) == false){
tmp = tmp->backward;
continue;
}
sequence++;
rank++;
if (component_->ReturnAll() == 0){
if (sequence < limit_start || sequence > limit_end) {
tmp = tmp->backward;
continue;
}
}
Json::Value doc_info;
doc_info["doc_id"] = Json::Value(tmp->value);
doc_info["score"] = Json::Value(tmp->key);
response["result"].append(doc_info);
tmp = tmp->backward;
}
}
if(component_->Fields().size() > 0){
doc_manager_->AppendFieldsToRes(response, component_->Fields());
}
if (rank > 0)
AppendHighLightWord(response);
if (has_gis_) {
response["type"] = 1;
}
else {
response["type"] = 0;
}
response["count"] = rank;
/*if(m_index_set_cnt != 0){
response["count"] = m_index_set_cnt;
}*/
log_debug("search result end: %lld.", (long long int)GetSysTimeMicros());
std::string outputConfig = writer.write(response);
request_->setResult(outputConfig);
/*if (component_->ReturnAll() == 0 && component_->CacheSwitch() == 1 && component_->PageIndex() == 1 && has_gis_ == 0
&& rank > 0 && outputConfig.size() < MAX_VALUE_LEN) {
string m_Data_Cache = m_Primary_Data + "|" + component_->DataAnd() + "|" + component_->DataInvert() + "|" + component_->DataComplete() + "|" +
ToString(sort_type_) + "|" + ToString(appid_);
unsigned data_size = m_Data_Cache.size();
int ret = cachelist->add_list(m_Data_Cache.c_str(), outputConfig.c_str(), data_size, outputConfig.size());
if (ret != 0) {
log_error("add to cache_list error, ret: %d.", ret);
}
else {
log_debug("add to cache_list: %s.", m_Data_Cache.c_str());
}
}*/
}
void MatchQueryProcess::AppendHighLightWord(Json::Value& response)
{
int count = 0;
set<string>::iterator iter = highlightWord_.begin();
for (; iter != highlightWord_.end(); iter++) {
if (count >= 10)
break;
count = count + 1;
response["hlWord"].append((*iter).c_str());
}
return ;
}

View File

@ -0,0 +1,50 @@
/*
* =====================================================================================
*
* Filename: query_process.h
*
* Description: query_process class definition.
*
* Version: 1.0
* Created: 14/05/2021
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#ifndef __MATCH_QUERY_PROCESS_H__
#define __MATCH_QUERY_PROCESS_H__
#include "query_process.h"
class MatchQueryProcess: public QueryProcess{
public:
MatchQueryProcess(uint32_t appid, Json::Value& value, Component* component);
~MatchQueryProcess();
int ParseContent();
int GetValidDoc();
int GetScoreAndSort();
void TaskEnd();
int ParseContent(uint32_t type);
void AppendHighLightWord(Json::Value& response);
private:
set<string> highlightWord_;
map<string, vec> doc_info_map_;
map<string, uint32_t> key_in_doc_;
vector<IndexInfo> doc_vec_;
hash_double_map distances_;
set<string> valid_docs_;
uint32_t appid_;
uint32_t sort_type_;
string sort_field_;
bool has_gis_;
FIELDTYPE sort_field_type_;
};
#endif

View File

@ -0,0 +1,50 @@
#include "query_process.h"
QueryProcess::QueryProcess(uint32_t appid, Json::Value& value, Component* component)
:component_(component),
appid_(appid),
value_(value)
{
}
QueryProcess::~QueryProcess(){
}
int QueryProcess::DoJob(){
TaskBegin();
ParseContent();
GetValidDoc();
GetScoreAndSort();
TaskEnd();
return 0;
}
void QueryProcess::SetSkipList(SkipList& skipList){
skipList_ = skipList;
}
void QueryProcess::SetRequest(CTaskRequest* request){
request_ = request;
}
void QueryProcess::TaskBegin(){
}
int QueryProcess::ParseContent(){
return 0;
}
int QueryProcess::GetValidDoc(){
return 0;
}
int QueryProcess::GetScoreAndSort(){
return 0;
}
void QueryProcess::TaskEnd(){
}

View File

@ -0,0 +1,56 @@
/*
* =====================================================================================
*
* Filename: query_process.h
*
* Description: query_process class definition.
*
* Version: 1.0
* Created: 14/05/2021
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#ifndef __QUERY_PROCESS_H__
#define __QUERY_PROCESS_H__
#include "../component.h"
#include "../logical_operate.h"
#include "../doc_manager.h"
#include "../comm.h"
#include "../db_manager.h"
#include "../split_manager.h"
#include "skiplist.h"
#include "task_request.h"
class QueryProcess{
public:
QueryProcess(uint32_t appid, Json::Value& value, Component* component);
~QueryProcess();
int DoJob();
void SetSkipList(SkipList& skipList);
void SetRequest(CTaskRequest* request);
protected:
void TaskBegin();
virtual int ParseContent();
virtual int GetValidDoc();
virtual int GetScoreAndSort();
virtual void TaskEnd();
protected:
Component* component_;
LogicalOperate* logical_operate_;
DocManager* doc_manager_;
uint32_t appid_;
Json::Value value_;
SkipList skipList_;
CTaskRequest* request_;
};
#endif

View File

@ -41,6 +41,8 @@
#include "index_sync/sequence_search_index.h" #include "index_sync/sequence_search_index.h"
#include "order_op.h" #include "order_op.h"
#include "process/match_query_process.h"
using namespace std; using namespace std;
#define DOC_CNT 10000 #define DOC_CNT 10000
@ -597,6 +599,25 @@ int SearchTask::Process(CTaskRequest *request)
skipList.InitList(); skipList.InitList();
component->InitSwitch(); component->InitSwitch();
log_debug("m_Data: %s", m_Primary_Data.c_str()); log_debug("m_Data: %s", m_Primary_Data.c_str());
m_query_ = component->GetQuery();
if(m_query_.isObject()){
if(m_query_.isMember("match")){
query_process_ = new MatchQueryProcess(m_appid, m_query_["match"], component);
} else {
log_error("query type error.");
return -RT_PARSE_JSON_ERR;
}
query_process_->SetSkipList(skipList);
query_process_->SetRequest(request);
int ret = query_process_->DoJob();
if(ret != 0){
log_error("query_process_ DoJob error, ret: %d", ret);
return ret;
}
return 0;
}
string err_msg = ""; string err_msg = "";
int ret = component->GetQueryWord(m_has_gis, err_msg); int ret = component->GetQueryWord(m_has_gis, err_msg);
if (ret != 0) { if (ret != 0) {

View File

@ -32,6 +32,9 @@
#include <string> #include <string>
#include <map> #include <map>
#include <vector> #include <vector>
#include "process/query_process.h"
using namespace std; using namespace std;
typedef vector<KeyInfo> vec; typedef vector<KeyInfo> vec;
@ -68,6 +71,8 @@ private:
uint32_t m_has_gis; //该appid是否包含有地理位置gis信息的查询 uint32_t m_has_gis; //该appid是否包含有地理位置gis信息的查询
set<string> highlightWord; set<string> highlightWord;
SkipList skipList; SkipList skipList;
QueryProcess* query_process_;
Json::Value m_query_;
}; };

View File

@ -132,7 +132,7 @@ static int accept_connection(int fd)
if (newfd < 0 && errno == EINVAL) if (newfd < 0 && errno == EINVAL)
{ {
if (getppid() == (pid_t)1) if (getppid() == (pid_t)1)
{ // 父进程已经退出 { // <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ѿ<EFBFBD><EFBFBD>˳<EFBFBD>
log_error("dtc father process not exist. helper[%d] exit now.", getpid()); log_error("dtc father process not exist. helper[%d] exit now.", getpid());
exit(0); exit(0);
} }