From 2728df72f2133634ae285a69a46597d55a88369e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E7=BF=8A=20SionYang?= <263976490@qq.com> Date: Tue, 1 Dec 2020 21:31:18 +0800 Subject: [PATCH] Fix some bugs for naming module (#4381) * use instance id get instance metadata * Fix Disconnection can't notify to naming module. * Support subscribe specified cluster instance for 1.x * Fix 2.0 client subscribe service can't be notify problem * Fix only can search the first service problem * Fix auto clean empty service can't work problem --- .../client/naming/NacosNamingService.java | 38 +++++++++++-------- .../naming/cache/ServiceInfoHolder.java | 6 --- .../naming/core/ServiceInfoUpdateService.java | 2 +- .../naming/event/InstancesChangeNotifier.java | 25 +++++++----- ...ClientConnectionEventListenerRegistry.java | 8 +++- .../controllers/InstanceController.java | 2 +- .../core/InstanceOperatorServiceImpl.java | 6 ++- .../nacos/naming/core/v2/ServiceManager.java | 6 +-- .../v2/cleaner/EmptyServiceAutoCleanerV2.java | 6 ++- .../v2/cleaner/ExpiredMetadataCleaner.java | 8 +++- .../v2/index/ClientServiceIndexesManager.java | 6 +-- .../naming/core/v2/index/ServiceStorage.java | 14 +++---- .../v2/metadata/NamingMetadataManager.java | 18 ++++----- .../core/v2/pojo/InstancePublishInfo.java | 10 ++++- .../EphemeralClientOperationServiceImpl.java | 8 ++-- .../heartbeat/ExpiredInstanceChecker.java | 2 +- .../InstanceEnableBeatCheckInterceptor.java | 2 +- .../heartbeat/UnhealthyInstanceChecker.java | 2 +- .../alibaba/nacos/naming/pojo/Subscriber.java | 23 ++++++++++- .../naming/push/v2/task/PushExecuteTask.java | 16 +++++++- .../SubscribeServiceRequestHandler.java | 18 ++++++++- 21 files changed, 147 insertions(+), 79 deletions(-) diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index 73d529ac0..dff28674f 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -24,7 +24,6 @@ import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; -import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.api.selector.AbstractSelector; import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; import com.alibaba.nacos.client.naming.core.Balancer; @@ -214,13 +213,15 @@ public class NacosNamingService implements NamingService { @Override public List getAllInstances(String serviceName, String groupName, List clusters, boolean subscribe) throws NacosException { - ServiceInfo serviceInfo; + String clusterString = StringUtils.join(clusters, ","); if (subscribe) { - serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ",")); + serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString); + if (null == serviceInfo) { + serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString); + } } else { - serviceInfo = clientProxy - .queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false); + serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false); } List list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { @@ -274,11 +275,14 @@ public class NacosNamingService implements NamingService { boolean subscribe) throws NacosException { ServiceInfo serviceInfo; + String clusterString = StringUtils.join(clusters, ","); if (subscribe) { - serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ",")); + serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString); + if (null == serviceInfo) { + serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString); + } } else { - serviceInfo = clientProxy - .queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false); + serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false); } return selectInstances(serviceInfo, healthy); } @@ -341,13 +345,16 @@ public class NacosNamingService implements NamingService { @Override public Instance selectOneHealthyInstance(String serviceName, String groupName, List clusters, boolean subscribe) throws NacosException { - + String clusterString = StringUtils.join(clusters, ","); if (subscribe) { - return Balancer.RandomByWeight.selectHost( - serviceInfoHolder.getServiceInfo(serviceName, groupName, StringUtils.join(clusters, ","))); + ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString); + if (null == serviceInfo) { + serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString); + } + return Balancer.RandomByWeight.selectHost(serviceInfo); } else { ServiceInfo serviceInfo = clientProxy - .queryInstancesOfService(serviceName, groupName, StringUtils.join(clusters, ","), 0, false); + .queryInstancesOfService(serviceName, groupName, clusterString, 0, false); return Balancer.RandomByWeight.selectHost(serviceInfo); } } @@ -374,7 +381,7 @@ public class NacosNamingService implements NamingService { return; } String clusterString = StringUtils.join(clusters, ","); - changeNotifier.registerListener(NamingUtils.getGroupedName(serviceName, groupName), clusterString, listener); + changeNotifier.registerListener(groupName, serviceName, clusterString, listener); clientProxy.subscribe(serviceName, groupName, clusterString); } @@ -396,10 +403,9 @@ public class NacosNamingService implements NamingService { @Override public void unsubscribe(String serviceName, String groupName, List clusters, EventListener listener) throws NacosException { - String fullServiceName = NamingUtils.getGroupedName(serviceName, groupName); String clustersString = StringUtils.join(clusters, ","); - changeNotifier.deregisterListener(fullServiceName, clustersString, listener); - if (!changeNotifier.isSubscribed(fullServiceName, clustersString)) { + changeNotifier.deregisterListener(groupName, serviceName, clustersString, listener); + if (!changeNotifier.isSubscribed(groupName, serviceName, clustersString)) { clientProxy.unsubscribe(serviceName, groupName, clustersString); } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java index 07282a445..673c86294 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java @@ -98,12 +98,6 @@ public class ServiceInfoHolder implements Closeable { if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } - ServiceInfo result = getServiceInfo0(groupedServiceName, clusters); - return null == result ? new ServiceInfo(groupedServiceName, clusters) : result; - } - - private ServiceInfo getServiceInfo0(String groupedServiceName, String clusters) { - String key = ServiceInfo.getKey(groupedServiceName, clusters); return serviceInfoMap.get(key); } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java b/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java index d520a6977..0ba19e3c1 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java @@ -156,7 +156,7 @@ public class ServiceInfoUpdateService implements Closeable { long delayTime = -1; try { - if (!changeNotifier.isSubscribed(groupedServiceName, clusters) && !futureMap.containsKey(serviceKey)) { + if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) { NAMING_LOGGER .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters); return; diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java index 4ab6f853d..73fcabf15 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java @@ -20,6 +20,7 @@ import com.alibaba.nacos.api.naming.listener.AbstractEventListener; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.common.utils.CollectionUtils; @@ -45,12 +46,13 @@ public class InstancesChangeNotifier extends Subscriber { /** * register listener. * - * @param serviceName combineServiceName, such as 'xxx@@xxx' + * @param groupName group name + * @param serviceName serviceName * @param clusters clusters, concat by ','. such as 'xxx,yyy' * @param listener custom listener */ - public void registerListener(String serviceName, String clusters, EventListener listener) { - String key = ServiceInfo.getKey(serviceName, clusters); + public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) { + String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); ConcurrentHashSet eventListeners = listenerMap.get(key); if (eventListeners == null) { synchronized (lock) { @@ -67,12 +69,13 @@ public class InstancesChangeNotifier extends Subscriber { /** * deregister listener. * - * @param serviceName combineServiceName, such as 'xxx@@xxx' + * @param groupName group name + * @param serviceName serviceName * @param clusters clusters, concat by ','. such as 'xxx,yyy' * @param listener custom listener */ - public void deregisterListener(String serviceName, String clusters, EventListener listener) { - String key = ServiceInfo.getKey(serviceName, clusters); + public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) { + String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); ConcurrentHashSet eventListeners = listenerMap.get(key); if (eventListeners == null) { return; @@ -86,12 +89,13 @@ public class InstancesChangeNotifier extends Subscriber { /** * check serviceName,clusters is subscribed. * - * @param serviceName combineServiceName, such as 'xxx@@xxx' + * @param groupName group name + * @param serviceName serviceName * @param clusters clusters, concat by ','. such as 'xxx,yyy' * @return is serviceName,clusters subscribed */ - public boolean isSubscribed(String serviceName, String clusters) { - String key = ServiceInfo.getKey(serviceName, clusters); + public boolean isSubscribed(String groupName, String serviceName, String clusters) { + String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); ConcurrentHashSet eventListeners = listenerMap.get(key); return CollectionUtils.isNotEmpty(eventListeners); } @@ -106,7 +110,8 @@ public class InstancesChangeNotifier extends Subscriber { @Override public void onEvent(InstancesChangeEvent event) { - String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters()); + String key = ServiceInfo + .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters()); ConcurrentHashSet eventListeners = listenerMap.get(key); if (CollectionUtils.isEmpty(eventListeners)) { return; diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java b/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java index 4ae7a8006..7976b83d4 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java @@ -72,8 +72,12 @@ public class ClientConnectionEventListenerRegistry { executorService.schedule(new Runnable() { @Override public void run() { - for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { - clientConnectionEventListener.clientDisConnected(connection); + for (ClientConnectionEventListener each : clientConnectionEventListeners) { + try { + each.clientDisConnected(connection); + } catch (Exception e) { + Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", each.getName(), e); + } } } }, 0L, TimeUnit.MILLISECONDS); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java index fc0103034..555c1ff19 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java @@ -381,7 +381,7 @@ public class InstanceController { Subscriber subscriber = udpPort > 0 ? new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName, - udpPort) : null; + udpPort, clusters) : null; return instanceService.listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java index 1d5287252..36fa19ff5 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceOperatorServiceImpl.java @@ -63,7 +63,8 @@ public class InstanceOperatorServiceImpl implements InstanceOperator { ServiceInfo result = new ServiceInfo(client.getServiceName(), client.getClusters()); try { Subscriber subscriber = new Subscriber(client.getAddrStr(), client.getAgent(), client.getApp(), - client.getIp(), client.getNamespaceId(), client.getServiceName(), client.getPort()); + client.getIp(), client.getNamespaceId(), client.getServiceName(), client.getPort(), + client.getClusters()); result = listInstance(client.getNamespaceId(), client.getServiceName(), subscriber, client.getClusters(), false); } catch (Exception e) { @@ -101,7 +102,8 @@ public class InstanceOperatorServiceImpl implements InstanceOperator { } @Override - public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException { + public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) + throws NacosException { com.alibaba.nacos.naming.core.Instance coreInstance = (com.alibaba.nacos.naming.core.Instance) instance; String groupedServiceName = NamingUtils.getGroupedName(groupName, serviceName); serviceManager.updateInstance(namespaceId, groupedServiceName, coreInstance); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/ServiceManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/ServiceManager.java index 11d1d2e89..fdbafe501 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/ServiceManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/ServiceManager.java @@ -59,10 +59,8 @@ public class ServiceManager { public Service getSingleton(Service service) { singletonRepository.putIfAbsent(service, service); Service result = singletonRepository.get(service); - if (!namespaceSingletonMaps.containsKey(result.getNamespace())) { - namespaceSingletonMaps.putIfAbsent(result.getNamespace(), new ConcurrentHashSet<>()); - namespaceSingletonMaps.get(result.getNamespace()).add(result); - } + namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>()); + namespaceSingletonMaps.get(result.getNamespace()).add(result); return result; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/EmptyServiceAutoCleanerV2.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/EmptyServiceAutoCleanerV2.java index 2fb61158a..869173f71 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/EmptyServiceAutoCleanerV2.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/EmptyServiceAutoCleanerV2.java @@ -24,6 +24,7 @@ import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.misc.GlobalConfig; import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.Loggers; +import org.springframework.stereotype.Component; import java.util.Collection; import java.util.Set; @@ -35,6 +36,7 @@ import java.util.stream.Stream; * * @author xiweng.yy */ +@Component public class EmptyServiceAutoCleanerV2 extends AbstractNamingCleaner { private static final String EMPTY_SERVICE = "emptyService"; @@ -67,10 +69,10 @@ public class EmptyServiceAutoCleanerV2 extends AbstractNamingCleaner { } private void cleanEmptyService(Service service) { - Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", service.getNamespace(), - service.getGroupedServiceName()); Collection registeredService = clientServiceIndexesManager.getAllClientsRegisteredService(service); if (registeredService.isEmpty() && isTimeExpired(service)) { + Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", service.getNamespace(), + service.getGroupedServiceName()); clientServiceIndexesManager.removePublisherIndexesByEmptyService(service); ServiceManager.getInstance().removeSingleton(service); NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, true)); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java index 36c3514cb..0c9d88b55 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java @@ -66,9 +66,13 @@ public class ExpiredMetadataCleaner extends AbstractNamingCleaner { private void removeExpiredMetadata(ExpiredMetadataInfo expiredInfo) { Loggers.SRV_LOG.info("Remove expired metadata {}", expiredInfo); if (null == expiredInfo.getInstanceId()) { - metadataOperateService.deleteServiceMetadata(expiredInfo.getService()); + if (metadataManager.containInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId())) { + metadataOperateService.deleteServiceMetadata(expiredInfo.getService()); + } } else { - metadataOperateService.deleteInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId()); + if (metadataManager.containServiceMetadata(expiredInfo.getService())) { + metadataOperateService.deleteInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId()); + } } } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ClientServiceIndexesManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ClientServiceIndexesManager.java index 45993263c..51e51d7b9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ClientServiceIndexesManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ClientServiceIndexesManager.java @@ -60,10 +60,11 @@ public class ClientServiceIndexesManager extends SmartSubscriber { /** * Clear the service index without instances. + * * @param service The service of the Nacos. */ public void removePublisherIndexesByEmptyService(Service service) { - if (publisherIndexes.get(service).isEmpty()) { + if (publisherIndexes.containsKey(service) && publisherIndexes.get(service).isEmpty()) { publisherIndexes.remove(service); } } @@ -123,9 +124,6 @@ public class ClientServiceIndexesManager extends SmartSubscriber { return; } publisherIndexes.get(service).remove(clientId); - if (publisherIndexes.get(service).isEmpty()) { - publisherIndexes.remove(service); - } NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java index 41e2a8dad..482605c1d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java @@ -122,20 +122,20 @@ public class ServiceStorage { return Optional.ofNullable(client.getInstancePublishInfo(service)); } - private Instance parseInstance(Service service, InstancePublishInfo instancePublishInfo) { + private Instance parseInstance(Service service, InstancePublishInfo instanceInfo) { Instance result = new Instance(); - result.setIp(instancePublishInfo.getIp()); - result.setPort(instancePublishInfo.getPort()); + result.setIp(instanceInfo.getIp()); + result.setPort(instanceInfo.getPort()); result.setServiceName(NamingUtils.getGroupedName(service.getName(), service.getGroup())); - Map instanceMetadata = new HashMap<>(instancePublishInfo.getExtendDatum().size()); - for (Map.Entry entry : instancePublishInfo.getExtendDatum().entrySet()) { + Map instanceMetadata = new HashMap<>(instanceInfo.getExtendDatum().size()); + for (Map.Entry entry : instanceInfo.getExtendDatum().entrySet()) { if (CommonParams.CLUSTER_NAME.equals(entry.getKey())) { result.setClusterName(entry.getValue().toString()); } else { instanceMetadata.put(entry.getKey(), entry.getValue().toString()); } } - Optional metadata = metadataManager.getInstanceMetadata(service, instancePublishInfo.getIp()); + Optional metadata = metadataManager.getInstanceMetadata(service, instanceInfo.getInstanceId()); if (metadata.isPresent()) { result.setEnabled(metadata.get().isEnabled()); result.setWeight(metadata.get().getWeight()); @@ -145,7 +145,7 @@ public class ServiceStorage { } result.setMetadata(instanceMetadata); result.setEphemeral(service.isEphemeral()); - result.setHealthy(instancePublishInfo.isHealthy()); + result.setHealthy(instanceInfo.isHealthy()); return result; } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java index 8a01c4e43..5f6e64f0e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java @@ -205,27 +205,27 @@ public class NamingMetadataManager extends SmartSubscriber { private void handleClientDisconnectEvent(ClientEvent.ClientDisconnectEvent event) { for (Service each : event.getClient().getAllPublishedService()) { - String instanceId = event.getClient().getInstancePublishInfo(each).getIp(); - updateExpiredInfo(true, ExpiredMetadataInfo.newExpiredInstanceMetadata(each, instanceId)); + String instanceId = event.getClient().getInstancePublishInfo(each).getInstanceId(); + if (containInstanceMetadata(each, instanceId)) { + updateExpiredInfo(true, ExpiredMetadataInfo.newExpiredInstanceMetadata(each, instanceId)); + } } } private void handleServiceMetadataEvent(MetadataEvent.ServiceMetadataEvent event) { Service service = event.getService(); - if (!containServiceMetadata(service)) { - return; + if (containServiceMetadata(service)) { + updateExpiredInfo(event.isExpired(), ExpiredMetadataInfo.newExpiredServiceMetadata(service)); } - updateExpiredInfo(event.isExpired(), ExpiredMetadataInfo.newExpiredServiceMetadata(service)); } private void handleInstanceMetadataEvent(MetadataEvent.InstanceMetadataEvent event) { Service service = event.getService(); String instanceId = event.getInstanceId(); - if (!containInstanceMetadata(service, instanceId)) { - return; + if (containInstanceMetadata(service, instanceId)) { + updateExpiredInfo(event.isExpired(), + ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), event.getInstanceId())); } - updateExpiredInfo(event.isExpired(), - ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), event.getInstanceId())); } private void updateExpiredInfo(boolean expired, ExpiredMetadataInfo expiredMetadataInfo) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/pojo/InstancePublishInfo.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/pojo/InstancePublishInfo.java index 7932d2faa..27bfe0ef0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/pojo/InstancePublishInfo.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/pojo/InstancePublishInfo.java @@ -16,6 +16,8 @@ package com.alibaba.nacos.naming.core.v2.pojo; +import com.alibaba.nacos.common.utils.IPUtil; + import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -31,10 +33,10 @@ public class InstancePublishInfo { private int port; - private Map extendDatum; - private boolean healthy; + private Map extendDatum; + public InstancePublishInfo() { } @@ -76,6 +78,10 @@ public class InstancePublishInfo { this.healthy = healthy; } + public String getInstanceId() { + return ip + IPUtil.IP_PORT_SPLITER + port; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/service/impl/EphemeralClientOperationServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/service/impl/EphemeralClientOperationServiceImpl.java index bd31bab00..8cb0937d4 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/service/impl/EphemeralClientOperationServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/service/impl/EphemeralClientOperationServiceImpl.java @@ -52,11 +52,11 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi public void registerInstance(Service service, Instance instance, String clientId) { Service singleton = ServiceManager.getInstance().getSingleton(service); Client client = clientManager.getClient(clientId); - InstancePublishInfo instancePublishInfo = getPublishInfo(instance); - client.addServiceInstance(singleton, instancePublishInfo); + InstancePublishInfo instanceInfo = getPublishInfo(instance); + client.addServiceInstance(singleton, instanceInfo); client.setLastUpdatedTime(); NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId)); - NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getIp(), false)); + NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getInstanceId(), false)); } @Override @@ -71,7 +71,7 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi client.setLastUpdatedTime(); NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId)); if (null != removedInstance) { - NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getIp(), true)); + NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getInstanceId(), true)); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ExpiredInstanceChecker.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ExpiredInstanceChecker.java index 327b18f3e..d18299ecc 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ExpiredInstanceChecker.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/ExpiredInstanceChecker.java @@ -66,7 +66,7 @@ public class ExpiredInstanceChecker implements InstanceBeatChecker { private Optional getTimeoutFromMetadata(Service service, InstancePublishInfo instance) { Optional instanceMetadata = ApplicationUtils.getBean(NamingMetadataManager.class) - .getInstanceMetadata(service, instance.getIp()); + .getInstanceMetadata(service, instance.getInstanceId()); return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.IP_DELETE_TIMEOUT)); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceEnableBeatCheckInterceptor.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceEnableBeatCheckInterceptor.java index 73c1aebd6..ec350e203 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceEnableBeatCheckInterceptor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/InstanceEnableBeatCheckInterceptor.java @@ -36,7 +36,7 @@ public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterce public boolean intercept(InstanceBeatCheckTask object) { NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class); HeartBeatInstancePublishInfo instance = object.getInstancePublishInfo(); - Optional metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getIp()); + Optional metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getInstanceId()); if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) { return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString()); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/UnhealthyInstanceChecker.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/UnhealthyInstanceChecker.java index eefb9aaa7..e84846270 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/UnhealthyInstanceChecker.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/heartbeat/UnhealthyInstanceChecker.java @@ -66,7 +66,7 @@ public class UnhealthyInstanceChecker implements InstanceBeatChecker { private Optional getTimeoutFromMetadata(Service service, InstancePublishInfo instance) { Optional instanceMetadata = ApplicationUtils.getBean(NamingMetadataManager.class) - .getInstanceMetadata(service, instance.getIp()); + .getInstanceMetadata(service, instance.getInstanceId()); return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT)); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/pojo/Subscriber.java b/naming/src/main/java/com/alibaba/nacos/naming/pojo/Subscriber.java index db502d765..af21e0fcb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/pojo/Subscriber.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/pojo/Subscriber.java @@ -16,6 +16,8 @@ package com.alibaba.nacos.naming.pojo; +import com.alibaba.nacos.common.utils.StringUtils; + import java.io.Serializable; import java.util.Objects; @@ -26,6 +28,8 @@ import java.util.Objects; */ public class Subscriber implements Serializable { + private static final long serialVersionUID = -6256968317172033867L; + private String addrStr; private String agent; @@ -40,7 +44,15 @@ public class Subscriber implements Serializable { private String serviceName; - public Subscriber(String addrStr, String agent, String app, String ip, String namespaceId, String serviceName, int port) { + private String cluster; + + public Subscriber(String addrStr, String agent, String app, String ip, String namespaceId, String serviceName, + int port) { + this(addrStr, agent, app, ip, namespaceId, serviceName, port, StringUtils.EMPTY); + } + + public Subscriber(String addrStr, String agent, String app, String ip, String namespaceId, String serviceName, + int port, String clusters) { this.addrStr = addrStr; this.agent = agent; this.app = app; @@ -48,6 +60,7 @@ public class Subscriber implements Serializable { this.port = port; this.namespaceId = namespaceId; this.serviceName = serviceName; + this.cluster = clusters; } public String getAddrStr() { @@ -106,6 +119,14 @@ public class Subscriber implements Serializable { this.port = port; } + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/task/PushExecuteTask.java b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/task/PushExecuteTask.java index e148ffaf2..5007b467b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/task/PushExecuteTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/task/PushExecuteTask.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.push.v2.task; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.pojo.Subscriber; @@ -46,11 +47,24 @@ public class PushExecuteTask extends AbstractExecuteTask { serviceInfo = ServiceUtil.selectInstances(serviceInfo, true, true); for (String each : delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)) { Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service); - delayTaskEngine.getPushExecuteService().doPush(each, subscriber, serviceInfo); + delayTaskEngine.getPushExecuteService().doPush(each, subscriber, handleClusterData(serviceInfo, subscriber)); } } catch (Exception e) { Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e); delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L)); } } + + /** + * For adapt push cluster feature. Will be remove after 2.1.x. + * + * @param data original data + * @param subscriber subscriber information + * @return cluster filtered data + */ + @Deprecated + private ServiceInfo handleClusterData(ServiceInfo data, Subscriber subscriber) { + return StringUtils.isBlank(subscriber.getCluster()) ? data + : ServiceUtil.selectInstances(data, subscriber.getCluster()); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java index 98c9ebea6..8eaf31505 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java @@ -23,11 +23,13 @@ import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse; import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.ResponseCode; +import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.core.remote.RequestHandler; import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl; import com.alibaba.nacos.naming.pojo.Subscriber; +import com.alibaba.nacos.naming.utils.ServiceUtil; import org.springframework.stereotype.Component; /** @@ -56,9 +58,9 @@ public class SubscribeServiceRequestHandler extends RequestHandler