add TermQueryProcess

This commit is contained in:
shzhulin3 2021-05-20 16:53:46 +08:00
parent 784e39b5e3
commit d922ebf21c
7 changed files with 353 additions and 178 deletions

View File

@ -6,7 +6,6 @@
MatchQueryProcess::MatchQueryProcess(uint32_t appid, Json::Value& value, Component* component)
:QueryProcess(appid, value, component){
appid_ = component_->Appid();
sort_type_ = component_->SortType();
sort_field_ = component_->SortField();
has_gis_ = false;
@ -21,43 +20,44 @@ int MatchQueryProcess::ParseContent(){
}
int MatchQueryProcess::ParseContent(uint32_t type){
vector<FieldInfo> fieldInfos;
vector<FieldInfo> field_info_vec;
Json::Value::Members member = value_.getMemberNames();
Json::Value::Members::iterator iter = member.begin();
string fieldname;
string field_name;
Json::Value field_value;
if(iter != member.end()){ // 一个match下只对应一个字段
fieldname = *iter;
field_value = value_[fieldname];
field_name = *iter;
field_value = value_[field_name];
} else {
log_error("MatchQueryProcess error, value is null");
SetErrMsg("MatchQueryProcess error, value is null");
return -RT_PARSE_CONTENT_ERROR;
}
uint32_t segment_tag = 0;
FieldInfo fieldInfo;
uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid_, fieldname, fieldInfo);
if (field != 0 && segment_tag == 1)
{
FieldInfo field_info;
uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid_, field_name, field_info);
if (field != 0 && segment_tag == 1) {
string split_data = SplitManager::Instance()->split(field_value.asString(), appid_);
log_debug("split_data: %s", split_data.c_str());
vector<string> split_datas = splitEx(split_data, "|");
for(size_t index = 0; index < split_datas.size(); index++)
{
for(size_t index = 0; index < split_datas.size(); index++) {
FieldInfo info;
info.field = fieldInfo.field;
info.field_type = fieldInfo.field_type;
info.field = field_info.field;
info.field_type = field_info.field_type;
info.word = split_datas[index];
info.segment_tag = fieldInfo.segment_tag;
fieldInfos.push_back(info);
info.segment_tag = field_info.segment_tag;
field_info_vec.push_back(info);
}
}
else if (field != 0)
{
fieldInfo.word = field_value.asString();
fieldInfos.push_back(fieldInfo);
} else if (field != 0) {
field_info.word = field_value.asString();
field_info_vec.push_back(field_info);
} else {
stringstream ss_msg;
ss_msg << "field_name[" << field_name << "] error, not in the app_field_define";
SetErrMsg(ss_msg.str());
return -RT_PARSE_CONTENT_ERROR;
}
component_->AddToFieldList(type, fieldInfos);
component_->AddToFieldList(type, field_info_vec);
return 0;
}
@ -68,9 +68,9 @@ int MatchQueryProcess::GetValidDoc(){
for (size_t index = 0; index < component_->Keys().size(); index++)
{
vector<IndexInfo> doc_id_vec;
vector<FieldInfo> fieldInfos = component_->Keys()[index];
vector<FieldInfo> field_info_vec = component_->Keys()[index];
vector<FieldInfo>::iterator it;
for (it = fieldInfos.begin(); it != fieldInfos.end(); it++) {
for (it = field_info_vec.begin(); it != field_info_vec.end(); it++) {
vector<IndexInfo> doc_info;
if ((*it).segment_tag == 3) {
int ret = GetDocByShiftWord(*it, doc_info, appid_, highlightWord_);
@ -105,9 +105,6 @@ int MatchQueryProcess::GetValidDoc(){
if (ret != 0){
return -RT_GET_DOC_ERR;
}
if (doc_info.size() == 0)
continue;
if (!isAllNumber((*it).word))
highlightWord_.insert((*it).word);
if(sort_type_ == SORT_RELEVANCE){
logical_operate_->CalculateByWord(*it, doc_info, doc_info_map_, key_in_doc_);
@ -115,12 +112,8 @@ int MatchQueryProcess::GetValidDoc(){
}
doc_id_vec = vec_union(doc_id_vec, doc_info);
}
if(index == 0){ // 第一个直接赋值给vecs后续的依次与前面的进行逻辑运算
doc_vec_.assign(doc_id_vec.begin(), doc_id_vec.end());
} else {
doc_vec_ = vec_union(doc_vec_, doc_id_vec);
}
}
bool bRet = doc_manager_->GetDocContent(has_gis_, doc_vec_, valid_docs_, distances_);
if (false == bRet) {
@ -139,9 +132,8 @@ int MatchQueryProcess::GetScoreAndSort(){
string doc_id;
string keyword;
uint32_t word_freq = 0;
uint32_t field = 0;
if(sort_type_ == SORT_RELEVANCE || sort_type_ == SORT_TIMESTAMP){
if(sort_type_ == SORT_RELEVANCE){
map<string, vec>::iterator ves_iter = doc_info_map_.begin();
for (; ves_iter != doc_info_map_.end(); ves_iter++) {
double score = 0;
@ -153,38 +145,14 @@ int MatchQueryProcess::GetScoreAndSort(){
continue;
}
set<string> word_set;
map<string, vector<int> > pos_map;
map<string, vector<int> > title_pos_map;
for (uint32_t i = 0; i < key_info.size(); i++) {
keyword = key_info[i].word;
if (word_set.find(keyword) == word_set.end()) {
word_set.insert(keyword);
}
word_freq = key_info[i].word_freq;
field = key_info[i].field;
if (field == LOCATE_ANY) {
pos_map[keyword] = key_info[i].pos_vec;
}
if (field == LOCATE_TITLE) {
title_pos_map[keyword] = key_info[i].pos_vec;
}
key_docs = key_in_doc_[keyword];
score += log((doc_cnt - key_docs + 0.5) / (key_docs + 0.5)) * ((k1 + 1)*word_freq) / (K + word_freq) * (k2 + 1) * 1 / (k2 + 1);
}
/*if (!complete_keys.empty()) { // 完全匹配
if (word_set.size() != word_vec.size()) { // 文章中出现的词语数量与输入的不一致,则不满足完全匹配
continue;
}
else { // 在标题和正文中都不连续出现,则不满足
if (CheckWordContinus(word_vec, pos_map) == false && CheckWordContinus(word_vec, title_pos_map) == false) {
continue;
}
}
}*/
skipList_.InsertNode(score, doc_id.c_str());
}
} else {
set<string>::iterator set_iter = valid_docs_.begin();
for(; set_iter != valid_docs_.end(); set_iter++){
@ -218,9 +186,6 @@ void MatchQueryProcess::TaskEnd(){
order_op_cond.limit_start = limit_start;
order_op_cond.count = page_size;
order_op_cond.has_extra_filter = false;
if(component_->ExtraFilterKeys().size() != 0 || component_->ExtraFilterAndKeys().size() != 0 || component_->ExtraFilterInvertKeys().size() != 0){
order_op_cond.has_extra_filter = true;
}
if(sort_field_type_ == FIELDTYPE_INT){
rank += doc_manager_->ScoreIntMap().size();
COrderOp<int> orderOp(FIELDTYPE_INT, component_->SearchAfter(), sort_type_);
@ -234,37 +199,9 @@ void MatchQueryProcess::TaskEnd(){
COrderOp<string> orderOp(FIELDTYPE_STRING, component_->SearchAfter(), sort_type_);
orderOp.Process(doc_manager_->ScoreStrMap(), component_->LastScore(), order_op_cond, response, doc_manager_);
}
} else if (has_gis_ || sort_type_ == SORT_FIELD_ASC) {
log_debug("m_has_gis or SORT_FIELD_ASC, size:%d ", skipList_.GetSize());
SkipListNode *tmp = skipList_.GetHeader()->level[0].forward;
while (tmp->level[0].forward != NULL) {
// 通过extra_filter_keys进行额外过滤针对区分度不高的字段
if(doc_manager_->CheckDocByExtraFilterKey(tmp->value) == false){
log_debug("CheckDocByExtraFilterKey failed, %s", tmp->value);
tmp = tmp->level[0].forward;
continue;
}
sequence++;
rank++;
if(component_->ReturnAll() == 0){
if (sequence < limit_start || sequence > limit_end) {
tmp = tmp->level[0].forward;
continue;
}
}
Json::Value doc_info;
doc_info["doc_id"] = Json::Value(tmp->value);
doc_info["score"] = Json::Value(tmp->key);
response["result"].append(doc_info);
tmp = tmp->level[0].forward;
}
} else {
SkipListNode *tmp = skipList_.GetFooter()->backward;
while(tmp->backward != NULL) {
if(doc_manager_->CheckDocByExtraFilterKey(tmp->value) == false){
tmp = tmp->backward;
continue;
}
sequence++;
rank++;
if (component_->ReturnAll() == 0){
@ -285,43 +222,24 @@ void MatchQueryProcess::TaskEnd(){
doc_manager_->AppendFieldsToRes(response, component_->Fields());
}
if (rank > 0)
if (rank > 0){
AppendHighLightWord(response);
if (has_gis_) {
response["type"] = 1;
}
else {
response["type"] = 0;
}
response["count"] = rank;
/*if(m_index_set_cnt != 0){
response["count"] = m_index_set_cnt;
}*/
log_debug("search result end: %lld.", (long long int)GetSysTimeMicros());
std::string outputConfig = writer.write(response);
request_->setResult(outputConfig);
/*if (component_->ReturnAll() == 0 && component_->CacheSwitch() == 1 && component_->PageIndex() == 1 && has_gis_ == 0
&& rank > 0 && outputConfig.size() < MAX_VALUE_LEN) {
string m_Data_Cache = m_Primary_Data + "|" + component_->DataAnd() + "|" + component_->DataInvert() + "|" + component_->DataComplete() + "|" +
ToString(sort_type_) + "|" + ToString(appid_);
unsigned data_size = m_Data_Cache.size();
int ret = cachelist->add_list(m_Data_Cache.c_str(), outputConfig.c_str(), data_size, outputConfig.size());
if (ret != 0) {
log_error("add to cache_list error, ret: %d.", ret);
}
else {
log_debug("add to cache_list: %s.", m_Data_Cache.c_str());
}
}*/
}
void MatchQueryProcess::AppendHighLightWord(Json::Value& response)
{
void MatchQueryProcess::AppendHighLightWord(Json::Value& response){
int count = 0;
set<string>::iterator iter = highlightWord_.begin();
for (; iter != highlightWord_.end(); iter++) {
if (count >= 10)
if (count >= 10){
break;
}
count = count + 1;
response["hlWord"].append((*iter).c_str());
}

View File

@ -1,9 +1,9 @@
/*
* =====================================================================================
*
* Filename: query_process.h
* Filename: match_query_process.h
*
* Description: query_process class definition.
* Description: match_query_process class definition.
*
* Version: 1.0
* Created: 14/05/2021
@ -40,7 +40,6 @@ private:
vector<IndexInfo> doc_vec_;
hash_double_map distances_;
set<string> valid_docs_;
uint32_t appid_;
uint32_t sort_type_;
string sort_field_;
bool has_gis_;

View File

@ -9,14 +9,31 @@ value_(value)
}
QueryProcess::~QueryProcess(){
if(NULL != component_){
delete component_;
}
if(NULL != logical_operate_){
delete logical_operate_;
}
if(NULL != doc_manager_){
delete doc_manager_;
}
}
int QueryProcess::DoJob(){
TaskBegin();
ParseContent();
GetValidDoc();
GetScoreAndSort();
int ret = ParseContent();
if(0 != ret){
return ret;
}
ret = GetValidDoc();
if(0 != ret){
return ret;
}
ret = GetScoreAndSort();
if(0 != ret){
return ret;
}
TaskEnd();
return 0;
}
@ -48,3 +65,12 @@ int QueryProcess::GetScoreAndSort(){
void QueryProcess::TaskEnd(){
}
void QueryProcess::SetErrMsg(string err_msg){
log_error(err_msg.c_str());
err_msg_ = err_msg;
}
string QueryProcess::GetErrMsg(){
return err_msg_;
}

View File

@ -35,6 +35,8 @@ public:
int DoJob();
void SetSkipList(SkipList& skipList);
void SetRequest(CTaskRequest* request);
void SetErrMsg(string err_msg);
string GetErrMsg();
protected:
void TaskBegin();
@ -51,6 +53,7 @@ protected:
Json::Value value_;
SkipList skipList_;
CTaskRequest* request_;
string err_msg_;
};
#endif

View File

@ -0,0 +1,173 @@
#include "term_query_process.h"
#include <sstream>
#include "../order_op.h"
TermQueryProcess::TermQueryProcess(uint32_t appid, Json::Value& value, Component* component)
:QueryProcess(appid, value, component){
sort_type_ = component_->SortType();
sort_field_ = component_->SortField();
has_gis_ = false;
}
TermQueryProcess::~TermQueryProcess(){
}
int TermQueryProcess::ParseContent(){
return ParseContent(ORKEY);
}
int TermQueryProcess::ParseContent(uint32_t type){
vector<FieldInfo> field_info_vec;
Json::Value::Members member = value_.getMemberNames();
Json::Value::Members::iterator iter = member.begin();
string field_name;
Json::Value field_value;
if(iter != member.end()){ // 一个term下只对应一个字段
field_name = *iter;
field_value = value_[field_name];
} else {
SetErrMsg("TermQueryProcess error, value is null");
return -RT_PARSE_CONTENT_ERROR;
}
uint32_t segment_tag = 0;
FieldInfo field_info;
uint32_t field = DBManager::Instance()->GetWordField(segment_tag, appid_, field_name, field_info);
if(field != 0){
field_info.word = field_value.asString();
field_info_vec.push_back(field_info);
} else {
stringstream ss_msg;
ss_msg << "field_name[" << field_name << "] error, not in the app_field_define";
SetErrMsg(ss_msg.str());
return -RT_PARSE_CONTENT_ERROR;
}
component_->AddToFieldList(type, field_info_vec);
return 0;
}
int TermQueryProcess::GetValidDoc(){
doc_manager_ = new DocManager(component_);
logical_operate_ = new LogicalOperate(appid_, sort_type_, has_gis_, component_->CacheSwitch());
for (size_t index = 0; index < component_->Keys().size(); index++)
{
vector<IndexInfo> doc_id_vec;
vector<FieldInfo> field_info_vec = component_->Keys()[index];
vector<FieldInfo>::iterator it;
for (it = field_info_vec.begin(); it != field_info_vec.end(); it++) {
vector<IndexInfo> doc_info;
int ret = logical_operate_->GetDocIdSetByWord(*it, doc_info);
if (ret != 0){
return -RT_GET_DOC_ERR;
}
highlightWord_.insert((*it).word);
if(sort_type_ == SORT_RELEVANCE){
logical_operate_->CalculateByWord(*it, doc_info, doc_info_map_, key_in_doc_);
}
doc_id_vec = vec_union(doc_id_vec, doc_info);
}
doc_vec_ = vec_union(doc_vec_, doc_id_vec);
}
bool bRet = doc_manager_->GetDocContent(has_gis_, doc_vec_, valid_docs_, distances_);
if (false == bRet) {
log_error("GetDocContent error.");
return -RT_DTC_ERR;
}
return 0;
}
int TermQueryProcess::GetScoreAndSort(){
set<string>::iterator set_iter = valid_docs_.begin();
for(; set_iter != valid_docs_.end(); set_iter++){
string doc_id = *set_iter;
if (sort_type_ == SORT_FIELD_ASC || sort_type_ == SORT_FIELD_DESC){
doc_manager_->GetScoreMap(doc_id, sort_type_, sort_field_, sort_field_type_, appid_);
} else {
skipList_.InsertNode(1, doc_id.c_str());
}
}
return 0;
}
void TermQueryProcess::TaskEnd(){
Json::FastWriter writer;
Json::Value response;
response["code"] = 0;
int sequence = -1;
int rank = 0;
int page_size = component_->PageSize();
int limit_start = page_size * (component_->PageIndex()-1);
int limit_end = page_size * (component_->PageIndex()-1) + page_size - 1;
log_debug("search result begin.");
if((sort_type_ == SORT_FIELD_DESC || sort_type_ == SORT_FIELD_ASC) && skipList_.GetSize() == 0){
OrderOpCond order_op_cond;
order_op_cond.last_id = component_->LastId();
order_op_cond.limit_start = limit_start;
order_op_cond.count = page_size;
order_op_cond.has_extra_filter = false;
if(sort_field_type_ == FIELDTYPE_INT){
rank += doc_manager_->ScoreIntMap().size();
COrderOp<int> orderOp(FIELDTYPE_INT, component_->SearchAfter(), sort_type_);
orderOp.Process(doc_manager_->ScoreIntMap(), atoi(component_->LastScore().c_str()), order_op_cond, response, doc_manager_);
} else if(sort_field_type_ == FIELDTYPE_DOUBLE) {
rank += doc_manager_->ScoreDoubleMap().size();
COrderOp<double> orderOp(FIELDTYPE_DOUBLE, component_->SearchAfter(), sort_type_);
orderOp.Process(doc_manager_->ScoreDoubleMap(), atof(component_->LastScore().c_str()), order_op_cond, response, doc_manager_);
} else {
rank += doc_manager_->ScoreStrMap().size();
COrderOp<string> orderOp(FIELDTYPE_STRING, component_->SearchAfter(), sort_type_);
orderOp.Process(doc_manager_->ScoreStrMap(), component_->LastScore(), order_op_cond, response, doc_manager_);
}
} else {
SkipListNode *tmp = skipList_.GetFooter()->backward;
while(tmp->backward != NULL) {
sequence++;
rank++;
if (component_->ReturnAll() == 0){
if (sequence < limit_start || sequence > limit_end) {
tmp = tmp->backward;
continue;
}
}
Json::Value doc_info;
doc_info["doc_id"] = Json::Value(tmp->value);
doc_info["score"] = Json::Value(tmp->key);
response["result"].append(doc_info);
tmp = tmp->backward;
}
}
if(component_->Fields().size() > 0){
doc_manager_->AppendFieldsToRes(response, component_->Fields());
}
if (rank > 0){
AppendHighLightWord(response);
}
response["type"] = 0;
response["count"] = rank;
log_debug("search result end: %lld.", (long long int)GetSysTimeMicros());
std::string outputConfig = writer.write(response);
request_->setResult(outputConfig);
}
void TermQueryProcess::AppendHighLightWord(Json::Value& response){
int count = 0;
set<string>::iterator iter = highlightWord_.begin();
for (; iter != highlightWord_.end(); iter++) {
if (count >= 10){
break;
}
count = count + 1;
response["hlWord"].append((*iter).c_str());
}
return ;
}

View File

@ -0,0 +1,49 @@
/*
* =====================================================================================
*
* Filename: term_query_process.h
*
* Description: term_query_process class definition.
*
* Version: 1.0
* Created: 20/05/2021
* Revision: none
* Compiler: gcc
*
* Author: zhulin, shzhulin3@jd.com
* Company: JD.com, Inc.
*
* =====================================================================================
*/
#ifndef __TERM_QUERY_PROCESS_H__
#define __TERM_QUERY_PROCESS_H__
#include "query_process.h"
class TermQueryProcess: public QueryProcess{
public:
TermQueryProcess(uint32_t appid, Json::Value& value, Component* component);
~TermQueryProcess();
int ParseContent();
int GetValidDoc();
int GetScoreAndSort();
void TaskEnd();
int ParseContent(uint32_t type);
void AppendHighLightWord(Json::Value& response);
private:
set<string> highlightWord_;
map<string, vec> doc_info_map_;
map<string, uint32_t> key_in_doc_;
vector<IndexInfo> doc_vec_;
hash_double_map distances_;
set<string> valid_docs_;
uint32_t sort_type_;
string sort_field_;
bool has_gis_;
FIELDTYPE sort_field_type_;
};
#endif

View File

@ -606,6 +606,9 @@ int SearchTask::Process(CTaskRequest *request)
query_process_ = new MatchQueryProcess(m_appid, m_query_["match"], component);
} else {
log_error("query type error.");
string str = GenReplyStr(PARAMETER_ERR, "query type error.");
request->setResult(str);
common::ProfilerMonitor::GetInstance().FunctionError(caller_info);
return -RT_PARSE_JSON_ERR;
}
query_process_->SetSkipList(skipList);
@ -613,8 +616,12 @@ int SearchTask::Process(CTaskRequest *request)
int ret = query_process_->DoJob();
if(ret != 0){
log_error("query_process_ DoJob error, ret: %d", ret);
string str = GenReplyStr(PARAMETER_ERR, query_process_->GetErrMsg());
request->setResult(str);
common::ProfilerMonitor::GetInstance().FunctionError(caller_info);
return ret;
}
common::ProfilerMonitor::GetInstance().RegisterInfoEnd(caller_info);
return 0;
}