[ISSUE-#3880] NamingService Client support pushEmptyProtection. (#4665)

* NamingService Client support pushEmptyProtection.

* check hosts is null. if null, also invalid.
This commit is contained in:
赵延 2021-01-12 09:48:27 +08:00 committed by GitHub
parent a4e3b7cbc7
commit e2f7796d3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 8 deletions

View File

@ -66,6 +66,8 @@ public class PropertyKeyConst {
public static final String NAMING_POLLING_THREAD_COUNT = "namingPollingThreadCount";
public static final String NAMING_REQUEST_DOMAIN_RETRY_COUNT = "namingRequestDomainMaxRetryCount";
public static final String NAMING_PUSH_EMPTY_PROTECTION = "namingPushEmptyProtection";
/**
* Get the key value of some variable value from the system property.

View File

@ -167,6 +167,10 @@ public class ServiceInfo {
return true;
}
if (hosts == null) {
return false;
}
List<Instance> validHosts = new ArrayList<Instance>();
for (Instance host : hosts) {
if (!host.isHealthy()) {
@ -177,8 +181,8 @@ public class ServiceInfo {
validHosts.add(host);
}
}
return true;
//No valid hosts, return false.
return !validHosts.isEmpty();
}
@JsonIgnore

View File

@ -93,7 +93,7 @@ public class NacosNamingService implements NamingService {
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties),
initPollingThreadCount(properties));
isPushEmptyProtect(properties), initPollingThreadCount(properties));
}
private int initClientBeatThreadCount(Properties properties) {
@ -126,6 +126,16 @@ public class NacosNamingService implements NamingService {
return loadCacheAtStart;
}
private boolean isPushEmptyProtect(Properties properties) {
boolean pushEmptyProtection = false;
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION))) {
pushEmptyProtection = ConvertUtils
.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION));
}
return pushEmptyProtection;
}
private void initServerAddr(Properties properties) {
serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
endpoint = InitUtils.initEndpoint(properties);

View File

@ -78,16 +78,18 @@ public class HostReactor implements Closeable {
private final String cacheDir;
private final boolean pushEmptyProtection;
private final ScheduledExecutorService executor;
private final InstancesChangeNotifier notifier;
public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
this(serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}
public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart,
int pollingThreadCount) {
boolean pushEmptyProtection, int pollingThreadCount) {
// init executorService
this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
@ -107,12 +109,12 @@ public class HostReactor implements Closeable {
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.pushEmptyProtection = pushEmptyProtection;
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
this.notifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(notifier);
}
@ -161,7 +163,8 @@ public class HostReactor implements Closeable {
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
if (pushEmptyProtection && !serviceInfo.validate()) {
//empty or error push, just ignore
return oldService;
}