Fix some bugs for naming module (#4381)

* use instance id get instance metadata

* Fix Disconnection can't notify to naming module.

* Support subscribe specified cluster instance for 1.x

* Fix 2.0 client subscribe service can't be notify problem

* Fix only can search the first service problem

* Fix auto clean empty service can't work problem
This commit is contained in:
杨翊 SionYang 2020-12-01 21:31:18 +08:00 committed by GitHub
parent 0462644a3c
commit 2728df72f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 147 additions and 79 deletions

View File

@ -24,7 +24,6 @@ import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.core.Balancer;
@ -214,13 +213,15 @@ public class NacosNamingService implements NamingService {
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ","));
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false);
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
@ -274,11 +275,14 @@ public class NacosNamingService implements NamingService {
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ","));
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false);
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
return selectInstances(serviceInfo, healthy);
}
@ -341,13 +345,16 @@ public class NacosNamingService implements NamingService {
@Override
public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
return Balancer.RandomByWeight.selectHost(
serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ",")));
ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
return Balancer.RandomByWeight.selectHost(serviceInfo);
} else {
ServiceInfo serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false);
.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
return Balancer.RandomByWeight.selectHost(serviceInfo);
}
}
@ -374,7 +381,7 @@ public class NacosNamingService implements NamingService {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(NamingUtils.getGroupedName(serviceName, groupName), clusterString, listener);
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
@ -396,10 +403,9 @@ public class NacosNamingService implements NamingService {
@Override
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
String fullServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String clustersString = StringUtils.join(clusters, ",");
changeNotifier.deregisterListener(fullServiceName, clustersString, listener);
if (!changeNotifier.isSubscribed(fullServiceName, clustersString)) {
changeNotifier.deregisterListener(groupName, serviceName, clustersString, listener);
if (!changeNotifier.isSubscribed(groupName, serviceName, clustersString)) {
clientProxy.unsubscribe(serviceName, groupName, clustersString);
}
}

View File

@ -98,12 +98,6 @@ public class ServiceInfoHolder implements Closeable {
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo result = getServiceInfo0(groupedServiceName, clusters);
return null == result ? new ServiceInfo(groupedServiceName, clusters) : result;
}
private ServiceInfo getServiceInfo0(String groupedServiceName, String clusters) {
String key = ServiceInfo.getKey(groupedServiceName, clusters);
return serviceInfoMap.get(key);
}

View File

@ -156,7 +156,7 @@ public class ServiceInfoUpdateService implements Closeable {
long delayTime = -1;
try {
if (!changeNotifier.isSubscribed(groupedServiceName, clusters) && !futureMap.containsKey(serviceKey)) {
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
NAMING_LOGGER
.info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
return;

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
@ -45,12 +46,13 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
/**
* register listener.
*
* @param serviceName combineServiceName, such as 'xxx@@xxx'
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
*/
public void registerListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
@ -67,12 +69,13 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
/**
* deregister listener.
*
* @param serviceName combineServiceName, such as 'xxx@@xxx'
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
*/
public void deregisterListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
return;
@ -86,12 +89,13 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
/**
* check serviceName,clusters is subscribed.
*
* @param serviceName combineServiceName, such as 'xxx@@xxx'
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @return is serviceName,clusters subscribed
*/
public boolean isSubscribed(String serviceName, String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
public boolean isSubscribed(String groupName, String serviceName, String clusters) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
return CollectionUtils.isNotEmpty(eventListeners);
}
@ -106,7 +110,8 @@ public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
@Override
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
String key = ServiceInfo
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;

View File

@ -72,8 +72,12 @@ public class ClientConnectionEventListenerRegistry {
executorService.schedule(new Runnable() {
@Override
public void run() {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
clientConnectionEventListener.clientDisConnected(connection);
for (ClientConnectionEventListener each : clientConnectionEventListeners) {
try {
each.clientDisConnected(connection);
} catch (Exception e) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", each.getName(), e);
}
}
}
}, 0L, TimeUnit.MILLISECONDS);

View File

@ -381,7 +381,7 @@ public class InstanceController {
Subscriber subscriber =
udpPort > 0 ? new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
udpPort) : null;
udpPort, clusters) : null;
return instanceService.listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
}

View File

@ -63,7 +63,8 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
ServiceInfo result = new ServiceInfo(client.getServiceName(), client.getClusters());
try {
Subscriber subscriber = new Subscriber(client.getAddrStr(), client.getAgent(), client.getApp(),
client.getIp(), client.getNamespaceId(), client.getServiceName(), client.getPort());
client.getIp(), client.getNamespaceId(), client.getServiceName(), client.getPort(),
client.getClusters());
result = listInstance(client.getNamespaceId(), client.getServiceName(), subscriber,
client.getClusters(), false);
} catch (Exception e) {
@ -101,7 +102,8 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
}
@Override
public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException {
public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance)
throws NacosException {
com.alibaba.nacos.naming.core.Instance coreInstance = (com.alibaba.nacos.naming.core.Instance) instance;
String groupedServiceName = NamingUtils.getGroupedName(groupName, serviceName);
serviceManager.updateInstance(namespaceId, groupedServiceName, coreInstance);

View File

@ -59,10 +59,8 @@ public class ServiceManager {
public Service getSingleton(Service service) {
singletonRepository.putIfAbsent(service, service);
Service result = singletonRepository.get(service);
if (!namespaceSingletonMaps.containsKey(result.getNamespace())) {
namespaceSingletonMaps.putIfAbsent(result.getNamespace(), new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
}
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}

View File

@ -24,6 +24,7 @@ import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Set;
@ -35,6 +36,7 @@ import java.util.stream.Stream;
*
* @author xiweng.yy
*/
@Component
public class EmptyServiceAutoCleanerV2 extends AbstractNamingCleaner {
private static final String EMPTY_SERVICE = "emptyService";
@ -67,10 +69,10 @@ public class EmptyServiceAutoCleanerV2 extends AbstractNamingCleaner {
}
private void cleanEmptyService(Service service) {
Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", service.getNamespace(),
service.getGroupedServiceName());
Collection<String> registeredService = clientServiceIndexesManager.getAllClientsRegisteredService(service);
if (registeredService.isEmpty() && isTimeExpired(service)) {
Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", service.getNamespace(),
service.getGroupedServiceName());
clientServiceIndexesManager.removePublisherIndexesByEmptyService(service);
ServiceManager.getInstance().removeSingleton(service);
NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, true));

View File

@ -66,9 +66,13 @@ public class ExpiredMetadataCleaner extends AbstractNamingCleaner {
private void removeExpiredMetadata(ExpiredMetadataInfo expiredInfo) {
Loggers.SRV_LOG.info("Remove expired metadata {}", expiredInfo);
if (null == expiredInfo.getInstanceId()) {
metadataOperateService.deleteServiceMetadata(expiredInfo.getService());
if (metadataManager.containInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId())) {
metadataOperateService.deleteServiceMetadata(expiredInfo.getService());
}
} else {
metadataOperateService.deleteInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId());
if (metadataManager.containServiceMetadata(expiredInfo.getService())) {
metadataOperateService.deleteInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId());
}
}
}
}

View File

@ -60,10 +60,11 @@ public class ClientServiceIndexesManager extends SmartSubscriber {
/**
* Clear the service index without instances.
*
* @param service The service of the Nacos.
*/
public void removePublisherIndexesByEmptyService(Service service) {
if (publisherIndexes.get(service).isEmpty()) {
if (publisherIndexes.containsKey(service) && publisherIndexes.get(service).isEmpty()) {
publisherIndexes.remove(service);
}
}
@ -123,9 +124,6 @@ public class ClientServiceIndexesManager extends SmartSubscriber {
return;
}
publisherIndexes.get(service).remove(clientId);
if (publisherIndexes.get(service).isEmpty()) {
publisherIndexes.remove(service);
}
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

View File

@ -122,20 +122,20 @@ public class ServiceStorage {
return Optional.ofNullable(client.getInstancePublishInfo(service));
}
private Instance parseInstance(Service service, InstancePublishInfo instancePublishInfo) {
private Instance parseInstance(Service service, InstancePublishInfo instanceInfo) {
Instance result = new Instance();
result.setIp(instancePublishInfo.getIp());
result.setPort(instancePublishInfo.getPort());
result.setIp(instanceInfo.getIp());
result.setPort(instanceInfo.getPort());
result.setServiceName(NamingUtils.getGroupedName(service.getName(), service.getGroup()));
Map<String, String> instanceMetadata = new HashMap<>(instancePublishInfo.getExtendDatum().size());
for (Map.Entry<String, Object> entry : instancePublishInfo.getExtendDatum().entrySet()) {
Map<String, String> instanceMetadata = new HashMap<>(instanceInfo.getExtendDatum().size());
for (Map.Entry<String, Object> entry : instanceInfo.getExtendDatum().entrySet()) {
if (CommonParams.CLUSTER_NAME.equals(entry.getKey())) {
result.setClusterName(entry.getValue().toString());
} else {
instanceMetadata.put(entry.getKey(), entry.getValue().toString());
}
}
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, instancePublishInfo.getIp());
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, instanceInfo.getInstanceId());
if (metadata.isPresent()) {
result.setEnabled(metadata.get().isEnabled());
result.setWeight(metadata.get().getWeight());
@ -145,7 +145,7 @@ public class ServiceStorage {
}
result.setMetadata(instanceMetadata);
result.setEphemeral(service.isEphemeral());
result.setHealthy(instancePublishInfo.isHealthy());
result.setHealthy(instanceInfo.isHealthy());
return result;
}
}

View File

@ -205,27 +205,27 @@ public class NamingMetadataManager extends SmartSubscriber {
private void handleClientDisconnectEvent(ClientEvent.ClientDisconnectEvent event) {
for (Service each : event.getClient().getAllPublishedService()) {
String instanceId = event.getClient().getInstancePublishInfo(each).getIp();
updateExpiredInfo(true, ExpiredMetadataInfo.newExpiredInstanceMetadata(each, instanceId));
String instanceId = event.getClient().getInstancePublishInfo(each).getInstanceId();
if (containInstanceMetadata(each, instanceId)) {
updateExpiredInfo(true, ExpiredMetadataInfo.newExpiredInstanceMetadata(each, instanceId));
}
}
}
private void handleServiceMetadataEvent(MetadataEvent.ServiceMetadataEvent event) {
Service service = event.getService();
if (!containServiceMetadata(service)) {
return;
if (containServiceMetadata(service)) {
updateExpiredInfo(event.isExpired(), ExpiredMetadataInfo.newExpiredServiceMetadata(service));
}
updateExpiredInfo(event.isExpired(), ExpiredMetadataInfo.newExpiredServiceMetadata(service));
}
private void handleInstanceMetadataEvent(MetadataEvent.InstanceMetadataEvent event) {
Service service = event.getService();
String instanceId = event.getInstanceId();
if (!containInstanceMetadata(service, instanceId)) {
return;
if (containInstanceMetadata(service, instanceId)) {
updateExpiredInfo(event.isExpired(),
ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), event.getInstanceId()));
}
updateExpiredInfo(event.isExpired(),
ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), event.getInstanceId()));
}
private void updateExpiredInfo(boolean expired, ExpiredMetadataInfo expiredMetadataInfo) {

View File

@ -16,6 +16,8 @@
package com.alibaba.nacos.naming.core.v2.pojo;
import com.alibaba.nacos.common.utils.IPUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@ -31,10 +33,10 @@ public class InstancePublishInfo {
private int port;
private Map<String, Object> extendDatum;
private boolean healthy;
private Map<String, Object> extendDatum;
public InstancePublishInfo() {
}
@ -76,6 +78,10 @@ public class InstancePublishInfo {
this.healthy = healthy;
}
public String getInstanceId() {
return ip + IPUtil.IP_PORT_SPLITER + port;
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -52,11 +52,11 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instancePublishInfo);
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getIp(), false));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getInstanceId(), false));
}
@Override
@ -71,7 +71,7 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
if (null != removedInstance) {
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getIp(), true));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getInstanceId(), true));
}
}

View File

@ -66,7 +66,7 @@ public class ExpiredInstanceChecker implements InstanceBeatChecker {
private Optional<Object> getTimeoutFromMetadata(Service service, InstancePublishInfo instance) {
Optional<InstanceMetadata> instanceMetadata = ApplicationUtils.getBean(NamingMetadataManager.class)
.getInstanceMetadata(service, instance.getIp());
.getInstanceMetadata(service, instance.getInstanceId());
return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.IP_DELETE_TIMEOUT));
}

View File

@ -36,7 +36,7 @@ public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterce
public boolean intercept(InstanceBeatCheckTask object) {
NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
HeartBeatInstancePublishInfo instance = object.getInstancePublishInfo();
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getIp());
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getInstanceId());
if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
}

View File

@ -66,7 +66,7 @@ public class UnhealthyInstanceChecker implements InstanceBeatChecker {
private Optional<Object> getTimeoutFromMetadata(Service service, InstancePublishInfo instance) {
Optional<InstanceMetadata> instanceMetadata = ApplicationUtils.getBean(NamingMetadataManager.class)
.getInstanceMetadata(service, instance.getIp());
.getInstanceMetadata(service, instance.getInstanceId());
return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT));
}

View File

@ -16,6 +16,8 @@
package com.alibaba.nacos.naming.pojo;
import com.alibaba.nacos.common.utils.StringUtils;
import java.io.Serializable;
import java.util.Objects;
@ -26,6 +28,8 @@ import java.util.Objects;
*/
public class Subscriber implements Serializable {
private static final long serialVersionUID = -6256968317172033867L;
private String addrStr;
private String agent;
@ -40,7 +44,15 @@ public class Subscriber implements Serializable {
private String serviceName;
public Subscriber(String addrStr, String agent, String app, String ip, String namespaceId, String serviceName, int port) {
private String cluster;
public Subscriber(String addrStr, String agent, String app, String ip, String namespaceId, String serviceName,
int port) {
this(addrStr, agent, app, ip, namespaceId, serviceName, port, StringUtils.EMPTY);
}
public Subscriber(String addrStr, String agent, String app, String ip, String namespaceId, String serviceName,
int port, String clusters) {
this.addrStr = addrStr;
this.agent = agent;
this.app = app;
@ -48,6 +60,7 @@ public class Subscriber implements Serializable {
this.port = port;
this.namespaceId = namespaceId;
this.serviceName = serviceName;
this.cluster = clusters;
}
public String getAddrStr() {
@ -106,6 +119,14 @@ public class Subscriber implements Serializable {
this.port = port;
}
public String getCluster() {
return cluster;
}
public void setCluster(String cluster) {
this.cluster = cluster;
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Subscriber;
@ -46,11 +47,24 @@ public class PushExecuteTask extends AbstractExecuteTask {
serviceInfo = ServiceUtil.selectInstances(serviceInfo, true, true);
for (String each : delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)) {
Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecuteService().doPush(each, subscriber, serviceInfo);
delayTaskEngine.getPushExecuteService().doPush(each, subscriber, handleClusterData(serviceInfo, subscriber));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
/**
* For adapt push cluster feature. Will be remove after 2.1.x.
*
* @param data original data
* @param subscriber subscriber information
* @return cluster filtered data
*/
@Deprecated
private ServiceInfo handleClusterData(ServiceInfo data, Subscriber subscriber) {
return StringUtils.isBlank(subscriber.getCluster()) ? data
: ServiceUtil.selectInstances(data, subscriber.getCluster());
}
}

View File

@ -23,11 +23,13 @@ import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import org.springframework.stereotype.Component;
/**
@ -56,9 +58,9 @@ public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServ
String groupName = request.getGroupName();
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
ServiceInfo serviceInfo = serviceStorage.getData(service);
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), "unknown",
meta.getClientIp(), namespaceId, groupedServiceName, 0);
meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service), subscriber);
if (request.isSubscribe()) {
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
} else {
@ -67,4 +69,16 @@ public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServ
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
/**
* For adapt push cluster feature. Will be remove after 2.1.x.
*
* @param data original data
* @param subscriber subscriber information
* @return cluster filtered data
*/
@Deprecated
private ServiceInfo handleClusterData(ServiceInfo data, Subscriber subscriber) {
return StringUtils.isBlank(subscriber.getCluster()) ? data
: ServiceUtil.selectInstances(data, subscriber.getCluster());
}
}