Add switch for naming async query. (#9325)
This commit is contained in:
parent
ee3ae0034f
commit
0bfd75252b
@ -75,6 +75,8 @@ public class PropertyKeyConst {
|
|||||||
|
|
||||||
public static final String NAMING_PUSH_EMPTY_PROTECTION = "namingPushEmptyProtection";
|
public static final String NAMING_PUSH_EMPTY_PROTECTION = "namingPushEmptyProtection";
|
||||||
|
|
||||||
|
public static final String NAMING_ASYNC_QUERY_SUBSCRIBE_SERVICE = "namingAsyncQuerySubscribeService";
|
||||||
|
|
||||||
public static final String PUSH_RECEIVER_UDP_PORT = "push.receiver.udp.port";
|
public static final String PUSH_RECEIVER_UDP_PORT = "push.receiver.udp.port";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -61,8 +61,11 @@ public class ServiceInfoUpdateService implements Closeable {
|
|||||||
|
|
||||||
private final InstancesChangeNotifier changeNotifier;
|
private final InstancesChangeNotifier changeNotifier;
|
||||||
|
|
||||||
|
private final boolean asyncQuerySubscribeService;
|
||||||
|
|
||||||
public ServiceInfoUpdateService(Properties properties, ServiceInfoHolder serviceInfoHolder,
|
public ServiceInfoUpdateService(Properties properties, ServiceInfoHolder serviceInfoHolder,
|
||||||
NamingClientProxy namingClientProxy, InstancesChangeNotifier changeNotifier) {
|
NamingClientProxy namingClientProxy, InstancesChangeNotifier changeNotifier) {
|
||||||
|
this.asyncQuerySubscribeService = isAsyncQueryForSubscribeService(properties);
|
||||||
this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties),
|
this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties),
|
||||||
new NameThreadFactory("com.alibaba.nacos.client.naming.updater"));
|
new NameThreadFactory("com.alibaba.nacos.client.naming.updater"));
|
||||||
this.serviceInfoHolder = serviceInfoHolder;
|
this.serviceInfoHolder = serviceInfoHolder;
|
||||||
@ -70,6 +73,13 @@ public class ServiceInfoUpdateService implements Closeable {
|
|||||||
this.changeNotifier = changeNotifier;
|
this.changeNotifier = changeNotifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isAsyncQueryForSubscribeService(Properties properties) {
|
||||||
|
if (properties == null || !properties.containsKey(PropertyKeyConst.NAMING_ASYNC_QUERY_SUBSCRIBE_SERVICE)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return ConvertUtils.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_ASYNC_QUERY_SUBSCRIBE_SERVICE), true);
|
||||||
|
}
|
||||||
|
|
||||||
private int initPollingThreadCount(Properties properties) {
|
private int initPollingThreadCount(Properties properties) {
|
||||||
if (properties == null) {
|
if (properties == null) {
|
||||||
return UtilAndComs.DEFAULT_POLLING_THREAD_COUNT;
|
return UtilAndComs.DEFAULT_POLLING_THREAD_COUNT;
|
||||||
@ -86,6 +96,9 @@ public class ServiceInfoUpdateService implements Closeable {
|
|||||||
* @param clusters clusters
|
* @param clusters clusters
|
||||||
*/
|
*/
|
||||||
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
|
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
|
||||||
|
if (!asyncQuerySubscribeService) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
|
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
|
||||||
if (futureMap.get(serviceKey) != null) {
|
if (futureMap.get(serviceKey) != null) {
|
||||||
return;
|
return;
|
||||||
@ -193,9 +206,10 @@ public class ServiceInfoUpdateService implements Closeable {
|
|||||||
// TODO multiple time can be configured.
|
// TODO multiple time can be configured.
|
||||||
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
|
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
|
||||||
resetFailCount();
|
resetFailCount();
|
||||||
|
} catch (NacosException e) {
|
||||||
|
handleNacosException(e);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
incFailCount();
|
handleUnknownException(e);
|
||||||
NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
|
|
||||||
} finally {
|
} finally {
|
||||||
if (!isCancel) {
|
if (!isCancel) {
|
||||||
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
|
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
|
||||||
@ -204,6 +218,20 @@ public class ServiceInfoUpdateService implements Closeable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleNacosException(NacosException e) {
|
||||||
|
incFailCount();
|
||||||
|
int errorCode = e.getErrCode();
|
||||||
|
if (NacosException.SERVER_ERROR == errorCode) {
|
||||||
|
handleUnknownException(e);
|
||||||
|
}
|
||||||
|
NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleUnknownException(Throwable throwable) {
|
||||||
|
incFailCount();
|
||||||
|
NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);
|
||||||
|
}
|
||||||
|
|
||||||
private void incFailCount() {
|
private void incFailCount() {
|
||||||
int limit = 6;
|
int limit = 6;
|
||||||
if (failCount == limit) {
|
if (failCount == limit) {
|
||||||
|
Loading…
Reference in New Issue
Block a user