一致性hash异常适配域名、ipv6

This commit is contained in:
wuxiaofeng1 2021-11-25 20:15:58 +08:00 committed by ruanshudong
parent 1b3aea34fa
commit e5ac86f299
4 changed files with 115 additions and 31 deletions

View File

@ -800,8 +800,8 @@ void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const se
_regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
const string &host = iterAdapter->second->endpoint().host();
_indexActiveProxys.insert(make_pair(inet_addr(host.data()), iterAdapter->second));
_sortActivProxys.insert(make_pair(inet_addr(host.data()), iterAdapter->second));
_indexActiveProxys.insert(make_pair(host, iterAdapter->second));
_sortActivProxys.insert(make_pair(host, iterAdapter->second));
//设置该节点的静态权重值
iterAdapter->second->setWeight(iter->weight());
@ -1016,7 +1016,7 @@ AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bSta
}
else
{
TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl);
if(_activeProxys.empty())
{
TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
@ -1105,12 +1105,12 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b
while(_consistentHashWeight.size() > 0)
{
unsigned int iIndex = 0;
string sNode;
// 通过一致性hash取到对应的节点
_consistentHashWeight.getIndex(hashCode, iIndex);
_consistentHashWeight.getNodeName(hashCode, sNode);
auto it = _indexActiveProxys.find(iIndex);
auto it = _indexActiveProxys.find(sNode);
// 节点不存在,可能是下线或者服务不可用
if (it == _indexActiveProxys.end())
{
@ -1125,15 +1125,15 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b
}
else
{
TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->getTransceiver()->getEndpointInfo().desc() << endl);
TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
// 剔除节点再次hash
if (!it->second->isActiveInReg())
{
// 如果在主控的注册状态不是active直接删除如果状态有变更由updateEndpoints函数里重新添加
_indexActiveProxys.erase(iIndex);
_indexActiveProxys.erase(sNode);
}
// checkConHashChange里重新加回到_sortActivProxys重试
_sortActivProxys.erase(iIndex);
_sortActivProxys.erase(sNode);
updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
if (_indexActiveProxys.empty())
@ -1251,7 +1251,7 @@ bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
return false;
}
bool EndpointManager::checkConHashChange(bool bStatic, const map<uint32_t, AdapterProxy*> &umLastConHashProxys)
bool EndpointManager::checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys)
{
// 将之前故障临时剔除的节点重新加回来重试
if (_indexActiveProxys.size() != _sortActivProxys.size())
@ -1262,14 +1262,14 @@ bool EndpointManager::checkConHashChange(bool bStatic, const map<uint32_t, Adapt
}
}
if(umLastConHashProxys.size() != _sortActivProxys.size())
if(mLastConHashProxys.size() != _sortActivProxys.size())
{
return true;
}
auto itLast = umLastConHashProxys.begin();
auto itLast = mLastConHashProxys.begin();
auto itSort = _sortActivProxys.begin();
for (; itLast!=umLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
for (; itLast!=mLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
{
if (itLast->first != itSort->first)
{
@ -1433,7 +1433,7 @@ void EndpointManager::updateHashProxyWeighted(bool bStatic)
}
}
void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<uint32_t, AdapterProxy*> &umLastConHashProxys, TC_ConsistentHashNew &conHash)
void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash)
{
if(_sortActivProxys.empty())
{
@ -1441,7 +1441,7 @@ void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<uint32_t, Ada
return ;
}
umLastConHashProxys = _sortActivProxys;
mLastConHashProxys = _sortActivProxys;
conHash.clear();
for (auto it = _sortActivProxys.begin(); it != _sortActivProxys.end(); ++it)
@ -1457,7 +1457,8 @@ void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<uint32_t, Ada
// 同一服务有多个obj的情况
// 同一hash值调用不同的obj会hash到不同的服务器
// 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
conHash.addNode(it->second->endpoint().host(), it->first, iWeight);
// 一致性hash用host进行索引不使用index这里传0
conHash.addNode(it->second->endpoint().host(), 0, iWeight);
}
//防止多个服务节点权重同时更新时一致性哈希环多次更新
it->second->resetWeightChanged();
@ -1520,7 +1521,7 @@ AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode)
}
else
{
TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->getTransceiver()->getEndpointInfo().desc() << endl);
TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl);
if(_activeProxys.empty())
{
TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
@ -1604,12 +1605,12 @@ AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
while(_consistentHash.size() > 0)
{
unsigned int iIndex = 0;
string sNode;
// 通过一致性hash取到对应的节点
_consistentHash.getIndex(hashCode, iIndex);
_consistentHash.getNodeName(hashCode, sNode);
auto it = _indexActiveProxys.find(iIndex);
auto it = _indexActiveProxys.find(sNode);
// 节点不存在,可能是下线或者服务不可用
if (it == _indexActiveProxys.end())
{
@ -1624,15 +1625,15 @@ AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
}
else
{
TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->getTransceiver()->getEndpointInfo().desc() << endl);
TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
// 剔除节点再次hash
if (!it->second->isActiveInReg())
{
// 如果在主控的注册状态不是active直接删除如果状态有变更由updateEndpoints函数里重新添加
_indexActiveProxys.erase(iIndex);
_indexActiveProxys.erase(sNode);
}
// checkConHashChange里重新加回到_sortActivProxys重试
_sortActivProxys.erase(iIndex);
_sortActivProxys.erase(sNode);
updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
if (_indexActiveProxys.empty())

View File

@ -418,7 +418,7 @@ private:
/*
*
*/
bool checkConHashChange(bool bStatic, const map<uint32_t, AdapterProxy*> &umLastConHashProxys);
bool checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys);
/*
* hash方法的静态权重节点信息
@ -428,7 +428,7 @@ private:
/*
* hash方法的静态权重节点信息
*/
void updateConHashProxyWeighted(bool bStatic, map<uint32_t, AdapterProxy*> &umLastConHashProxys, TC_ConsistentHashNew &conHash);
void updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash);
/*
*
@ -469,10 +469,10 @@ private:
/*
* hash使用
* keyinet_addr(ip)
* keyhost
*/
map<uint32_t, AdapterProxy*> _sortActivProxys;
unordered_map<uint32_t, AdapterProxy*> _indexActiveProxys;
map<string, AdapterProxy*> _sortActivProxys;
unordered_map<string, AdapterProxy*> _indexActiveProxys;
/*
*
@ -539,7 +539,7 @@ private:
/*
* hash静态权重时使用
*/
map<uint32_t, AdapterProxy*> _lastConHashWeightProxys;
map<string, AdapterProxy*> _lastConHashWeightProxys;
/*
* hash静态权重时使用
@ -549,7 +549,7 @@ private:
/*
* hash普通使用
*/
map<uint32_t, AdapterProxy*> _lastConHashProxys;
map<string, AdapterProxy*> _lastConHashProxys;
/*
* hash普通使用

View File

@ -96,6 +96,12 @@ public:
*/
int32_t iHashCode;
/**
*
*node name
*/
string sNode;
/**
*
* node subscript
@ -161,6 +167,19 @@ public:
*/
int getIndex(const string & key, unsigned int & iIndex);
/**
* @brief key对应到的节点node的名称.
* @brief Gets the name of the node to which a key corresponds.
*
* @param key key名称
* @param key key name
* @param sNode
* @param sNode the name of the node to which corresponds.
* @return 0: -1:
* @return 0:obtain successfully -1:no nodes added
*/
int getNodeName(const string & key, string & sNode);
/**
* @brief hashcode对应到的节点node的下标.
* @brief Gets the subscript of the node to which a certain hashcode corresponds
@ -173,6 +192,18 @@ public:
*/
int getIndex(int32_t hashcode, unsigned int & iIndex);
/**
* @brief hashcode对应到的节点node的名称.
* @brief Gets the name of the node to which a certain hashcode corresponds
*
* @param hashcode hashcode
* @param sNode
* @param sNode the name of the node to which corresponds.
* @return 0: -1:
* @return 0:obtain successfully -1:no nodes added
*/
int getNodeName(int32_t hashcode, string & sNode);
/**
* @brief hash列表的长度.
* @brief Get the length of the current hash list

View File

@ -126,7 +126,7 @@ void TC_ConsistentHashNew::printNode()
mapNode[_vHashList[i].iIndex] = value;
}
cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl;
cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].sNode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl;
}
map<unsigned int, unsigned int>::iterator it = mapNode.begin();
@ -152,6 +152,7 @@ int TC_ConsistentHashNew::addNode(const string & node, unsigned int index, int w
}
node_T_new stItem;
stItem.sNode = node;
stItem.iIndex = index;
for (int j = 0; j < weight; j++)
@ -237,4 +238,55 @@ int TC_ConsistentHashNew::getIndex(int32_t hashcode, unsigned int & iIndex)
return 0;
}
int TC_ConsistentHashNew::getNodeName(const string & key, string & sNode)
{
if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0)
{
sNode = "";
return -1;
}
vector<char> data = TC_MD5::md5bin(key);
int32_t iCode = _ptrHashAlg->hash(data.data(), data.size());
return getNodeName(iCode, sNode);
}
int TC_ConsistentHashNew::getNodeName(int32_t hashcode, string & sNode)
{
if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0)
{
sNode = "";
return -1;
}
// 只保留32位
int32_t iCode = (hashcode & 0xFFFFFFFFL);
int low = 0;
int high = (int)_vHashList.size();
if(iCode <= _vHashList[0].iHashCode || iCode > _vHashList[high-1].iHashCode)
{
sNode = _vHashList[0].sNode;
return 0;
}
while (low < high - 1)
{
int mid = (low + high) / 2;
if (_vHashList[mid].iHashCode > iCode)
{
high = mid;
}
else
{
low = mid;
}
}
sNode = _vHashList[low+1].sNode;
return 0;
}
}