[ISSUE#6384] Add redo feature for nacos client naming (#6395)

* Add RedoData

* Add NamingGrpcRedoService

* Add RedoScheduledTask

* Refactor NamingGrpcClientProxy to use new redo service.

* For PMD

* Remove NamingGrpcConnectionEventListener.java
This commit is contained in:
杨翊 SionYang 2021-07-21 10:08:10 +08:00 committed by GitHub
parent 784cc82d9e
commit fca259fd38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1139 additions and 802 deletions

View File

@ -141,11 +141,11 @@ public class NamingClientProxyDelegate implements NamingClientProxy {
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey); ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (null == result) { if (null == result) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters); result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
} }
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
serviceInfoHolder.processServiceInfo(result); serviceInfoHolder.processServiceInfo(result);
return result; return result;
} }

View File

@ -40,6 +40,7 @@ import com.alibaba.nacos.api.selector.SelectorType;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.event.ServerListChangedEvent; import com.alibaba.nacos.client.naming.event.ServerListChangedEvent;
import com.alibaba.nacos.client.naming.remote.AbstractNamingClientProxy; import com.alibaba.nacos.client.naming.remote.AbstractNamingClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.NamingGrpcRedoService;
import com.alibaba.nacos.client.security.SecurityProxy; import com.alibaba.nacos.client.security.SecurityProxy;
import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.NotifyCenter;
@ -72,7 +73,7 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
private final RpcClient rpcClient; private final RpcClient rpcClient;
private final NamingGrpcConnectionEventListener namingGrpcConnectionEventListener; private final NamingGrpcRedoService redoService;
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException { Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
@ -84,15 +85,15 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING); labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels); this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this); this.redoService = new NamingGrpcRedoService(this);
start(serverListFactory, serviceInfoHolder); start(serverListFactory, serviceInfoHolder);
} }
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.serverListFactory(serverListFactory); rpcClient.serverListFactory(serverListFactory);
rpcClient.start(); rpcClient.registerConnectionListener(redoService);
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder)); rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
rpcClient.registerConnectionListener(namingGrpcConnectionEventListener); rpcClient.start();
NotifyCenter.registerSubscriber(this); NotifyCenter.registerSubscriber(this);
} }
@ -110,10 +111,23 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance); instance);
namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance); redoService.cacheInstanceForRedo(serviceName, groupName, instance);
doRegisterService(serviceName, groupName, instance);
}
/**
* Execute register operation.
*
* @param serviceName name of service
* @param groupName group of service
* @param instance instance to register
* @throws NacosException nacos exception
*/
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance); NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class); requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
} }
@Override @Override
@ -121,10 +135,23 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
NAMING_LOGGER NAMING_LOGGER
.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName, .info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName,
instance); instance);
namingGrpcConnectionEventListener.removeInstanceForRedo(serviceName, groupName, instance); redoService.instanceDeregister(serviceName, groupName);
doDeregisterService(serviceName, groupName, instance);
}
/**
* Execute deregister operation.
*
* @param serviceName service name
* @param groupName group name
* @param instance instance
* @throws NacosException nacos exception
*/
public void doDeregisterService(String serviceName, String groupName, Instance instance) throws NacosException {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.DE_REGISTER_INSTANCE, instance); NamingRemoteConstants.DE_REGISTER_INSTANCE, instance);
requestToServer(request, Response.class); requestToServer(request, Response.class);
redoService.removeInstanceForRedo(serviceName, groupName);
} }
@Override @Override
@ -181,21 +208,46 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
@Override @Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
return doSubscribe(serviceName, groupName, clusters);
}
/**
* Execute subscribe operation.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters, current only support subscribe all clusters, maybe deprecated
* @return current service info of subscribe service
* @throws NacosException nacos exception
*/
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
true); true);
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class); SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
namingGrpcConnectionEventListener redoService.subscriberRegistered(serviceName, groupName, clusters);
.cacheSubscriberForRedo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
return response.getServiceInfo(); return response.getServiceInfo();
} }
@Override @Override
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException { public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
redoService.subscriberDeregister(serviceName, groupName, clusters);
doUnsubscribe(serviceName, groupName, clusters);
}
/**
* Execute unsubscribe operation.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters, current only support subscribe all clusters, maybe deprecated
* @throws NacosException nacos exception
*/
public void doUnsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, groupName, clusters, SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, groupName, clusters,
false); false);
requestToServer(request, SubscribeServiceResponse.class); requestToServer(request, SubscribeServiceResponse.class);
namingGrpcConnectionEventListener redoService.removeSubscriberForRedo(serviceName, groupName, clusters);
.removeSubscriberForRedo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
} }
@Override @Override
@ -232,7 +284,7 @@ public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
@Override @Override
public void shutdown() throws NacosException { public void shutdown() throws NacosException {
rpcClient.shutdown(); rpcClient.shutdown();
namingGrpcConnectionEventListener.shutdown(); redoService.shutdown();
} }
public boolean isEnable() { public boolean isEnable() {

View File

@ -1,171 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Naming client gprc connection event listener.
*
* <p>
* When connection reconnect to server, redo the register and subscribe.
* </p>
*
* @author xiweng.yy
*/
public class NamingGrpcConnectionEventListener implements ConnectionEventListener {
private final NamingGrpcClientProxy clientProxy;
private final ConcurrentMap<String, Instance> registeredInstanceCached = new ConcurrentHashMap<>();
private final Set<String> subscribes = new ConcurrentHashSet<String>();
private volatile boolean connected = false;
private static final long DEFAULT_REDO_DELAY = 3000L;
private static final int DEFAULT_REDO_THREAD = 2;
private ScheduledExecutorService redoExecutorService;
public NamingGrpcConnectionEventListener(NamingGrpcClientProxy clientProxy) {
this.clientProxy = clientProxy;
this.redoExecutorService = new ScheduledThreadPoolExecutor(DEFAULT_REDO_THREAD, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.naming.grpc.event.listener");
t.setDaemon(true);
return t;
});
}
@Override
public void onConnected() {
connected = true;
LogUtils.NAMING_LOGGER.info("Grpc re-connect, redo subscribe services");
redoSubscribe(subscribes);
LogUtils.NAMING_LOGGER.info("Grpc re-connect, redo register services");
redoRegisterEachService(registeredInstanceCached.keySet());
}
private void redoSubscribe(Set<String> subscribes) {
Set<String> failedSubscribes = new ConcurrentHashSet<>();
for (String each : subscribes) {
if (!connected) {
failedSubscribes.clear();
break;
}
ServiceInfo serviceInfo = ServiceInfo.fromKey(each);
try {
clientProxy.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters());
} catch (NacosException e) {
failedSubscribes.add(each);
LogUtils.NAMING_LOGGER.warn(String.format("re subscribe service %s failed, try again later.", serviceInfo.getName()), e);
}
}
if (!failedSubscribes.isEmpty()) {
redoExecutorService.schedule(() -> redoSubscribe(failedSubscribes), DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS);
}
}
private void redoRegisterEachService(Set<String> services) {
Set<String> failedServices = new ConcurrentHashSet<>();
for (String each : services) {
if (!connected) {
failedServices.clear();
break;
}
String serviceName = NamingUtils.getServiceName(each);
String groupName = NamingUtils.getGroupName(each);
Instance instance = registeredInstanceCached.get(each);
if (!redoRegisterEachInstance(serviceName, groupName, instance)) {
failedServices.add(each);
}
}
if (!failedServices.isEmpty()) {
redoExecutorService.schedule(() -> redoRegisterEachService(failedServices), DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS);
}
}
private boolean redoRegisterEachInstance(String serviceName, String groupName, Instance instance) {
try {
clientProxy.registerService(serviceName, groupName, instance);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.warn(String
.format("redo register for service %s@@%s, %s failed, try again later.", groupName, serviceName, instance.toString()), e);
return false;
}
return true;
}
@Override
public void onDisConnect() {
connected = false;
LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect");
}
/**
* Cache registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
* @param instance registered instance
*/
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
registeredInstanceCached.put(key, instance);
}
/**
* Remove registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
* @param instance registered instance
*/
public void removeInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
registeredInstanceCached.remove(key);
}
public void cacheSubscriberForRedo(String fullServiceName, String cluster) {
subscribes.add(ServiceInfo.getKey(fullServiceName, cluster));
}
public void removeSubscriberForRedo(String fullServiceName, String cluster) {
subscribes.remove(ServiceInfo.getKey(fullServiceName, cluster));
}
public void shutdown() {
LogUtils.NAMING_LOGGER.info("Shutdown grpc event listener executor " + redoExecutorService);
redoExecutorService.shutdownNow();
}
}

View File

@ -0,0 +1,257 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Naming client gprc redo service.
*
* <p>When connection reconnect to server, redo the register and subscribe.
*
* @author xiweng.yy
*/
public class NamingGrpcRedoService implements ConnectionEventListener {
private static final String REDO_THREAD_NAME = "com.alibaba.nacos.client.naming.grpc.redo";
private static final int REDO_THREAD = 1;
/**
* TODO get redo delay from config.
*/
private static final long DEFAULT_REDO_DELAY = 3000L;
private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();
private final ScheduledExecutorService redoExecutor;
private volatile boolean connected = false;
public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy) {
this.redoExecutor = new ScheduledThreadPoolExecutor(REDO_THREAD, new NameThreadFactory(REDO_THREAD_NAME));
this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), DEFAULT_REDO_DELAY,
DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS);
}
public boolean isConnected() {
return connected;
}
@Override
public void onConnected() {
connected = true;
LogUtils.NAMING_LOGGER.info("Grpc connection connect");
}
@Override
public void onDisConnect() {
connected = false;
LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
synchronized (registeredInstances) {
registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
}
synchronized (subscribes) {
subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
}
LogUtils.NAMING_LOGGER.warn("mark to redo completed");
}
/**
* Cache registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
* @param instance registered instance
*/
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
synchronized (registeredInstances) {
registeredInstances.put(key, redoData);
}
}
/**
* Instance register successfully, mark registered status as {@code true}.
*
* @param serviceName service name
* @param groupName group name
*/
public void instanceRegistered(String serviceName, String groupName) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
synchronized (registeredInstances) {
InstanceRedoData redoData = registeredInstances.get(key);
if (null != redoData) {
redoData.setRegistered(true);
}
}
}
/**
* Instance deregister, mark unregistering status as {@code true}.
*
* @param serviceName service name
* @param groupName group name
*/
public void instanceDeregister(String serviceName, String groupName) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
synchronized (registeredInstances) {
InstanceRedoData redoData = registeredInstances.get(key);
if (null != redoData) {
redoData.setUnregistering(true);
}
}
}
/**
* Remove registered instance for redo.
*
* @param serviceName service name
* @param groupName group name
*/
public void removeInstanceForRedo(String serviceName, String groupName) {
synchronized (registeredInstances) {
registeredInstances.remove(NamingUtils.getGroupedName(serviceName, groupName));
}
}
/**
* Find all instance redo data which need do redo.
*
* @return set of {@code InstanceRedoData} need to do redo.
*/
public Set<InstanceRedoData> findInstanceRedoData() {
Set<InstanceRedoData> result = new HashSet<>();
synchronized (registeredInstances) {
for (InstanceRedoData each : registeredInstances.values()) {
if (each.isNeedRedo()) {
result.add(each);
}
}
}
return result;
}
/**
* Cache subscriber for redo.
*
* @param serviceName service name
* @param groupName group name
* @param cluster cluster
*/
public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);
synchronized (subscribes) {
subscribes.put(key, redoData);
}
}
/**
* Subscriber register successfully, mark registered status as {@code true}.
*
* @param serviceName service name
* @param groupName group name
* @param cluster cluster
*/
public void subscriberRegistered(String serviceName, String groupName, String cluster) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
synchronized (subscribes) {
SubscriberRedoData redoData = subscribes.get(key);
if (null != redoData) {
redoData.setRegistered(true);
}
}
}
/**
* Subscriber deregister, mark unregistering status as {@code true}.
*
* @param serviceName service name
* @param groupName group name
* @param cluster cluster
*/
public void subscriberDeregister(String serviceName, String groupName, String cluster) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
synchronized (subscribes) {
SubscriberRedoData redoData = subscribes.get(key);
if (null != redoData) {
redoData.setUnregistering(true);
}
}
}
/**
* Remove subscribe for redo.
*
* @param serviceName service name
* @param groupName group name
* @param cluster cluster
*/
public void removeSubscriberForRedo(String serviceName, String groupName, String cluster) {
synchronized (subscribes) {
subscribes.remove(ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster));
}
}
/**
* Find all subscriber redo data which need do redo.
*
* @return set of {@code InstanceRedoData} need to do redo.
*/
public Set<SubscriberRedoData> findSubscriberRedoData() {
Set<SubscriberRedoData> result = new HashSet<>();
synchronized (subscribes) {
for (SubscriberRedoData each : subscribes.values()) {
if (each.isNeedRedo()) {
result.add(each);
}
}
}
return result;
}
/**
* Shutdown redo service.
*/
public void shutdown() {
LogUtils.NAMING_LOGGER.info("Shutdown grpc event listener executor " + redoExecutor);
registeredInstances.clear();
subscribes.clear();
redoExecutor.shutdownNow();
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.RedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
/**
* Redo task.
*
* @author xiweng.yy
*/
public class RedoScheduledTask extends AbstractExecuteTask {
private final NamingGrpcClientProxy clientProxy;
private final NamingGrpcRedoService redoService;
public RedoScheduledTask(NamingGrpcClientProxy clientProxy, NamingGrpcRedoService redoService) {
this.clientProxy = clientProxy;
this.redoService = redoService;
}
@Override
public void run() {
if (!redoService.isConnected()) {
LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
return;
}
try {
redoForInstances();
redoForSubscribes();
} catch (Exception e) {
LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
}
}
private void redoForInstances() {
for (InstanceRedoData each : redoService.findInstanceRedoData()) {
try {
redoForInstance(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), e);
}
}
}
private void redoForInstance(InstanceRedoData redoData) throws NacosException {
RedoData.RedoType redoType = redoData.getRedoType();
String serviceName = redoData.getServiceName();
String groupName = redoData.getGroupName();
LogUtils.NAMING_LOGGER.info("Redo instance operation {} for {}@@{}", redoType, groupName, serviceName);
switch (redoType) {
case REGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doRegisterService(serviceName, groupName, redoData.get());
break;
case UNREGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doDeregisterService(serviceName, groupName, redoData.get());
break;
case REMOVE:
redoService.removeInstanceForRedo(serviceName, groupName);
break;
default:
}
}
private void redoForSubscribes() {
for (SubscriberRedoData each : redoService.findSubscriberRedoData()) {
try {
redoForSubscribe(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo subscriber operation {} for {}@@{}#{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), each.get(), e);
}
}
}
private void redoForSubscribe(SubscriberRedoData redoData) throws NacosException {
RedoData.RedoType redoType = redoData.getRedoType();
String serviceName = redoData.getServiceName();
String groupName = redoData.getGroupName();
String cluster = redoData.get();
LogUtils.NAMING_LOGGER.info("Redo subscriber operation {} for {}@@{}#{}", redoType, groupName, serviceName, cluster);
switch (redoData.getRedoType()) {
case REGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doSubscribe(serviceName, groupName, cluster);
break;
case UNREGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doUnsubscribe(serviceName, groupName, cluster);
break;
case REMOVE:
redoService.removeSubscriberForRedo(redoData.getServiceName(), redoData.getGroupName(), redoData.get());
break;
default:
}
}
private boolean isClientDisabled() {
return !clientProxy.isEnable();
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc.redo.data;
import com.alibaba.nacos.api.naming.pojo.Instance;
/**
* Redo data for register service instance.
*
* @author xiweng.yy
*/
public class InstanceRedoData extends RedoData<Instance> {
private InstanceRedoData(String serviceName, String groupName) {
super(serviceName, groupName);
}
/**
* Build a new {@code RedoData} for register service instance.
*
* @param serviceName service name for redo data
* @param groupName group name for redo data
* @param instance instance for redo data
* @return new {@code RedoData} for register service instance
*/
public static InstanceRedoData build(String serviceName, String groupName, Instance instance) {
InstanceRedoData result = new InstanceRedoData(serviceName, groupName);
result.set(instance);
return result;
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc.redo.data;
/**
* Nacos naming redo data.
*
* @author xiweng.yy
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RedoData<T> {
private final String serviceName;
private final String groupName;
/**
* If {@code true} means cached data has been registered to server successfully.
*/
private volatile boolean registered;
/**
* If {@code true} means cached data is unregistering from server.
*/
private volatile boolean unregistering;
private T data;
protected RedoData(String serviceName, String groupName) {
this.serviceName = serviceName;
this.groupName = groupName;
}
public String getServiceName() {
return serviceName;
}
public String getGroupName() {
return groupName;
}
public boolean isRegistered() {
return registered;
}
public void setRegistered(boolean registered) {
this.registered = registered;
}
public boolean isUnregistering() {
return unregistering;
}
public void setUnregistering(boolean unregistering) {
this.unregistering = unregistering;
}
public T get() {
return data;
}
public void set(T data) {
this.data = data;
}
/**
* Get redo type for current redo data.
*
* <ul>
* <li>{@code registered=true} & {@code unregistering=false} means data has registered, so redo should not do anything.</li>
* <li>{@code registered=true} & {@code unregistering=true} means data has registered and now need unregister.</li>
* <li>{@code registered=false} & {@code unregistering=false} means not registered yet, need register again.</li>
* <li>{@code registered=false} & {@code unregistering=true} means not registered yet and not continue to register.</li>
* </ul>
*
* @return redo type
*/
public RedoType getRedoType() {
if (isRegistered() && !isUnregistering()) {
return RedoType.NONE;
} else if (isRegistered() && isUnregistering()) {
return RedoType.UNREGISTER;
} else if (!isRegistered() && !isUnregistering()) {
return RedoType.REGISTER;
} else {
return RedoType.REMOVE;
}
}
public boolean isNeedRedo() {
return !RedoType.NONE.equals(getRedoType());
}
public enum RedoType {
/**
* Redo register.
*/
REGISTER,
/**
* Redo unregister.
*/
UNREGISTER,
/**
* Redo nothing.
*/
NONE,
/**
* Remove redo data.
*/
REMOVE;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc.redo.data;
/**
* Redo data for subscribers.
*
* @author xiweng.yy
*/
public class SubscriberRedoData extends RedoData<String> {
private SubscriberRedoData(String serviceName, String groupName) {
super(serviceName, groupName);
}
/**
* Build a new {@code RedoData} for subscribers.
*
* @param serviceName service name for redo data
* @param groupName group name for redo data
* @param clusters clusters for redo data
* @return new {@code RedoData} for subscribers
*/
public static SubscriberRedoData build(String serviceName, String groupName, String clusters) {
SubscriberRedoData result = new SubscriberRedoData(serviceName, groupName);
result.set(clusters);
return result;
}
}

View File

@ -39,6 +39,7 @@ import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.api.selector.NoneSelector; import com.alibaba.nacos.api.selector.NoneSelector;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.event.ServerListChangedEvent; import com.alibaba.nacos.client.naming.event.ServerListChangedEvent;
import com.alibaba.nacos.client.naming.remote.gprc.redo.NamingGrpcRedoService;
import com.alibaba.nacos.client.security.SecurityProxy; import com.alibaba.nacos.client.security.SecurityProxy;
import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.ConnectionType;
@ -46,10 +47,13 @@ import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.ServerListFactory; import com.alibaba.nacos.common.remote.client.ServerListFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatcher; import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Arrays; import java.util.Arrays;
@ -62,480 +66,215 @@ import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class NamingGrpcClientProxyTest { public class NamingGrpcClientProxyTest {
private static final String NAMESPACE_ID = "ns1";
private static final String SERVICE_NAME = "service1";
private static final String GROUP_NAME = "group1";
private static final String CLUSTERS = "cluster1";
@Mock
private SecurityProxy proxy;
@Mock
private ServerListFactory factory;
@Mock
private ServiceInfoHolder holder;
@Mock
private RpcClient rpcClient;
private Properties prop;
private NamingGrpcClientProxy client;
private Response response;
private Instance instance;
@Rule @Rule
public final ExpectedException thrown = ExpectedException.none(); public final ExpectedException thrown = ExpectedException.none();
@Test @Before
public void testRegisterService() throws NacosException, NoSuchFieldException, IllegalAccessException { public void setUp() throws NacosException, NoSuchFieldException, IllegalAccessException {
// given prop = new Properties();
String namespaceId = "ns1"; client = new NamingGrpcClientProxy(NAMESPACE_ID, proxy, factory, prop, holder);
SecurityProxy proxy = mock(SecurityProxy.class); Field rpcClientField = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
ServerListFactory factory = mock(ServerListFactory.class); rpcClientField.setAccessible(true);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class); rpcClientField.set(client, this.rpcClient);
Properties prop = new Properties(); response = new InstanceResponse();
when(this.rpcClient.request(any())).thenReturn(response);
String serviceName = "service1"; instance = new Instance();
Instance instance = new Instance(); instance.setServiceName(SERVICE_NAME);
instance.setServiceName(serviceName);
instance.setIp("1.1.1.1"); instance.setIp("1.1.1.1");
instance.setPort(1111); instance.setPort(1111);
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
// inject rpcClient;
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
Response res = new InstanceResponse();
when(rpc.request(any())).thenReturn(res);
String groupName = "group1";
// when
client.registerService(serviceName, groupName, instance);
// then
verify(rpc, times(1)).request(argThat(new ArgumentMatcher<Request>() {
@Override
public boolean matches(Request request) {
if (request instanceof InstanceRequest) {
InstanceRequest request1 = (InstanceRequest) request;
return request1.getType().equals(NamingRemoteConstants.REGISTER_INSTANCE);
}
return false;
}
}));
} }
@Test @Test
public void testDeregisterService() throws NacosException, NoSuchFieldException, IllegalAccessException { public void testRegisterService() throws NacosException {
// given client.registerService(SERVICE_NAME, GROUP_NAME, instance);
String namespaceId = "ns1"; verify(this.rpcClient, times(1)).request(argThat(request -> {
SecurityProxy proxy = mock(SecurityProxy.class); if (request instanceof InstanceRequest) {
ServerListFactory factory = mock(ServerListFactory.class); InstanceRequest request1 = (InstanceRequest) request;
ServiceInfoHolder holder = mock(ServiceInfoHolder.class); return request1.getType().equals(NamingRemoteConstants.REGISTER_INSTANCE);
Properties prop = new Properties();
String serviceName = "service1";
Instance instance = new Instance();
instance.setServiceName(serviceName);
instance.setIp("1.1.1.1");
instance.setPort(1111);
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
// inject rpcClient;
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
Response res = new InstanceResponse();
when(rpc.request(any())).thenReturn(res);
String groupName = "group1";
// when
client.deregisterService(serviceName, groupName, instance);
// then
verify(rpc, times(1)).request(argThat(new ArgumentMatcher<Request>() {
@Override
public boolean matches(Request request) {
if (request instanceof InstanceRequest) {
InstanceRequest request1 = (InstanceRequest) request;
return request1.getType().equals(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
return false;
} }
return false;
}));
}
@Test
public void testDeregisterService() throws NacosException {
client.deregisterService(SERVICE_NAME, GROUP_NAME, instance);
verify(this.rpcClient, times(1)).request(argThat(request -> {
if (request instanceof InstanceRequest) {
InstanceRequest request1 = (InstanceRequest) request;
return request1.getType().equals(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
return false;
})); }));
} }
@Test @Test
public void testUpdateInstance() throws Exception { public void testUpdateInstance() throws Exception {
//TODO thrown.expect(UnsupportedOperationException.class); //TODO thrown.expect(UnsupportedOperationException.class);
String namespaceId = "ns1"; client.updateInstance(SERVICE_NAME, GROUP_NAME, instance);
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
String serviceName = "service1";
Instance instance = new Instance();
instance.setServiceName(serviceName);
instance.setIp("1.1.1.1");
instance.setPort(1111);
String groupName = "group1";
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
client.updateInstance(serviceName, groupName, instance);
} }
@Test @Test
public void testQueryInstancesOfService() throws Exception { public void testQueryInstancesOfService() throws Exception {
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
String serviceName = "service1";
Instance instance = new Instance();
instance.setServiceName(serviceName);
instance.setIp("1.1.1.1");
instance.setPort(1111);
// inject rpcClient;
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
QueryServiceResponse res = new QueryServiceResponse(); QueryServiceResponse res = new QueryServiceResponse();
String clusters = "cluster1"; ServiceInfo info = new ServiceInfo(GROUP_NAME + "@@" + SERVICE_NAME + "@@" + CLUSTERS);
String groupName = "group1";
ServiceInfo info = new ServiceInfo(groupName + "@@" + serviceName + "@@" + clusters);
res.setServiceInfo(info); res.setServiceInfo(info);
when(rpc.request(any())).thenReturn(res); when(this.rpcClient.request(any())).thenReturn(res);
ServiceInfo actual = client.queryInstancesOfService(SERVICE_NAME, GROUP_NAME, CLUSTERS, 0, false);
// when
ServiceInfo actual = client.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
Assert.assertEquals(info, actual); Assert.assertEquals(info, actual);
} }
@Test @Test
public void testQueryService() throws Exception { public void testQueryService() throws Exception {
// given Service service = client.queryService(SERVICE_NAME, GROUP_NAME);
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
String serviceName = "service1";
String groupName = "group1";
// when
Service service = client.queryService(serviceName, groupName);
// then
Assert.assertNull(service); Assert.assertNull(service);
} }
@Test @Test
public void testCreateService() throws Exception { public void testCreateService() throws Exception {
//TODO thrown.expect(UnsupportedOperationException.class); //TODO thrown.expect(UnsupportedOperationException.class);
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
Service service = new Service(); Service service = new Service();
AbstractSelector selector = new NoneSelector(); AbstractSelector selector = new NoneSelector();
// when
client.createService(service, selector); client.createService(service, selector);
} }
@Test @Test
public void testDeleteService() throws Exception { public void testDeleteService() throws Exception {
//TODO thrown.expect(UnsupportedOperationException.class); //TODO thrown.expect(UnsupportedOperationException.class);
// given Assert.assertFalse(client.deleteService(SERVICE_NAME, GROUP_NAME));
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
String serviceName = "service1";
String groupName = "group1";
// when
Assert.assertFalse(client.deleteService(serviceName, groupName));
} }
@Test @Test
public void testUpdateService() throws NacosException { public void testUpdateService() throws NacosException {
//TODO thrown.expect(UnsupportedOperationException.class); //TODO thrown.expect(UnsupportedOperationException.class);
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
Service service = new Service(); Service service = new Service();
AbstractSelector selector = new NoneSelector(); AbstractSelector selector = new NoneSelector();
// when
client.updateService(service, selector); client.updateService(service, selector);
} }
@Test @Test
public void testGetServiceList() throws Exception { public void testGetServiceList() throws Exception {
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
String serviceName = "service1";
Instance instance = new Instance();
instance.setServiceName(serviceName);
instance.setIp("1.1.1.1");
instance.setPort(1111);
// inject rpcClient;
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
ServiceListResponse res = new ServiceListResponse(); ServiceListResponse res = new ServiceListResponse();
List<String> services = Arrays.asList("service1", "service2"); List<String> services = Arrays.asList("service1", "service2");
res.setServiceNames(services); res.setServiceNames(services);
res.setCount(5); res.setCount(5);
when(rpc.request(any())).thenReturn(res); when(this.rpcClient.request(any())).thenReturn(res);
AbstractSelector selector = new NoneSelector(); AbstractSelector selector = new NoneSelector();
String groupName = "group1"; ListView<String> serviceList = client.getServiceList(1, 10, GROUP_NAME, selector);
// when
ListView<String> serviceList = client.getServiceList(1, 10, groupName, selector);
Assert.assertEquals(5, serviceList.getCount()); Assert.assertEquals(5, serviceList.getCount());
Assert.assertEquals(services, serviceList.getData()); Assert.assertEquals(services, serviceList.getData());
} }
@Test @Test
public void testSubscribe() throws Exception { public void testSubscribe() throws Exception {
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
String serviceName = "service1";
Instance instance = new Instance();
instance.setServiceName(serviceName);
instance.setIp("1.1.1.1");
instance.setPort(1111);
// inject rpcClient;
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
SubscribeServiceResponse res = new SubscribeServiceResponse(); SubscribeServiceResponse res = new SubscribeServiceResponse();
String clusters = "cluster1"; ServiceInfo info = new ServiceInfo(GROUP_NAME + "@@" + SERVICE_NAME + "@@" + CLUSTERS);
String groupName = "group1";
ServiceInfo info = new ServiceInfo(groupName + "@@" + serviceName + "@@" + clusters);
res.setServiceInfo(info); res.setServiceInfo(info);
when(rpc.request(any())).thenReturn(res); when(this.rpcClient.request(any())).thenReturn(res);
ServiceInfo actual = client.subscribe(SERVICE_NAME, GROUP_NAME, CLUSTERS);
// when
ServiceInfo actual = client.subscribe(serviceName, groupName, clusters);
Assert.assertEquals(info, actual); Assert.assertEquals(info, actual);
} }
@Test @Test
public void testUnsubscribe() throws Exception { public void testUnsubscribe() throws Exception {
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
String serviceName = "service1";
Instance instance = new Instance();
instance.setServiceName(serviceName);
instance.setIp("1.1.1.1");
instance.setPort(1111);
// inject rpcClient;
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
SubscribeServiceResponse res = new SubscribeServiceResponse(); SubscribeServiceResponse res = new SubscribeServiceResponse();
String clusters = "cluster1"; ServiceInfo info = new ServiceInfo(GROUP_NAME + "@@" + SERVICE_NAME + "@@" + CLUSTERS);
String groupName = "group1";
ServiceInfo info = new ServiceInfo(groupName + "@@" + serviceName + "@@" + clusters);
res.setServiceInfo(info); res.setServiceInfo(info);
when(rpc.request(any())).thenReturn(res); when(this.rpcClient.request(any())).thenReturn(res);
client.unsubscribe(SERVICE_NAME, GROUP_NAME, CLUSTERS);
// when verify(this.rpcClient, times(1)).request(argThat(request -> {
client.unsubscribe(serviceName, groupName, clusters); if (request instanceof SubscribeServiceRequest) {
SubscribeServiceRequest request1 = (SubscribeServiceRequest) request;
verify(rpc, times(1)).request(argThat(new ArgumentMatcher<Request>() { // not subscribe
@Override return !request1.isSubscribe();
public boolean matches(Request request) {
if (request instanceof SubscribeServiceRequest) {
SubscribeServiceRequest request1 = (SubscribeServiceRequest) request;
// not subscribe
return !request1.isSubscribe();
}
return false;
} }
return false;
})); }));
} }
@Test @Test
public void testUpdateBeatInfo() throws NacosException { public void testUpdateBeatInfo() {
//TODO thrown.expect(UnsupportedOperationException.class); //TODO thrown.expect(UnsupportedOperationException.class);
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
// when
client.updateBeatInfo(new HashSet<>()); client.updateBeatInfo(new HashSet<>());
} }
@Test @Test
public void testServerHealthy() throws NacosException, IllegalAccessException, NoSuchFieldException { public void testServerHealthy() {
// given when(this.rpcClient.isRunning()).thenReturn(true);
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
// inject rpcClient;
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
when(rpc.isRunning()).thenReturn(true);
// when
Assert.assertTrue(client.serverHealthy()); Assert.assertTrue(client.serverHealthy());
verify(this.rpcClient, times(1)).isRunning();
verify(rpc, times(1)).isRunning();
} }
@Test @Test
public void testShutdown() throws Exception { public void testShutdown() throws Exception {
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
// inject rpcClient;
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
when(rpc.isRunning()).thenReturn(true);
// when
client.shutdown(); client.shutdown();
verify(this.rpcClient, times(1)).shutdown();
verify(rpc, times(1)).shutdown();
} }
@Test @Test
public void testIsEnable() throws Exception { public void testIsEnable() {
// given when(this.rpcClient.isRunning()).thenReturn(true);
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
// inject rpcClient;
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
RpcClient rpc = mock(RpcClient.class);
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true);
rpcClient.set(client, rpc);
when(rpc.isRunning()).thenReturn(true);
// when
Assert.assertTrue(client.isEnable()); Assert.assertTrue(client.isEnable());
verify(this.rpcClient, times(1)).isRunning();
verify(rpc, times(1)).isRunning();
} }
@Test @Test
public void testServerListChanged() throws Exception { public void testServerListChanged() throws Exception {
// given
String namespaceId = "ns1";
SecurityProxy proxy = mock(SecurityProxy.class);
ServerListFactory factory = mock(ServerListFactory.class);
ServiceInfoHolder holder = mock(ServiceInfoHolder.class);
Properties prop = new Properties();
String originServer = "www.google.com"; String originServer = "www.google.com";
when(factory.getServerList()).thenReturn(Stream.of(originServer, "anotherServer").collect(Collectors.toList())); List<String> serverList = Stream.of(originServer, "anotherServer").collect(Collectors.toList());
// inject rpcClient; when(factory.getServerList()).thenReturn(serverList);
NamingGrpcClientProxy client = new NamingGrpcClientProxy(namespaceId, proxy, factory, prop, holder);
when(factory.genNextServer()).thenReturn(originServer); when(factory.genNextServer()).thenReturn(originServer);
RpcClient rpc = new RpcClient("testServerListHasChanged", factory) { RpcClient rpc = new RpcClient("testServerListHasChanged", factory) {
@Override @Override
public ConnectionType getConnectionType() { public ConnectionType getConnectionType() {
return ConnectionType.GRPC; return ConnectionType.GRPC;
} }
@Override @Override
public int rpcPortOffset() { public int rpcPortOffset() {
return 0; return 0;
} }
@Override @Override
public Connection connectToServer(ServerInfo serverInfo) throws Exception { public Connection connectToServer(ServerInfo serverInfo) throws Exception {
return new Connection(serverInfo) { return new Connection(serverInfo) {
@Override @Override
public Response request(Request request, long timeoutMills) throws NacosException { public Response request(Request request, long timeoutMills) throws NacosException {
Response response = new Response() { Response response = new Response() {
@ -543,17 +282,17 @@ public class NamingGrpcClientProxyTest {
response.setRequestId(request.getRequestId()); response.setRequestId(request.getRequestId());
return response; return response;
} }
@Override @Override
public RequestFuture requestFuture(Request request) throws NacosException { public RequestFuture requestFuture(Request request) throws NacosException {
return new DefaultRequestFuture("test", request.getRequestId()); return new DefaultRequestFuture("test", request.getRequestId());
} }
@Override @Override
public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException { public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException {
} }
@Override @Override
public void close() { public void close() {
} }
@ -563,12 +302,12 @@ public class NamingGrpcClientProxyTest {
Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient"); Field rpcClient = NamingGrpcClientProxy.class.getDeclaredField("rpcClient");
rpcClient.setAccessible(true); rpcClient.setAccessible(true);
rpcClient.set(client, rpc); rpcClient.set(client, rpc);
rpc.serverListFactory(factory); rpc.serverListFactory(factory);
rpc.registerServerRequestHandler(new NamingPushRequestHandler(holder)); rpc.registerServerRequestHandler(new NamingPushRequestHandler(holder));
Field listenerField = NamingGrpcClientProxy.class.getDeclaredField("namingGrpcConnectionEventListener"); Field listenerField = NamingGrpcClientProxy.class.getDeclaredField("redoService");
listenerField.setAccessible(true); listenerField.setAccessible(true);
NamingGrpcConnectionEventListener listener = (NamingGrpcConnectionEventListener) listenerField.get(client); NamingGrpcRedoService listener = (NamingGrpcRedoService) listenerField.get(client);
rpc.registerConnectionListener(listener); rpc.registerConnectionListener(listener);
rpc.start(); rpc.start();
int retry = 10; int retry = 10;
@ -578,14 +317,14 @@ public class NamingGrpcClientProxyTest {
Assert.fail("rpc is not running"); Assert.fail("rpc is not running");
} }
} }
Assert.assertEquals(originServer, rpc.getCurrentServer().getServerIp()); Assert.assertEquals(originServer, rpc.getCurrentServer().getServerIp());
String newServer = "www.aliyun.com"; String newServer = "www.aliyun.com";
when(factory.genNextServer()).thenReturn(newServer); when(factory.genNextServer()).thenReturn(newServer);
when(factory.getServerList()).thenReturn(Stream.of(newServer, "anotherServer").collect(Collectors.toList())); when(factory.getServerList()).thenReturn(Stream.of(newServer, "anotherServer").collect(Collectors.toList()));
NotifyCenter.publishEvent(new ServerListChangedEvent()); NotifyCenter.publishEvent(new ServerListChangedEvent());
retry = 10; retry = 10;
while (originServer.equals(rpc.getCurrentServer().getServerIp())) { while (originServer.equals(rpc.getCurrentServer().getServerIp())) {
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
@ -593,7 +332,7 @@ public class NamingGrpcClientProxyTest {
Assert.fail("failed to auth switch server"); Assert.fail("failed to auth switch server");
} }
} }
Assert.assertEquals(newServer, rpc.getCurrentServer().getServerIp()); Assert.assertEquals(newServer, rpc.getCurrentServer().getServerIp());
} }
} }

View File

@ -1,248 +0,0 @@
/*
*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.alibaba.nacos.client.naming.remote.gprc;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class NamingGrpcConnectionEventListenerTest {
@Test
public void testCacheInstanceForRedo() throws NacosException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String serviceName = "service1";
String groupName = "group1";
Instance instance = new Instance();
listener.cacheInstanceForRedo(serviceName, groupName, instance);
//when
listener.onConnected();
//then
verify(proxy, times(1)).registerService(serviceName, groupName, instance);
}
@Test
public void testRemoveInstanceForRedo() throws NacosException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String serviceName = "service1";
String groupName = "group1";
Instance instance = new Instance();
listener.cacheInstanceForRedo(serviceName, groupName, instance);
listener.removeInstanceForRedo(serviceName, groupName, instance);
//when
listener.onConnected();
//then
verify(proxy, times(0)).registerService(serviceName, groupName, instance);
}
@Test
public void testCacheSubscriberForRedo() throws NacosException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String fullServiceName = "group1@@service1";
String cluster = "cluster1";
listener.cacheSubscriberForRedo(fullServiceName, cluster);
//when
listener.onConnected();
//then
ServiceInfo info = ServiceInfo.fromKey(fullServiceName);
verify(proxy, times(1)).subscribe(info.getName(), info.getGroupName(), cluster);
}
@Test
public void testRemoveSubscriberForRedo() throws NacosException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String fullServiceName = "group1@@service1";
String cluster = "cluster1";
listener.cacheSubscriberForRedo(fullServiceName, cluster);
listener.removeSubscriberForRedo(fullServiceName, cluster);
//when
listener.onConnected();
//then
ServiceInfo info = ServiceInfo.fromKey(fullServiceName);
verify(proxy, times(0)).subscribe(info.getName(), info.getGroupName(), cluster);
}
@Test
public void testRedoRegisterEachService1()
throws NacosException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String serviceName = "service1";
String groupName = "group1";
Instance instance = new Instance();
listener.cacheInstanceForRedo(serviceName, groupName, instance);
Set<String> failedServices = new ConcurrentHashSet<>();
failedServices.add(NamingUtils.getGroupedName(serviceName, groupName));
//when
Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected");
connected.setAccessible(true);
connected.setBoolean(listener, true);
Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoRegisterEachService", Set.class);
method.setAccessible(true);
method.invoke(listener, failedServices);
//then
verify(proxy, times(1)).registerService(serviceName, groupName, instance);
}
@Test
public void testRedoRegisterEachService2()
throws NacosException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String serviceName = "service1";
String groupName = "group1";
Instance instance = new Instance();
listener.cacheInstanceForRedo(serviceName, groupName, instance);
Set<String> failedServices = new ConcurrentHashSet<>();
failedServices.add(NamingUtils.getGroupedName(serviceName, groupName));
//when
Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected");
connected.setAccessible(true);
connected.setBoolean(listener, false);
Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoRegisterEachService", Set.class);
method.setAccessible(true);
method.invoke(listener, failedServices);
//then
verify(proxy, times(0)).registerService(serviceName, groupName, instance);
}
@Test
public void testRedoRegisterEachService3()
throws NacosException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException, InterruptedException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String serviceName = "service1";
String groupName = "group1";
Instance instance = new Instance();
listener.cacheInstanceForRedo(serviceName, groupName, instance);
Set<String> failedServices = new ConcurrentHashSet<>();
failedServices.add(NamingUtils.getGroupedName(serviceName, groupName));
//when
Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected");
connected.setAccessible(true);
connected.setBoolean(listener, true);
Field executorFiled = NamingGrpcConnectionEventListener.class.getDeclaredField("redoExecutorService");
executorFiled.setAccessible(true);
ScheduledExecutorService executorService = (ScheduledExecutorService) executorFiled.get(listener);
Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoRegisterEachService", Set.class);
method.setAccessible(true);
executorService.schedule(() -> method.invoke(listener, failedServices), 0, TimeUnit.MILLISECONDS);
//then
TimeUnit.MILLISECONDS.sleep(100L);
verify(proxy, times(1)).registerService(serviceName, groupName, instance);
}
@Test
public void testRedoSubscribe1()
throws NacosException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String fullServiceName = "group1@@service1";
String cluster = "cluster1";
listener.cacheSubscriberForRedo(fullServiceName, cluster);
Set<String> failedSubscribes = new ConcurrentHashSet<>();
failedSubscribes.add(ServiceInfo.getKey(fullServiceName, cluster));
//when
Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected");
connected.setAccessible(true);
connected.setBoolean(listener, true);
Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoSubscribe", Set.class);
method.setAccessible(true);
method.invoke(listener, failedSubscribes);
//then
ServiceInfo info = ServiceInfo.fromKey(fullServiceName);
verify(proxy, times(1)).subscribe(info.getName(), info.getGroupName(), cluster);
}
@Test
public void testRedoSubscribe2()
throws NacosException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String fullServiceName = "group1@@service1";
String cluster = "cluster1";
listener.cacheSubscriberForRedo(fullServiceName, cluster);
Set<String> failedSubscribes = new ConcurrentHashSet<>();
failedSubscribes.add(ServiceInfo.getKey(fullServiceName, cluster));
//when
Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected");
connected.setAccessible(true);
connected.setBoolean(listener, false);
Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoSubscribe", Set.class);
method.setAccessible(true);
method.invoke(listener, failedSubscribes);
//then
ServiceInfo info = ServiceInfo.fromKey(fullServiceName);
verify(proxy, times(0)).subscribe(info.getName(), info.getGroupName(), cluster);
}
@Test
public void testRedoSubscribe3()
throws NacosException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException, InterruptedException {
//given
NamingGrpcClientProxy proxy = mock(NamingGrpcClientProxy.class);
NamingGrpcConnectionEventListener listener = new NamingGrpcConnectionEventListener(proxy);
String fullServiceName = "group1@@service1";
String cluster = "cluster1";
listener.cacheSubscriberForRedo(fullServiceName, cluster);
Set<String> failedSubscribes = new ConcurrentHashSet<>();
failedSubscribes.add(ServiceInfo.getKey(fullServiceName, cluster));
//when
Field connected = NamingGrpcConnectionEventListener.class.getDeclaredField("connected");
connected.setAccessible(true);
connected.setBoolean(listener, true);
Field executorFiled = NamingGrpcConnectionEventListener.class.getDeclaredField("redoExecutorService");
executorFiled.setAccessible(true);
ScheduledExecutorService executorService = (ScheduledExecutorService) executorFiled.get(listener);
Method method = NamingGrpcConnectionEventListener.class.getDeclaredMethod("redoSubscribe", Set.class);
method.setAccessible(true);
executorService.schedule(() -> method.invoke(listener, failedSubscribes), 0, TimeUnit.MILLISECONDS);
//then
TimeUnit.MILLISECONDS.sleep(100L);
ServiceInfo info = ServiceInfo.fromKey(fullServiceName);
verify(proxy, times(1)).subscribe(info.getName(), info.getGroupName(), cluster);
}
}

View File

@ -0,0 +1,202 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import com.alibaba.nacos.common.utils.ReflectUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
public class NamingGrpcRedoServiceTest {
private static final String SERVICE = "service";
private static final String GROUP = "group";
private static final String CLUSTER = "cluster";
@Mock
private NamingGrpcClientProxy clientProxy;
private NamingGrpcRedoService redoService;
@Before
public void setUp() throws Exception {
redoService = new NamingGrpcRedoService(clientProxy);
ScheduledExecutorService redoExecutor = (ScheduledExecutorService) ReflectUtils
.getFieldValue(redoService, "redoExecutor");
redoExecutor.shutdownNow();
}
@After
public void tearDown() throws Exception {
redoService.shutdown();
}
@Test
public void testOnConnected() {
assertFalse(redoService.isConnected());
redoService.onConnected();
assertTrue(redoService.isConnected());
}
@Test
public void testOnDisConnect() {
redoService.onConnected();
redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance());
redoService.instanceRegistered(SERVICE, GROUP);
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
redoService.subscriberRegistered(SERVICE, GROUP, CLUSTER);
assertTrue(redoService.isConnected());
assertTrue(redoService.findInstanceRedoData().isEmpty());
assertTrue(redoService.findSubscriberRedoData().isEmpty());
redoService.onDisConnect();
assertFalse(redoService.isConnected());
assertFalse(redoService.findInstanceRedoData().isEmpty());
assertFalse(redoService.findSubscriberRedoData().isEmpty());
}
@Test
public void testCacheInstanceForRedo() {
ConcurrentMap<String, InstanceRedoData> registeredInstances = getInstanceRedoDataMap();
assertTrue(registeredInstances.isEmpty());
Instance instance = new Instance();
redoService.cacheInstanceForRedo(SERVICE, GROUP, instance);
assertFalse(registeredInstances.isEmpty());
InstanceRedoData actual = registeredInstances.entrySet().iterator().next().getValue();
assertEquals(SERVICE, actual.getServiceName());
assertEquals(GROUP, actual.getGroupName());
assertEquals(instance, actual.get());
assertFalse(actual.isRegistered());
assertFalse(actual.isUnregistering());
}
@Test
public void testInstanceRegistered() {
ConcurrentMap<String, InstanceRedoData> registeredInstances = getInstanceRedoDataMap();
redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance());
redoService.instanceRegistered(SERVICE, GROUP);
InstanceRedoData actual = registeredInstances.entrySet().iterator().next().getValue();
assertTrue(actual.isRegistered());
}
@Test
public void testInstanceDeregister() {
ConcurrentMap<String, InstanceRedoData> registeredInstances = getInstanceRedoDataMap();
redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance());
redoService.instanceDeregister(SERVICE, GROUP);
InstanceRedoData actual = registeredInstances.entrySet().iterator().next().getValue();
assertTrue(actual.isUnregistering());
}
@Test
public void testRemoveInstanceForRedo() {
ConcurrentMap<String, InstanceRedoData> registeredInstances = getInstanceRedoDataMap();
assertTrue(registeredInstances.isEmpty());
redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance());
assertFalse(registeredInstances.isEmpty());
redoService.removeInstanceForRedo(SERVICE, GROUP);
assertTrue(registeredInstances.isEmpty());
}
@Test
public void testFindInstanceRedoData() {
redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance());
assertFalse(redoService.findInstanceRedoData().isEmpty());
redoService.instanceRegistered(SERVICE, GROUP);
assertTrue(redoService.findInstanceRedoData().isEmpty());
redoService.instanceDeregister(SERVICE, GROUP);
assertFalse(redoService.findInstanceRedoData().isEmpty());
}
@SuppressWarnings("all")
private ConcurrentMap<String, InstanceRedoData> getInstanceRedoDataMap() {
return (ConcurrentMap<String, InstanceRedoData>) ReflectUtils.getFieldValue(redoService, "registeredInstances");
}
@Test
public void testCacheSubscriberForRedo() {
ConcurrentMap<String, SubscriberRedoData> subscribes = getSubscriberRedoDataMap();
assertTrue(subscribes.isEmpty());
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
assertFalse(subscribes.isEmpty());
SubscriberRedoData actual = subscribes.entrySet().iterator().next().getValue();
assertEquals(SERVICE, actual.getServiceName());
assertEquals(GROUP, actual.getGroupName());
assertEquals(CLUSTER, actual.get());
assertFalse(actual.isRegistered());
assertFalse(actual.isUnregistering());
}
@Test
public void testSubscriberRegistered() {
ConcurrentMap<String, SubscriberRedoData> subscribes = getSubscriberRedoDataMap();
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
redoService.subscriberRegistered(SERVICE, GROUP, CLUSTER);
SubscriberRedoData actual = subscribes.entrySet().iterator().next().getValue();
assertTrue(actual.isRegistered());
}
@Test
public void testSubscriberDeregister() {
ConcurrentMap<String, SubscriberRedoData> subscribes = getSubscriberRedoDataMap();
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
redoService.subscriberDeregister(SERVICE, GROUP, CLUSTER);
SubscriberRedoData actual = subscribes.entrySet().iterator().next().getValue();
assertTrue(actual.isUnregistering());
}
@Test
public void testRemoveSubscriberForRedo() {
ConcurrentMap<String, SubscriberRedoData> subscribes = getSubscriberRedoDataMap();
assertTrue(subscribes.isEmpty());
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
assertFalse(subscribes.isEmpty());
redoService.removeSubscriberForRedo(SERVICE, GROUP, CLUSTER);
assertTrue(subscribes.isEmpty());
}
@Test
public void testFindSubscriberRedoData() {
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
assertFalse(redoService.findSubscriberRedoData().isEmpty());
redoService.subscriberRegistered(SERVICE, GROUP, CLUSTER);
assertTrue(redoService.findSubscriberRedoData().isEmpty());
redoService.subscriberDeregister(SERVICE, GROUP, CLUSTER);
assertFalse(redoService.findSubscriberRedoData().isEmpty());
}
@SuppressWarnings("all")
private ConcurrentMap<String, SubscriberRedoData> getSubscriberRedoDataMap() {
return (ConcurrentMap<String, SubscriberRedoData>) ReflectUtils.getFieldValue(redoService, "subscribes");
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashSet;
import java.util.Set;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class RedoScheduledTaskTest {
private static final String SERVICE = "service";
private static final String GROUP = "group";
private static final String CLUSTER = "cluster";
private static final Instance INSTANCE = new Instance();
@Mock
private NamingGrpcClientProxy clientProxy;
@Mock
private NamingGrpcRedoService redoService;
private RedoScheduledTask redoTask;
@Before
public void setUp() throws Exception {
redoTask = new RedoScheduledTask(clientProxy, redoService);
when(clientProxy.isEnable()).thenReturn(true);
when(redoService.isConnected()).thenReturn(true);
}
@Test
public void testRunRedoRegisterInstance() throws NacosException {
Set<InstanceRedoData> mockData = generateMockInstanceData(false, false);
when(redoService.findInstanceRedoData()).thenReturn(mockData);
redoTask.run();
verify(clientProxy).doRegisterService(SERVICE, GROUP, INSTANCE);
}
@Test
public void testRunRedoDeregisterInstance() throws NacosException {
Set<InstanceRedoData> mockData = generateMockInstanceData(true, true);
when(redoService.findInstanceRedoData()).thenReturn(mockData);
redoTask.run();
verify(clientProxy).doDeregisterService(SERVICE, GROUP, INSTANCE);
}
@Test
public void testRunRedoRemoveInstanceRedoData() throws NacosException {
Set<InstanceRedoData> mockData = generateMockInstanceData(false, true);
when(redoService.findInstanceRedoData()).thenReturn(mockData);
redoTask.run();
verify(redoService).removeInstanceForRedo(SERVICE, GROUP);
}
@Test
public void testRunRedoRegisterInstanceWithClientDisabled() throws NacosException {
when(clientProxy.isEnable()).thenReturn(false);
Set<InstanceRedoData> mockData = generateMockInstanceData(false, false);
when(redoService.findInstanceRedoData()).thenReturn(mockData);
redoTask.run();
verify(clientProxy, never()).doRegisterService(SERVICE, GROUP, INSTANCE);
}
private Set<InstanceRedoData> generateMockInstanceData(boolean registered, boolean unregistering) {
InstanceRedoData redoData = InstanceRedoData.build(SERVICE, GROUP, INSTANCE);
redoData.setRegistered(registered);
redoData.setUnregistering(unregistering);
Set<InstanceRedoData> result = new HashSet<>();
result.add(redoData);
return result;
}
@Test
public void testRunRedoRegisterSubscriber() throws NacosException {
Set<SubscriberRedoData> mockData = generateMockSubscriberData(false, false);
when(redoService.findSubscriberRedoData()).thenReturn(mockData);
redoTask.run();
verify(clientProxy).doSubscribe(SERVICE, GROUP, CLUSTER);
}
@Test
public void testRunRedoDeregisterSubscriber() throws NacosException {
Set<SubscriberRedoData> mockData = generateMockSubscriberData(true, true);
when(redoService.findSubscriberRedoData()).thenReturn(mockData);
redoTask.run();
verify(clientProxy).doUnsubscribe(SERVICE, GROUP, CLUSTER);
}
@Test
public void testRunRedoRemoveSubscriberRedoData() throws NacosException {
Set<SubscriberRedoData> mockData = generateMockSubscriberData(false, true);
when(redoService.findSubscriberRedoData()).thenReturn(mockData);
redoTask.run();
verify(redoService).removeSubscriberForRedo(SERVICE, GROUP, CLUSTER);
}
@Test
public void testRunRedoRegisterSubscriberWithClientDisabled() throws NacosException {
when(clientProxy.isEnable()).thenReturn(false);
Set<SubscriberRedoData> mockData = generateMockSubscriberData(false, false);
when(redoService.findSubscriberRedoData()).thenReturn(mockData);
redoTask.run();
verify(clientProxy, never()).doSubscribe(SERVICE, GROUP, CLUSTER);
}
private Set<SubscriberRedoData> generateMockSubscriberData(boolean registered, boolean unregistering) {
SubscriberRedoData redoData = SubscriberRedoData.build(SERVICE, GROUP, CLUSTER);
redoData.setRegistered(registered);
redoData.setUnregistering(unregistering);
Set<SubscriberRedoData> result = new HashSet<>();
result.add(redoData);
return result;
}
@Test
public void testRunRedoWithDisconnection() {
when(redoService.isConnected()).thenReturn(false);
redoTask.run();
verify(redoService, never()).findInstanceRedoData();
verify(redoService, never()).findSubscriberRedoData();
}
}