[ISSUE #11125]The related configurations of NamingGrpcRedoService sup… (#11126)

* [ISSUE #11125]The related configurations of NamingGrpcRedoService support configurability.

* [ISSUE #11125]Name adjustment.
This commit is contained in:
阿魁 2023-09-14 09:36:10 +08:00 committed by GitHub
parent 279e488e03
commit d3e099b0db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 18 deletions

View File

@ -77,6 +77,10 @@ public class PropertyKeyConst {
public static final String NAMING_ASYNC_QUERY_SUBSCRIBE_SERVICE = "namingAsyncQuerySubscribeService";
public static final String REDO_DELAY_TIME = "redoDelayTime";
public static final String REDO_DELAY_THREAD_COUNT = "redoDelayThreadCount";
/**
* Get the key value of some variable value from the system property.
*/

View File

@ -86,9 +86,9 @@ public class Constants {
public static final String USERNAME = "username";
public static final String TOKEN_REFRESH_WINDOW = "tokenRefreshWindow";
public static final Integer SDK_GRPC_PORT_DEFAULT_OFFSET = 1000;
public static final Integer CLUSTER_GRPC_PORT_DEFAULT_OFFSET = 1001;
/**
@ -216,13 +216,20 @@ public class Constants {
public static final String CLUSTER_NAME_PATTERN_STRING = "^[0-9a-zA-Z-]+$";
/**
* millisecond.
*/
public static final long DEFAULT_REDO_DELAY_TIME = 3000L;
public static final int DEFAULT_REDO_THREAD_COUNT = 1;
/**
* The constants in config directory.
*/
public static class Config {
public static final String CONFIG_MODULE = "config";
public static final String NOTIFY_HEADER = "notify";
}

View File

@ -98,7 +98,7 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
RpcClientTlsConfig.properties(properties.asProperties()));
this.redoService = new NamingGrpcRedoService(this);
this.redoService = new NamingGrpcRedoService(this, properties);
NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);
start(serverListFactory, serviceInfoHolder);
}

View File

@ -16,9 +16,12 @@
package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.BatchInstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
@ -47,12 +50,9 @@ public class NamingGrpcRedoService implements ConnectionEventListener {
private static final String REDO_THREAD_NAME = "com.alibaba.nacos.client.naming.grpc.redo";
private static final int REDO_THREAD = 1;
private int redoThreadCount;
/**
* TODO get redo delay from config.
*/
private static final long DEFAULT_REDO_DELAY = 3000L;
private long redoDelayTime;
private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
@ -62,10 +62,17 @@ public class NamingGrpcRedoService implements ConnectionEventListener {
private volatile boolean connected = false;
public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy) {
this.redoExecutor = new ScheduledThreadPoolExecutor(REDO_THREAD, new NameThreadFactory(REDO_THREAD_NAME));
this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), DEFAULT_REDO_DELAY,
DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS);
public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy, NacosClientProperties properties) {
setProperties(properties);
this.redoExecutor = new ScheduledThreadPoolExecutor(redoThreadCount, new NameThreadFactory(REDO_THREAD_NAME));
this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), redoDelayTime, redoDelayTime,
TimeUnit.MILLISECONDS);
}
private void setProperties(NacosClientProperties properties) {
redoDelayTime = properties.getLong(PropertyKeyConst.REDO_DELAY_TIME, Constants.DEFAULT_REDO_DELAY_TIME);
redoThreadCount = properties.getInteger(PropertyKeyConst.REDO_DELAY_THREAD_COUNT,
Constants.DEFAULT_REDO_THREAD_COUNT);
}
public ConcurrentMap<String, InstanceRedoData> getRegisteredInstances() {
@ -115,7 +122,7 @@ public class NamingGrpcRedoService implements ConnectionEventListener {
*
* @param serviceName service name
* @param groupName group name
* @param instances batch registered instance
* @param instances batch registered instance
*/
public void cacheInstanceForRedo(String serviceName, String groupName, List<Instance> instances) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
@ -309,6 +316,7 @@ public class NamingGrpcRedoService implements ConnectionEventListener {
/**
* get Cache service.
*
* @return cache service
*/
public InstanceRedoData getRegisteredInstancesByKey(String combinedServiceName) {

View File

@ -16,7 +16,9 @@
package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.BatchInstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
@ -29,8 +31,10 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
@ -54,9 +58,11 @@ public class NamingGrpcRedoServiceTest {
@Before
public void setUp() throws Exception {
redoService = new NamingGrpcRedoService(clientProxy);
ScheduledExecutorService redoExecutor = (ScheduledExecutorService) ReflectUtils
.getFieldValue(redoService, "redoExecutor");
Properties prop = new Properties();
NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
redoService = new NamingGrpcRedoService(clientProxy, nacosClientProperties);
ScheduledExecutorService redoExecutor = (ScheduledExecutorService) ReflectUtils.getFieldValue(redoService,
"redoExecutor");
redoExecutor.shutdownNow();
}
@ -65,6 +71,42 @@ public class NamingGrpcRedoServiceTest {
redoService.shutdown();
}
@Test
public void testDefaultProperties() throws Exception {
Field redoThreadCountField = NamingGrpcRedoService.class.getDeclaredField("redoThreadCount");
redoThreadCountField.setAccessible(true);
Field redoDelayTimeField = NamingGrpcRedoService.class.getDeclaredField("redoDelayTime");
redoDelayTimeField.setAccessible(true);
Long redoDelayTimeValue = (Long) redoDelayTimeField.get(redoService);
Integer redoThreadCountValue = (Integer) redoThreadCountField.get(redoService);
assertEquals(Long.valueOf(3000L), redoDelayTimeValue);
assertEquals(Integer.valueOf(1), redoThreadCountValue);
}
@Test
public void testCustomProperties() throws Exception {
Properties prop = new Properties();
prop.setProperty(PropertyKeyConst.REDO_DELAY_TIME, "4000");
prop.setProperty(PropertyKeyConst.REDO_DELAY_THREAD_COUNT, "2");
NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
NamingGrpcRedoService redoService = new NamingGrpcRedoService(clientProxy, nacosClientProperties);
Field redoThreadCountField = NamingGrpcRedoService.class.getDeclaredField("redoThreadCount");
redoThreadCountField.setAccessible(true);
Field redoDelayTimeField = NamingGrpcRedoService.class.getDeclaredField("redoDelayTime");
redoDelayTimeField.setAccessible(true);
Long redoDelayTimeValue = (Long) redoDelayTimeField.get(redoService);
Integer redoThreadCountValue = (Integer) redoThreadCountField.get(redoService);
assertEquals(Long.valueOf(4000L), redoDelayTimeValue);
assertEquals(Integer.valueOf(2), redoThreadCountValue);
}
@Test
public void testOnConnected() {
assertFalse(redoService.isConnected());
@ -113,7 +155,8 @@ public class NamingGrpcRedoServiceTest {
instanceList.add(instance);
redoService.cacheInstanceForRedo(SERVICE, GROUP, instanceList);
assertFalse(registeredInstances.isEmpty());
BatchInstanceRedoData actual = (BatchInstanceRedoData) registeredInstances.entrySet().iterator().next().getValue();
BatchInstanceRedoData actual = (BatchInstanceRedoData) registeredInstances.entrySet().iterator().next()
.getValue();
assertEquals(SERVICE, actual.getServiceName());
assertEquals(GROUP, actual.getGroupName());
assertEquals(instanceList, actual.getInstances());