diff --git a/servant/libservant/EndpointManager.cpp b/servant/libservant/EndpointManager.cpp index a94a769..e0330a4 100644 --- a/servant/libservant/EndpointManager.cpp +++ b/servant/libservant/EndpointManager.cpp @@ -800,8 +800,8 @@ void EndpointManager::updateEndpoints(const set & active, const se _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second)); const string &host = iterAdapter->second->endpoint().host(); - _indexActiveProxys.insert(make_pair(inet_addr(host.data()), iterAdapter->second)); - _sortActivProxys.insert(make_pair(inet_addr(host.data()), iterAdapter->second)); + _indexActiveProxys.insert(make_pair(host, iterAdapter->second)); + _sortActivProxys.insert(make_pair(host, iterAdapter->second)); //设置该节点的静态权重值 iterAdapter->second->setWeight(iter->weight()); @@ -1016,7 +1016,7 @@ AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bSta } else { - TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl); + TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl); if(_activeProxys.empty()) { TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl); @@ -1105,12 +1105,12 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b while(_consistentHashWeight.size() > 0) { - unsigned int iIndex = 0; + string sNode; // 通过一致性hash取到对应的节点 - _consistentHashWeight.getIndex(hashCode, iIndex); + _consistentHashWeight.getNodeName(hashCode, sNode); - auto it = _indexActiveProxys.find(iIndex); + auto it = _indexActiveProxys.find(sNode); // 节点不存在,可能是下线或者服务不可用 if (it == _indexActiveProxys.end()) { @@ -1125,15 +1125,15 @@ AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool b } else { - TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->getTransceiver()->getEndpointInfo().desc() << endl); + TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl); // 剔除节点再次hash if (!it->second->isActiveInReg()) { // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加 - _indexActiveProxys.erase(iIndex); + _indexActiveProxys.erase(sNode); } // checkConHashChange里重新加回到_sortActivProxys重试 - _sortActivProxys.erase(iIndex); + _sortActivProxys.erase(sNode); updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight); if (_indexActiveProxys.empty()) @@ -1251,7 +1251,7 @@ bool EndpointManager::checkHashStaticWeightChange(bool bStatic) return false; } -bool EndpointManager::checkConHashChange(bool bStatic, const map &umLastConHashProxys) +bool EndpointManager::checkConHashChange(bool bStatic, const map &mLastConHashProxys) { // 将之前故障临时剔除的节点重新加回来重试 if (_indexActiveProxys.size() != _sortActivProxys.size()) @@ -1262,14 +1262,14 @@ bool EndpointManager::checkConHashChange(bool bStatic, const mapfirst != itSort->first) { @@ -1433,7 +1433,7 @@ void EndpointManager::updateHashProxyWeighted(bool bStatic) } } -void EndpointManager::updateConHashProxyWeighted(bool bStatic, map &umLastConHashProxys, TC_ConsistentHashNew &conHash) +void EndpointManager::updateConHashProxyWeighted(bool bStatic, map &mLastConHashProxys, TC_ConsistentHashNew &conHash) { if(_sortActivProxys.empty()) { @@ -1441,7 +1441,7 @@ void EndpointManager::updateConHashProxyWeighted(bool bStatic, mapsecond->endpoint().host(), it->first, iWeight); + // 一致性hash用host进行索引,不使用index,这里传0 + conHash.addNode(it->second->endpoint().host(), 0, iWeight); } //防止多个服务节点权重同时更新时一致性哈希环多次更新 it->second->resetWeightChanged(); @@ -1520,7 +1521,7 @@ AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode) } else { - TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->getTransceiver()->getEndpointInfo().desc() << endl); + TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl); if(_activeProxys.empty()) { TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl); @@ -1604,12 +1605,12 @@ AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode) while(_consistentHash.size() > 0) { - unsigned int iIndex = 0; + string sNode; // 通过一致性hash取到对应的节点 - _consistentHash.getIndex(hashCode, iIndex); + _consistentHash.getNodeName(hashCode, sNode); - auto it = _indexActiveProxys.find(iIndex); + auto it = _indexActiveProxys.find(sNode); // 节点不存在,可能是下线或者服务不可用 if (it == _indexActiveProxys.end()) { @@ -1624,15 +1625,15 @@ AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode) } else { - TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->getTransceiver()->getEndpointInfo().desc() << endl); + TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl); // 剔除节点再次hash if (!it->second->isActiveInReg()) { // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加 - _indexActiveProxys.erase(iIndex); + _indexActiveProxys.erase(sNode); } // checkConHashChange里重新加回到_sortActivProxys重试 - _sortActivProxys.erase(iIndex); + _sortActivProxys.erase(sNode); updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash); if (_indexActiveProxys.empty()) diff --git a/servant/servant/EndpointManager.h b/servant/servant/EndpointManager.h index 6f7d046..68402b5 100644 --- a/servant/servant/EndpointManager.h +++ b/servant/servant/EndpointManager.h @@ -418,7 +418,7 @@ private: /* * 判断静态权重节点是否有变化 */ - bool checkConHashChange(bool bStatic, const map &umLastConHashProxys); + bool checkConHashChange(bool bStatic, const map &mLastConHashProxys); /* * 更新取模hash方法的静态权重节点信息 @@ -428,7 +428,7 @@ private: /* * 更新一致性hash方法的静态权重节点信息 */ - void updateConHashProxyWeighted(bool bStatic, map &umLastConHashProxys, TC_ConsistentHashNew &conHash); + void updateConHashProxyWeighted(bool bStatic, map &mLastConHashProxys, TC_ConsistentHashNew &conHash); /* * 根据后端服务的权重值选取一个结点 @@ -469,10 +469,10 @@ private: /* * 一致性hash使用,保证强一致性 - * key:inet_addr(ip) + * key:host */ - map _sortActivProxys; - unordered_map _indexActiveProxys; + map _sortActivProxys; + unordered_map _indexActiveProxys; /* * 部署的结点 包括活跃的和不活跃的 @@ -539,7 +539,7 @@ private: /* * 一致性hash静态权重时使用 */ - map _lastConHashWeightProxys; + map _lastConHashWeightProxys; /* * 一致性hash静态权重时使用 @@ -549,7 +549,7 @@ private: /* * 一致性hash普通使用 */ - map _lastConHashProxys; + map _lastConHashProxys; /* * 一致性hash普通使用 diff --git a/util/include/util/tc_consistent_hash_new.h b/util/include/util/tc_consistent_hash_new.h index e9d0d1b..8044aa7 100755 --- a/util/include/util/tc_consistent_hash_new.h +++ b/util/include/util/tc_consistent_hash_new.h @@ -96,6 +96,12 @@ public: */ int32_t iHashCode; + /** + *节点名称 + *node name + */ + string sNode; + /** *节点下标 * node subscript @@ -161,6 +167,19 @@ public: */ int getIndex(const string & key, unsigned int & iIndex); + /** + * @brief 获取某key对应到的节点node的名称. + * @brief Gets the name of the node to which a key corresponds. + * + * @param key key名称 + * @param key key name + * @param sNode 对应到的节点的名称 + * @param sNode the name of the node to which corresponds. + * @return 0:获取成功 -1:没有被添加的节点 + * @return 0:obtain successfully -1:no nodes added + */ + int getNodeName(const string & key, string & sNode); + /** * @brief 获取某hashcode对应到的节点node的下标. * @brief Gets the subscript of the node to which a certain hashcode corresponds @@ -173,6 +192,18 @@ public: */ int getIndex(int32_t hashcode, unsigned int & iIndex); + /** + * @brief 获取某hashcode对应到的节点node的名称. + * @brief Gets the name of the node to which a certain hashcode corresponds + * + * @param hashcode hashcode + * @param sNode 对应到的节点的名称 + * @param sNode the name of the node to which corresponds. + * @return 0:获取成功 -1:没有被添加的节点 + * @return 0:obtain successfully -1:no nodes added + */ + int getNodeName(int32_t hashcode, string & sNode); + /** * @brief 获取当前hash列表的长度. * @brief Get the length of the current hash list diff --git a/util/src/tc_consistent_hash_new.cpp b/util/src/tc_consistent_hash_new.cpp index cb8e825..f58a808 100755 --- a/util/src/tc_consistent_hash_new.cpp +++ b/util/src/tc_consistent_hash_new.cpp @@ -126,7 +126,7 @@ void TC_ConsistentHashNew::printNode() mapNode[_vHashList[i].iIndex] = value; } - cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl; + cout << "printNode: " << _vHashList[i].iHashCode << "|" << _vHashList[i].sNode << "|" << _vHashList[i].iIndex << "|" << mapNode[_vHashList[i].iIndex] << endl; } map::iterator it = mapNode.begin(); @@ -152,6 +152,7 @@ int TC_ConsistentHashNew::addNode(const string & node, unsigned int index, int w } node_T_new stItem; + stItem.sNode = node; stItem.iIndex = index; for (int j = 0; j < weight; j++) @@ -237,4 +238,55 @@ int TC_ConsistentHashNew::getIndex(int32_t hashcode, unsigned int & iIndex) return 0; } +int TC_ConsistentHashNew::getNodeName(const string & key, string & sNode) +{ + if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0) + { + sNode = ""; + return -1; + } + + vector data = TC_MD5::md5bin(key); + int32_t iCode = _ptrHashAlg->hash(data.data(), data.size()); + + return getNodeName(iCode, sNode); +} + +int TC_ConsistentHashNew::getNodeName(int32_t hashcode, string & sNode) +{ + if(_ptrHashAlg.get() == NULL || _vHashList.size() == 0) + { + sNode = ""; + return -1; + } + + // 只保留32位 + int32_t iCode = (hashcode & 0xFFFFFFFFL); + + int low = 0; + int high = (int)_vHashList.size(); + + if(iCode <= _vHashList[0].iHashCode || iCode > _vHashList[high-1].iHashCode) + { + sNode = _vHashList[0].sNode; + return 0; + } + + + while (low < high - 1) + { + int mid = (low + high) / 2; + if (_vHashList[mid].iHashCode > iCode) + { + high = mid; + } + else + { + low = mid; + } + } + sNode = _vHashList[low+1].sNode; + return 0; +} + }