Do some refactor for naming notify event. (#12167)

This commit is contained in:
杨翊 SionYang 2024-06-03 15:55:46 +08:00 committed by GitHub
parent 7cdf224157
commit 07d92ffb3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 154 additions and 112 deletions

View File

@ -553,6 +553,9 @@ public class NacosNamingService implements NamingService {
private void notifyIfSubscribed(String serviceName, String groupName, NamingSelectorWrapper wrapper) {
if (changeNotifier.isSubscribed(groupName, serviceName)) {
NAMING_LOGGER.warn(
"Duplicate subscribe for groupName: {}, serviceName: {}; directly use current cached to notify.",
groupName, serviceName);
ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, Constants.NULL);
InstancesChangeEvent event = transferToEvent(serviceInfo);
wrapper.notifyListener(event);

View File

@ -18,25 +18,25 @@ package com.alibaba.nacos.client.naming.backups;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.cache.InstancesDiffer;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.client.naming.event.InstancesDiff;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.spi.NacosServiceLoader;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -59,15 +59,16 @@ public class FailoverReactor implements Closeable {
private final ScheduledExecutorService executorService;
private final InstancesDiffer instancesDiffer;
private FailoverDataSource failoverDataSource;
private String notifierEventScope;
private Map<String, Meter> meterMap = new HashMap<>(10);
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String notifierEventScope) {
this.serviceInfoHolder = serviceInfoHolder;
this.notifierEventScope = notifierEventScope;
this.instancesDiffer = new InstancesDiffer();
Collection<FailoverDataSource> dataSources = NacosServiceLoader.load(FailoverDataSource.class);
for (FailoverDataSource dataSource : dataSources) {
failoverDataSource = dataSource;
@ -106,11 +107,12 @@ public class FailoverReactor implements Closeable {
for (Map.Entry<String, FailoverData> entry : failoverData.entrySet()) {
ServiceInfo newService = (ServiceInfo) entry.getValue().getData();
ServiceInfo oldService = serviceMap.get(entry.getKey());
if (serviceInfoHolder.isChangedServiceInfo(oldService, newService)) {
InstancesDiff diff = instancesDiffer.doDiff(oldService, newService);
if (diff.hasDifferent()) {
NAMING_LOGGER.info("[NA] failoverdata isChangedServiceInfo. newService:{}",
JacksonUtils.toJson(newService));
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, newService.getName(),
newService.getGroupName(), newService.getClusters(), newService.getHosts()));
newService.getGroupName(), newService.getClusters(), newService.getHosts(), diff));
}
failoverMap.put(entry.getKey(), (ServiceInfo) entry.getValue().getData());
}
@ -130,12 +132,12 @@ public class FailoverReactor implements Closeable {
ServiceInfo oldService = entry.getValue();
ServiceInfo newService = serviceInfoMap.get(entry.getKey());
if (newService != null) {
boolean changed = serviceInfoHolder.isChangedServiceInfo(oldService, newService);
if (changed) {
InstancesDiff diff = instancesDiffer.doDiff(oldService, newService);
if (diff.hasDifferent()) {
NotifyCenter.publishEvent(
new InstancesChangeEvent(notifierEventScope, newService.getName(),
newService.getGroupName(), newService.getClusters(),
newService.getHosts()));
newService.getHosts(), diff));
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.cache;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.event.InstancesDiff;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* The instance list differ for nacos naming.
*
* @author xiweng.yy
*/
public final class InstancesDiffer {
/**
* Do instance different for input service info.
*
* @param oldService old service info
* @param newService new service info
* @return {@link InstancesDiff} of the differences between old and new service info.
*/
public InstancesDiff doDiff(ServiceInfo oldService, ServiceInfo newService) {
InstancesDiff instancesDiff = new InstancesDiff();
if (null == oldService) {
NAMING_LOGGER.info("init new ips({}) service: {} -> {}", newService.ipCount(), newService.getKey(),
JacksonUtils.toJson(newService.getHosts()));
instancesDiff.setAddedInstances(newService.getHosts());
return instancesDiff;
}
if (oldService.getLastRefTime() > newService.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: {}, new-t: {}", oldService.getLastRefTime(),
newService.getLastRefTime());
return instancesDiff;
}
Map<String, Instance> oldHostMap = new HashMap<>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<>(newService.getHosts().size());
for (Instance host : newService.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet<>();
Set<Instance> newHosts = new HashSet<>();
Set<Instance> remvHosts = new HashSet<>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<>(newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
modHosts.add(host);
continue;
}
if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
//add to remove hosts
remvHosts.add(host);
}
if (newHosts.size() > 0) {
NAMING_LOGGER.info("new ips({}) service: {} -> {}", newHosts.size(), newService.getKey(),
JacksonUtils.toJson(newHosts));
instancesDiff.setAddedInstances(newHosts);
}
if (remvHosts.size() > 0) {
NAMING_LOGGER.info("removed ips({}) service: {} -> {}", remvHosts.size(), newService.getKey(),
JacksonUtils.toJson(remvHosts));
instancesDiff.setRemovedInstances(remvHosts);
}
if (modHosts.size() > 0) {
NAMING_LOGGER.info("modified ips({}) service: {} -> {}", modHosts.size(), newService.getKey(),
JacksonUtils.toJson(modHosts));
instancesDiff.setModifiedInstances(modHosts);
}
return instancesDiff;
}
}

View File

@ -18,27 +18,21 @@ package com.alibaba.nacos.client.naming.cache;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.client.naming.utils.CacheDirUtil;
import com.alibaba.nacos.client.naming.event.InstancesDiff;
import com.alibaba.nacos.client.naming.utils.CacheDirUtil;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -57,12 +51,15 @@ public class ServiceInfoHolder implements Closeable {
private final boolean pushEmptyProtection;
private final InstancesDiffer instancesDiffer;
private String cacheDir;
private String notifierEventScope;
public ServiceInfoHolder(String namespace, String notifierEventScope, NacosClientProperties properties) {
cacheDir = CacheDirUtil.initCacheDir(namespace, properties);
instancesDiffer = new InstancesDiffer();
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
} else {
@ -159,87 +156,8 @@ public class ServiceInfoHolder implements Closeable {
return null == serviceInfo.getHosts() || (pushEmptyProtection && !serviceInfo.validate());
}
/**
* isChangedServiceInfo.
*
* @param oldService old service data
* @param newService new service data
* @return {@code true} if oldService is not equal newService, {@code false} otherwise.
*/
public boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newService) {
return getServiceInfoDiff(oldService, newService).hasDifferent();
}
private InstancesDiff getServiceInfoDiff(ServiceInfo oldService, ServiceInfo newService) {
InstancesDiff instancesDiff = new InstancesDiff();
if (null == oldService) {
NAMING_LOGGER.info("init new ips({}) service: {} -> {}", newService.ipCount(), newService.getKey(),
JacksonUtils.toJson(newService.getHosts()));
instancesDiff.setAddedInstances(newService.getHosts());
return instancesDiff;
}
if (oldService.getLastRefTime() > newService.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: {}, new-t: {}", oldService.getLastRefTime(),
newService.getLastRefTime());
return instancesDiff;
}
Map<String, Instance> oldHostMap = new HashMap<>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<>(newService.getHosts().size());
for (Instance host : newService.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet<>();
Set<Instance> newHosts = new HashSet<>();
Set<Instance> remvHosts = new HashSet<>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<>(newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
modHosts.add(host);
continue;
}
if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
//add to remove hosts
remvHosts.add(host);
}
if (newHosts.size() > 0) {
NAMING_LOGGER.info("new ips({}) service: {} -> {}", newHosts.size(), newService.getKey(),
JacksonUtils.toJson(newHosts));
instancesDiff.setAddedInstances(newHosts);
}
if (remvHosts.size() > 0) {
NAMING_LOGGER.info("removed ips({}) service: {} -> {}", remvHosts.size(), newService.getKey(),
JacksonUtils.toJson(remvHosts));
instancesDiff.setRemovedInstances(remvHosts);
}
if (modHosts.size() > 0) {
NAMING_LOGGER.info("modified ips({}) service: {} -> {}", modHosts.size(), newService.getKey(),
JacksonUtils.toJson(modHosts));
instancesDiff.setModifiedInstances(modHosts);
}
return instancesDiff;
return instancesDiffer.doDiff(oldService, newService);
}
public String getCacheDir() {

View File

@ -43,10 +43,6 @@ public class InstancesChangeEvent extends Event {
private InstancesDiff instancesDiff;
public InstancesChangeEvent(String eventScope, String serviceName, String groupName, String clusters, List<Instance> hosts) {
this(eventScope, serviceName, groupName, clusters, hosts, null);
}
public InstancesChangeEvent(String eventScope, String serviceName, String groupName, String clusters, List<Instance> hosts, InstancesDiff diff) {
this.eventScope = eventScope;
this.serviceName = serviceName;

View File

@ -23,6 +23,8 @@ import com.alibaba.nacos.client.selector.ListenerInvoker;
import java.util.Objects;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Naming listener invoker.
*
@ -38,6 +40,7 @@ public class NamingListenerInvoker implements ListenerInvoker<NamingEvent> {
@Override
public void invoke(NamingEvent event) {
logInvoke(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(event));
} else {
@ -45,6 +48,11 @@ public class NamingListenerInvoker implements ListenerInvoker<NamingEvent> {
}
}
private void logInvoke(NamingEvent event) {
NAMING_LOGGER.info("Invoke event groupName: {}, serviceName: {} to Listener: {}", event.getGroupName(),
event.getServiceName(), listener.toString());
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.client.naming.backups;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.common.utils.ReflectUtils;
@ -30,7 +31,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@ -40,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@ -87,9 +86,9 @@ class FailoverReactorTest {
when(failoverDataSource.getSwitch()).thenReturn(mockFailoverSwitch);
Map<String, FailoverData> map = new HashMap<>();
ServiceInfo serviceInfo = new ServiceInfo("a@@b");
serviceInfo.addHost(new Instance());
map.put("a@@b", NamingFailoverData.newNamingFailoverData(serviceInfo));
when(failoverDataSource.getFailoverData()).thenReturn(map);
when(holder.isChangedServiceInfo(any(), any())).thenReturn(true);
// waiting refresh thread work
TimeUnit.MILLISECONDS.sleep(5500);
ServiceInfo actual = failoverReactor.getService("a@@b");
@ -119,12 +118,12 @@ class FailoverReactorTest {
failoverSwitchEnableField.set(failoverReactor, true);
Map<String, ServiceInfo> map = new HashMap<>();
ServiceInfo serviceInfo = new ServiceInfo("a@@b");
serviceInfo.addHost(new Instance());
map.put("a@@b", serviceInfo);
when(holder.getServiceInfoMap()).thenReturn(map);
Field serviceMapField = FailoverReactor.class.getDeclaredField("serviceMap");
serviceMapField.setAccessible(true);
serviceMapField.set(failoverReactor, map);
when(holder.isChangedServiceInfo(any(), any())).thenReturn(true);
// waiting refresh thread work
TimeUnit.MILLISECONDS.sleep(5500);
ServiceInfo actual = failoverReactor.getService("a@@b");
@ -143,9 +142,6 @@ class FailoverReactorTest {
@Test
void testFailoverServiceCntMetricsClear()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
Field field = FailoverReactor.class.getDeclaredField("meterMap");
field.setAccessible(true);
field.set(failoverReactor, Collections.singletonMap("a", null));
Method method = FailoverReactor.class.getDeclaredMethod("failoverServiceCntMetricsClear");
method.setAccessible(true);
method.invoke(failoverReactor);