Add unit test for naming push (#4545)

* Add single push task for retry push

* Add unit test for Push task

* Add unit test for Push executor

* Add unit test for Subscriber service v2

* PushService.java --> UdpPushService

* Refactor and add unit test for ClientInfo

* Solve Conflict and compile problem
This commit is contained in:
杨翊 SionYang 2020-12-22 17:56:34 +08:00 committed by GitHub
parent 452410825f
commit 4445bad715
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1007 additions and 157 deletions

View File

@ -121,7 +121,7 @@ public class ConfigQueryRequest extends AbstractConfigRequest {
}
public boolean isNotify() {
String notify = getHeader("notify","false");
String notify = getHeader("notify", "false");
return Boolean.valueOf(notify);
}
}

View File

@ -54,7 +54,7 @@ public class NotifySubscriberRequest extends ServerPushRequest {
* Build fail response.
*
* @param message error message
* @return faile response
* @return fail response
*/
public static NotifySubscriberRequest buildFailResponse(String message) {
NotifySubscriberRequest result = new NotifySubscriberRequest();

View File

@ -16,8 +16,6 @@
package com.alibaba.nacos.api.remote;
import com.alibaba.nacos.api.remote.PushCallBack;
/**
* abstract callback of push service.
*

View File

@ -98,7 +98,7 @@ public class DefaultRequestFuture implements RequestFuture {
synchronized (this) {
notifyAll();
}
callBacInvoke();
}
@ -109,13 +109,14 @@ public class DefaultRequestFuture implements RequestFuture {
synchronized (this) {
notifyAll();
}
callBacInvoke();
}
private void callBacInvoke(){
private void callBacInvoke() {
if (requestCallBack != null) {
requestCallBack.getExcutor().execute(new CallBackHandler());
}else{
requestCallBack.getExecutor().execute(new CallBackHandler());
} else {
new CallBackHandler().run();
}
}

View File

@ -25,22 +25,22 @@ package com.alibaba.nacos.api.remote;
public interface PushCallBack {
/**
* timie out mills.
* Push timeout mills.
*
* @return
* @return timeout milliseconds
*/
public long getTimeout();
long getTimeout();
/**
* invoked on success.
*/
public void onSuccess();
void onSuccess();
/**
* invoked on fail.
*
* @param e exception throwed.
*/
public void onFail(Throwable e);
void onFail(Throwable e);
}

View File

@ -21,7 +21,7 @@ import com.alibaba.nacos.api.remote.response.Response;
import java.util.concurrent.Executor;
/**
* call bakck for request.
* call back for request.
*
* @author liuzunfei
* @version $Id: PushCallBack.java, v 0.1 2020年09月01日 6:33 PM liuzunfei Exp $

View File

@ -61,7 +61,7 @@ public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
tasks = new ConcurrentHashMap<>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);

View File

@ -343,7 +343,7 @@ public class AsyncNotifyService {
}
@Override
public Executor getExcutor() {
public Executor getExecutor() {
return ConfigExecutor.getConfigSubServiceExecutor();
}

View File

@ -30,7 +30,7 @@ import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.api.naming.pojo.healthcheck.HealthCheckType;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.naming.web.CanDistro;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
@ -63,7 +63,7 @@ public class HealthController {
private ServiceManager serviceManager;
@Autowired
private PushService pushService;
private UdpPushService pushService;
/**
* Just a health check.

View File

@ -35,7 +35,7 @@ import com.alibaba.nacos.naming.misc.SwitchEntry;
import com.alibaba.nacos.naming.misc.SwitchManager;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.naming.remote.udp.AckEntry;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.fasterxml.jackson.databind.JsonNode;
@ -105,7 +105,7 @@ public class OperatorController {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
List<AckEntry> failedPushes = PushService.getFailedPushes();
List<AckEntry> failedPushes = UdpPushService.getFailedPushes();
int failedPushCount = MetricsMonitor.getFailedPushMonitor().get();
int totalPushCount = MetricsMonitor.getTotalPushMonitor().get();
result.put("succeed", totalPushCount - failedPushCount);
@ -130,7 +130,7 @@ public class OperatorController {
}
if (reset) {
PushService.resetPushState();
UdpPushService.resetPushState();
}
result.put("reset", reset);

View File

@ -30,7 +30,7 @@ import com.alibaba.nacos.naming.push.ClientInfo;
import com.alibaba.nacos.naming.push.DataSource;
import com.alibaba.nacos.naming.push.NamingSubscriberServiceV1Impl;
import com.alibaba.nacos.naming.push.PushClient;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
@ -55,7 +55,7 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
private final SwitchDomain switchDomain;
private final PushService pushService;
private final UdpPushService pushService;
private final NamingSubscriberServiceV1Impl subscriberServiceV1;
@ -81,7 +81,7 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
};
public InstanceOperatorServiceImpl(ServiceManager serviceManager, SwitchDomain switchDomain,
PushService pushService, NamingSubscriberServiceV1Impl subscriberServiceV1) {
UdpPushService pushService, NamingSubscriberServiceV1Impl subscriberServiceV1) {
this.serviceManager = serviceManager;
this.switchDomain = switchDomain;
this.pushService = pushService;

View File

@ -29,7 +29,7 @@ import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.naming.selector.NoneSelector;
import com.alibaba.nacos.naming.selector.Selector;
import com.fasterxml.jackson.annotation.JsonIgnore;
@ -108,8 +108,8 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
}
@JsonIgnore
public PushService getPushService() {
return ApplicationUtils.getBean(PushService.class);
public UdpPushService getPushService() {
return ApplicationUtils.getBean(UdpPushService.class);
}
public long getIpDeleteTimeout() {

View File

@ -40,7 +40,7 @@ import com.alibaba.nacos.naming.misc.Synchronizer;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.InstanceOperationContext;
import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Sets;
@ -100,7 +100,7 @@ public class ServiceManager implements RecordListener<Service> {
private final ServerMemberManager memberManager;
private final PushService pushService;
private final UdpPushService pushService;
private final RaftPeerSet raftPeerSet;
@ -116,7 +116,7 @@ public class ServiceManager implements RecordListener<Service> {
private int cleanEmptyServicePeriod;
public ServiceManager(SwitchDomain switchDomain, DistroMapper distroMapper, ServerMemberManager memberManager,
PushService pushService, RaftPeerSet raftPeerSet) {
UdpPushService pushService, RaftPeerSet raftPeerSet) {
this.switchDomain = switchDomain;
this.distroMapper = distroMapper;
this.memberManager = memberManager;

View File

@ -31,7 +31,7 @@ import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
@ -52,8 +52,8 @@ public class ClientBeatCheckTask implements BeatCheckTask {
}
@JsonIgnore
public PushService getPushService() {
return ApplicationUtils.getBean(PushService.class);
public UdpPushService getPushService() {
return ApplicationUtils.getBean(UdpPushService.class);
}
@JsonIgnore

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
@ -43,8 +43,8 @@ public class ClientBeatProcessor implements BeatProcessor {
private Service service;
@JsonIgnore
public PushService getPushService() {
return ApplicationUtils.getBean(PushService.class);
public UdpPushService getPushService() {
return ApplicationUtils.getBean(UdpPushService.class);
}
public RsInfo getRsInfo() {

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -44,7 +44,7 @@ public class HealthCheckCommon {
private SwitchDomain switchDomain;
@Autowired
private PushService pushService;
private UdpPushService pushService;
/**
* Re-evaluate check responsce time.

View File

@ -28,127 +28,82 @@ import org.codehaus.jackson.util.VersionUtil;
*/
public class ClientInfo {
public Version version = Version.unknownVersion();
public Version version;
public ClientType type = ClientType.UNKNOWN;
public ClientType type;
public ClientInfo(String userAgent) {
String versionStr = StringUtils.isEmpty(userAgent) ? StringUtils.EMPTY : userAgent;
if (versionStr.startsWith(ClientTypeDescription.JAVA_CLIENT)) {
type = ClientType.JAVA;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
}
if (versionStr.startsWith(ClientTypeDescription.DNSF_CLIENT)) {
type = ClientType.DNS;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
}
if (versionStr.startsWith(ClientTypeDescription.C_CLIENT)) {
type = ClientType.C;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
}
if (versionStr.startsWith(ClientTypeDescription.SDK_CLIENT)) {
type = ClientType.JAVA_SDK;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
}
if (versionStr.startsWith(UtilsAndCommons.NACOS_SERVER_HEADER)) {
type = ClientType.NACOS_SERVER;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
}
if (versionStr.startsWith(ClientTypeDescription.NGINX_CLIENT)) {
type = ClientType.TENGINE;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
}
this.type = ClientType.getType(userAgent);
if (versionStr.startsWith(ClientTypeDescription.CPP_CLIENT)) {
type = ClientType.C;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
this.type = ClientType.C;
}
if (versionStr.startsWith(ClientTypeDescription.GO_CLIENT)) {
type = ClientType.GO;
versionStr = versionStr.substring(versionStr.indexOf(":v") + 2, versionStr.length());
version = VersionUtil.parseVersion(versionStr);
return;
this.version = parseVersion(versionStr.substring(versionStr.indexOf(":v") + 2));
}
private Version parseVersion(String versionStr) {
if (StringUtils.isBlank(versionStr) || ClientType.UNKNOWN.equals(this.type)) {
return Version.unknownVersion();
}
//we're not eager to implement other type yet
this.type = ClientType.UNKNOWN;
this.version = Version.unknownVersion();
return VersionUtil.parseVersion(versionStr);
}
public enum ClientType {
/**
* Go client type.
*/
GO,
GO(ClientTypeDescription.GO_CLIENT),
/**
* Java client type.
*/
JAVA,
JAVA(ClientTypeDescription.JAVA_CLIENT),
/**
* C client type.
*/
C,
C(ClientTypeDescription.C_CLIENT),
/**
* php client type.
*/
PHP,
PHP(ClientTypeDescription.PHP_CLIENT),
/**
* dns-f client type.
*/
DNS,
DNS(ClientTypeDescription.DNSF_CLIENT),
/**
* nginx client type.
*/
TENGINE,
TENGINE(ClientTypeDescription.NGINX_CLIENT),
/**
* sdk client type.
*/
JAVA_SDK,
JAVA_SDK(ClientTypeDescription.SDK_CLIENT),
/**
* Server notify each other.
*/
NACOS_SERVER,
NACOS_SERVER(UtilsAndCommons.NACOS_SERVER_HEADER),
/**
* Unknown client type.
*/
UNKNOWN
UNKNOWN(UtilsAndCommons.UNKNOWN_SITE);
private final String clientTypeDescription;
ClientType(String clientTypeDescription) {
this.clientTypeDescription = clientTypeDescription;
}
public String getClientTypeDescription() {
return clientTypeDescription;
}
public static ClientType getType(String userAgent) {
for (ClientType each : ClientType.values()) {
if (userAgent.startsWith(each.getClientTypeDescription())) {
return each;
}
}
return UNKNOWN;
}
}
public static class ClientTypeDescription {
@ -167,6 +122,7 @@ public class ClientInfo {
public static final String GO_CLIENT = "Nacos-Go-Client";
public static final String PHP_CLIENT = "Nacos-Php-Client";
}
}

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.naming.push;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
@ -63,7 +63,7 @@ import java.util.zip.GZIPOutputStream;
*/
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
public class UdpPushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
@Autowired
private SwitchDomain switchDomain;
@ -99,7 +99,7 @@ public class PushService implements ApplicationContextAware, ApplicationListener
}
}
public PushService(UdpConnector udpConnector) {
public UdpPushService(UdpConnector udpConnector) {
this.udpConnector = udpConnector;
}

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
/**

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
import org.springframework.stereotype.Component;

View File

@ -18,7 +18,7 @@ package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.pojo.Subscriber;

View File

@ -18,9 +18,9 @@ package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import org.springframework.stereotype.Component;
/**
@ -31,9 +31,9 @@ import org.springframework.stereotype.Component;
@Component
public class PushExecutorUdpImpl implements PushExecutor {
private final PushService pushService;
private final UdpPushService pushService;
public PushExecutorUdpImpl(PushService pushService) {
public PushExecutorUdpImpl(UdpPushService pushService) {
this.pushService = pushService;
}

View File

@ -18,6 +18,10 @@ package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import java.util.HashSet;
import java.util.Set;
/**
* Nacos naming push delay task.
@ -28,8 +32,23 @@ public class PushDelayTask extends AbstractDelayTask {
private final Service service;
private boolean pushToAll;
private Set<String> targetClients;
public PushDelayTask(Service service, long delay) {
this.service = service;
pushToAll = true;
targetClients = null;
setTaskInterval(delay);
setLastProcessTime(System.currentTimeMillis());
}
public PushDelayTask(Service service, long delay, String targetClient) {
this.service = service;
this.pushToAll = false;
this.targetClients = new HashSet<>(1);
this.targetClients.add(targetClient);
setTaskInterval(delay);
setLastProcessTime(System.currentTimeMillis());
}
@ -39,10 +58,26 @@ public class PushDelayTask extends AbstractDelayTask {
if (!(task instanceof PushDelayTask)) {
return;
}
PushDelayTask oldTask = (PushDelayTask) task;
if (isPushToAll() || oldTask.isPushToAll()) {
pushToAll = true;
targetClients = null;
} else {
targetClients.addAll(oldTask.getTargetClients());
}
setLastProcessTime(Math.min(getLastProcessTime(), task.getLastProcessTime()));
Loggers.PUSH.info("[PUSH] Task merge for {}", service);
}
public Service getService() {
return service;
}
public boolean isPushToAll() {
return pushToAll;
}
public Set<String> getTargetClients() {
return targetClients;
}
}

View File

@ -78,9 +78,10 @@ public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
@Override
public boolean process(NacosTask task) {
Service service = ((PushDelayTask) task).getService();
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service,
new PushExecuteTask(service, executeEngine, ((PushDelayTask) task).getLastProcessTime()));
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
}

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
@ -25,9 +25,10 @@ import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.NoRequiredRetryException;
import com.alibaba.nacos.naming.utils.Constants;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import java.util.Collection;
/**
* Nacos naming push execute task.
*
@ -39,15 +40,12 @@ public class PushExecuteTask extends AbstractExecuteTask {
private final PushDelayTaskExecuteEngine delayTaskEngine;
/**
* Record the push task start time from delay push.
*/
private final long pushTaskStartTime;
private final PushDelayTask delayTask;
public PushExecuteTask(Service service, PushDelayTaskExecuteEngine delayTaskEngine, long pushTaskStartTime) {
public PushExecuteTask(Service service, PushDelayTaskExecuteEngine delayTaskEngine, PushDelayTask delayTask) {
this.service = service;
this.delayTaskEngine = delayTaskEngine;
this.pushTaskStartTime = pushTaskStartTime;
this.delayTask = delayTask;
}
@Override
@ -55,11 +53,11 @@ public class PushExecuteTask extends AbstractExecuteTask {
try {
ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
serviceInfo = ServiceUtil.selectInstances(serviceInfo, false, true);
for (String each : delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)) {
for (String each : getTargetClientIds()) {
Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecutor()
.doPushWithCallback(each, subscriber, handleClusterData(serviceInfo, subscriber),
new NamingPushCallback(subscriber, serviceInfo));
new NamingPushCallback(each, subscriber, serviceInfo));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
@ -67,6 +65,11 @@ public class PushExecuteTask extends AbstractExecuteTask {
}
}
private Collection<String> getTargetClientIds() {
return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
: delayTask.getTargetClients();
}
/**
* For adapt push cluster feature for v1.x.
*
@ -83,6 +86,8 @@ public class PushExecuteTask extends AbstractExecuteTask {
private class NamingPushCallback implements PushCallBack {
private final String clientId;
private final Subscriber subscriber;
private final ServiceInfo serviceInfo;
@ -92,7 +97,8 @@ public class PushExecuteTask extends AbstractExecuteTask {
*/
private final long executeStartTime;
private NamingPushCallback(Subscriber subscriber, ServiceInfo serviceInfo) {
private NamingPushCallback(String clientId, Subscriber subscriber, ServiceInfo serviceInfo) {
this.clientId = clientId;
this.subscriber = subscriber;
this.serviceInfo = serviceInfo;
this.executeStartTime = System.currentTimeMillis();
@ -100,14 +106,15 @@ public class PushExecuteTask extends AbstractExecuteTask {
@Override
public long getTimeout() {
return Constants.DEFAULT_PUSH_TIMEOUT_MILLS;
// TODO timeout should can be config
return 3000L;
}
@Override
public void onSuccess() {
long pushFinishTime = System.currentTimeMillis();
long pushCostTimeForNetWork = pushFinishTime - executeStartTime;
long pushCostTimeForAll = pushFinishTime - pushTaskStartTime;
long pushCostTimeForAll = pushFinishTime - delayTask.getLastProcessTime();
long serviceLevelAgreementTime = pushFinishTime - service.getLastUpdatedTime();
Loggers.PUSH.info("[PUSH-SUCC] {}ms, all delay time {}ms, SLA {}ms, {}, DataSize={}, target={}",
pushCostTimeForNetWork, pushCostTimeForAll, serviceLevelAgreementTime, service,
@ -125,8 +132,7 @@ public class PushExecuteTask extends AbstractExecuteTask {
MetricsMonitor.incrementFailPush();
if (!(e instanceof NoRequiredRetryException)) {
Loggers.PUSH.error("Reason detail: ", e);
// TODO should only push for single client
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L, clientId));
}
}
}

View File

@ -17,7 +17,6 @@
package com.alibaba.nacos.naming.remote.udp;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.misc.GlobalExecutor;

View File

@ -24,7 +24,7 @@ import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.healthcheck.HealthCheckProcessorDelegate;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.Before;
@ -76,7 +76,7 @@ public class BaseTest {
protected HealthCheckProcessorDelegate delegate;
@Mock
protected PushService pushService;
protected UdpPushService pushService;
@Spy
private MockEnvironment environment;
@ -97,7 +97,7 @@ public class BaseTest {
}
protected void mockInjectPushServer() {
doReturn(pushService).when(context).getBean(PushService.class);
doReturn(pushService).when(context).getBean(UdpPushService.class);
}
protected void mockInjectHealthCheckProcessor() {

View File

@ -22,7 +22,7 @@ import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.UdpPushService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -56,7 +56,7 @@ public class ClientBeatCheckTaskTest {
private GlobalConfig globalConfig;
@Mock
private PushService pushService;
private UdpPushService pushService;
@Mock
private SwitchDomain switchDomain;

View File

@ -0,0 +1,103 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ClientInfoTest {
private final String testVersionString = "2.0.0-ALPHA";
@Test
public void testGetClientInfoForJava() {
String userAgent = getUserAgent(ClientInfo.ClientTypeDescription.JAVA_CLIENT);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.JAVA, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForGo() {
String userAgent = getUserAgent(ClientInfo.ClientTypeDescription.GO_CLIENT);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.GO, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForC() {
String userAgent = getUserAgent(ClientInfo.ClientTypeDescription.C_CLIENT);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.C, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForCpp() {
String userAgent = getUserAgent(ClientInfo.ClientTypeDescription.CPP_CLIENT);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.C, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForDns() {
String userAgent = getUserAgent(ClientInfo.ClientTypeDescription.DNSF_CLIENT);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.DNS, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForSdk() {
String userAgent = getUserAgent(ClientInfo.ClientTypeDescription.SDK_CLIENT);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.JAVA_SDK, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForServer() {
String userAgent = getUserAgent(UtilsAndCommons.NACOS_SERVER_HEADER);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.NACOS_SERVER, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForNginx() {
String userAgent = getUserAgent(ClientInfo.ClientTypeDescription.NGINX_CLIENT);
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.TENGINE, actual.type);
assertEquals(testVersionString, actual.version.toString());
}
@Test
public void testGetClientInfoForUnknown() {
String userAgent = getUserAgent("TestClient");
ClientInfo actual = new ClientInfo(userAgent);
assertEquals(ClientInfo.ClientType.UNKNOWN, actual.type);
assertEquals("0.0.0", actual.version.toString());
}
private String getUserAgent(String client) {
return client + ":v" + testVersionString;
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.task.PushDelayTask;
import com.alibaba.nacos.naming.push.v2.task.PushDelayTaskExecuteEngine;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class NamingSubscriberServiceV2ImplTest {
private final String testClientId = "testClientId";
private final Service service = Service.newService("N", "G", "S");
private final Service service1 = Service.newService("N", "G1", "S1");
@Mock
private ClientManagerDelegate clientManager;
@Mock
private ClientServiceIndexesManager indexesManager;
@Mock
private PushDelayTaskExecuteEngine delayTaskEngine;
@Mock
private Client client;
private NamingSubscriberServiceV2Impl subscriberService;
@Before
public void setUp() throws Exception {
subscriberService = new NamingSubscriberServiceV2Impl(clientManager, indexesManager, null, null);
ReflectionTestUtils.setField(subscriberService, "delayTaskEngine", delayTaskEngine);
when(indexesManager.getAllClientsSubscribeService(service)).thenReturn(Collections.singletonList(testClientId));
when(indexesManager.getAllClientsSubscribeService(service1))
.thenReturn(Collections.singletonList(testClientId));
Collection<Service> services = new LinkedList<>();
services.add(service);
services.add(service1);
when(indexesManager.getSubscribedService()).thenReturn(services);
when(clientManager.getClient(testClientId)).thenReturn(client);
when(client.getSubscriber(service)).thenReturn(
new Subscriber("1.1.1.1:1111", "Test", "unknown", "1.1.1.1", "N", service.getGroupedServiceName(), 0));
when(client.getSubscriber(service1)).thenReturn(
new Subscriber("1.1.1.1:1111", "Test", "unknown", "1.1.1.1", "N", service1.getGroupedServiceName(), 0));
}
@Test
public void testGetSubscribersByString() {
Collection<Subscriber> actual = subscriberService
.getSubscribers(service.getNamespace(), service.getGroupedServiceName());
assertEquals(1, actual.size());
assertEquals(service.getGroupedServiceName(), actual.iterator().next().getServiceName());
}
@Test
public void testGetSubscribersByService() {
Collection<Subscriber> actual = subscriberService.getSubscribers(service);
assertEquals(1, actual.size());
assertEquals(service.getGroupedServiceName(), actual.iterator().next().getServiceName());
}
@Test
public void testGetFuzzySubscribersByString() {
Collection<Subscriber> actual = subscriberService
.getFuzzySubscribers(service.getNamespace(), service.getGroupedServiceName());
assertEquals(2, actual.size());
}
@Test
public void testGetFuzzySubscribersByService() {
Collection<Subscriber> actual = subscriberService.getFuzzySubscribers(service);
assertEquals(2, actual.size());
}
@Test
public void onEvent() {
subscriberService.onEvent(new ServiceEvent.ServiceChangedEvent(service));
verify(delayTaskEngine).addTask(eq(service), any(PushDelayTask.class));
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.UUID;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class PushExecutorDelegateTest {
private final String udpClientId = "1.1.1.1:60000";
private final String rpcClientId = UUID.randomUUID().toString();
@Mock
private PushExecutorRpcImpl pushExecutorRpc;
@Mock
private PushExecutorUdpImpl pushExecutorUdp;
@Mock
private Subscriber subscriber;
@Mock
private PushCallBack pushCallBack;
private ServiceInfo serviceInfo;
private PushExecutorDelegate delegate;
@Before
public void setUp() throws Exception {
serviceInfo = new ServiceInfo("G@@S");
delegate = new PushExecutorDelegate(pushExecutorRpc, pushExecutorUdp);
}
@Test
public void testDoPushForUdp() {
delegate.doPush(udpClientId, subscriber, serviceInfo);
verify(pushExecutorUdp).doPush(udpClientId, subscriber, serviceInfo);
}
@Test
public void testDoPushForRpc() {
delegate.doPush(rpcClientId, subscriber, serviceInfo);
verify(pushExecutorRpc).doPush(rpcClientId, subscriber, serviceInfo);
}
@Test
public void doPushWithCallbackForUdp() {
delegate.doPushWithCallback(udpClientId, subscriber, serviceInfo, pushCallBack);
verify(pushExecutorUdp).doPushWithCallback(udpClientId, subscriber, serviceInfo, pushCallBack);
}
@Test
public void doPushWithCallbackForRpc() {
delegate.doPushWithCallback(rpcClientId, subscriber, serviceInfo, pushCallBack);
verify(pushExecutorRpc).doPushWithCallback(rpcClientId, subscriber, serviceInfo, pushCallBack);
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class PushExecutorRpcImplTest {
private final String rpcClientId = UUID.randomUUID().toString();
@Mock
private RpcPushService pushService;
@Mock
private Subscriber subscriber;
@Mock
private PushCallBack pushCallBack;
private ServiceInfo serviceInfo;
private PushExecutorRpcImpl pushExecutor;
@Before
public void setUp() throws Exception {
serviceInfo = new ServiceInfo("G@@S");
pushExecutor = new PushExecutorRpcImpl(pushService);
doAnswer(new CallbackAnswer()).when(pushService)
.pushWithCallback(eq(rpcClientId), any(NotifySubscriberRequest.class), eq(pushCallBack),
eq(GlobalExecutor.getCallbackExecutor()));
}
@Test
public void testDoPush() {
pushExecutor.doPush(rpcClientId, subscriber, serviceInfo);
verify(pushService).pushWithoutAck(eq(rpcClientId), any(NotifySubscriberRequest.class));
}
@Test
public void testDoPushWithCallback() {
pushExecutor.doPushWithCallback(rpcClientId, subscriber, serviceInfo, pushCallBack);
verify(pushCallBack).onSuccess();
}
private class CallbackAnswer implements Answer<Void> {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
NotifySubscriberRequest pushRequest = invocationOnMock.getArgument(1);
assertEquals(serviceInfo, pushRequest.getServiceInfo());
PushCallBack callBack = invocationOnMock.getArgument(2);
callBack.onSuccess();
return null;
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.UdpPushService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class PushExecutorUdpImplTest {
private final String rpcClientId = "1.1.1.1:10000";
@Mock
private UdpPushService pushService;
@Mock
private Subscriber subscriber;
@Mock
private PushCallBack pushCallBack;
private ServiceInfo serviceInfo;
private PushExecutorUdpImpl pushExecutor;
@Before
public void setUp() throws Exception {
serviceInfo = new ServiceInfo("G@@S");
pushExecutor = new PushExecutorUdpImpl(pushService);
doAnswer(new CallbackAnswer()).when(pushService)
.pushDataWithCallback(eq(subscriber), any(ServiceInfo.class), eq(pushCallBack));
}
@Test
public void testDoPush() {
pushExecutor.doPush(rpcClientId, subscriber, serviceInfo);
verify(pushService).pushDataWithoutCallback(eq(subscriber), any(ServiceInfo.class));
}
@Test
public void testDoPushWithCallback() {
pushExecutor.doPushWithCallback(rpcClientId, subscriber, serviceInfo, pushCallBack);
verify(pushCallBack).onSuccess();
}
private static class CallbackAnswer implements Answer<Void> {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
ServiceInfo serviceInfo = invocationOnMock.getArgument(1);
assertEquals("G@@S", serviceInfo.getName());
PushCallBack callBack = invocationOnMock.getArgument(2);
callBack.onSuccess();
return null;
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.executor.PushExecutor;
public class FixturePushExecutor implements PushExecutor {
private boolean shouldSuccess = true;
private Throwable failedException;
@Override
public void doPush(String clientId, Subscriber subscriber, ServiceInfo data) {
}
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, ServiceInfo data, PushCallBack callBack) {
if (shouldSuccess) {
callBack.onSuccess();
} else {
callBack.onFail(failedException);
}
}
public void setShouldSuccess(boolean shouldSuccess) {
this.shouldSuccess = shouldSuccess;
}
public void setFailedException(Throwable failedException) {
this.failedException = failedException;
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.executor.PushExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PushDelayTaskExecuteEngineTest {
@Mock
private ClientManager clientManager;
@Mock
private ClientServiceIndexesManager indexesManager;
@Mock
private ServiceStorage serviceStorage;
@Mock
private PushExecutor pushExecutor;
@Mock
private Client client;
@Mock
private Subscriber subscriber;
private final Service service = Service.newService("N", "G", "S");
private final String clientId = "testClient";
private PushDelayTaskExecuteEngine executeEngine;
@Before
public void setUp() throws Exception {
when(serviceStorage.getPushData(service)).thenReturn(new ServiceInfo("G@@S"));
when(indexesManager.getAllClientsSubscribeService(service)).thenReturn(Collections.singletonList(clientId));
when(clientManager.getClient(clientId)).thenReturn(client);
when(client.getSubscriber(service)).thenReturn(subscriber);
executeEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage, pushExecutor);
}
@After
public void tearDown() throws Exception {
executeEngine.shutdown();
}
@Test
public void testAddTask() throws InterruptedException {
PushDelayTask pushDelayTask = new PushDelayTask(service, 0L);
executeEngine.addTask(service, pushDelayTask);
TimeUnit.MILLISECONDS.sleep(200L);
verify(pushExecutor).doPushWithCallback(anyString(), any(Subscriber.class), any(ServiceInfo.class),
any(PushCallBack.class));
}
}

View File

@ -0,0 +1,95 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* For Understand this test case, Please Read {@link com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#addTask(Object,
* AbstractDelayTask)}.
*
* @author xiweng.yy
*/
public class PushDelayTaskTest {
private final Service service = Service.newService("N", "G", "S");
private final String singleTargetClientId = "testClientId";
private PushDelayTask pushToAllTask;
private PushDelayTask singlePushTask;
@Before
public void setUp() throws Exception {
pushToAllTask = new PushDelayTask(service, 0L);
singlePushTask = new PushDelayTask(service, 0L, singleTargetClientId);
}
@Test
public void testMergeAllToSingle() {
PushDelayTask newTask = singlePushTask;
PushDelayTask oldTask = pushToAllTask;
newTask.merge(oldTask);
assertTrue(newTask.isPushToAll());
assertNull(newTask.getTargetClients());
}
@Test
public void testMergeSingleToAll() {
PushDelayTask newTask = pushToAllTask;
PushDelayTask oldTask = singlePushTask;
newTask.merge(oldTask);
assertTrue(newTask.isPushToAll());
assertNull(newTask.getTargetClients());
}
@Test
public void testMergeSingleToSingle() {
PushDelayTask oldTask = singlePushTask;
PushDelayTask newTask = new PushDelayTask(service, 0L, "newClient");
newTask.merge(oldTask);
assertFalse(newTask.isPushToAll());
assertNotNull(newTask.getTargetClients());
assertFalse(newTask.getTargetClients().isEmpty());
assertEquals(2, newTask.getTargetClients().size());
assertTrue(newTask.getTargetClients().contains(singleTargetClientId));
assertTrue(newTask.getTargetClients().contains("newClient"));
}
@Test
public void testMergeAllToAll() throws InterruptedException {
PushDelayTask oldTask = pushToAllTask;
TimeUnit.MILLISECONDS.sleep(10);
PushDelayTask newTask = new PushDelayTask(service, 0L);
newTask.merge(oldTask);
newTask.merge(oldTask);
assertTrue(newTask.isPushToAll());
assertEquals(oldTask.getLastProcessTime(), newTask.getLastProcessTime());
}
}

View File

@ -0,0 +1,135 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.NoRequiredRetryException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PushExecuteTaskTest {
private final Service service = Service.newService("N", "G", "S");
private final String clientId = "testClient";
private final FixturePushExecutor pushExecutor = new FixturePushExecutor();
@Mock
private PushDelayTaskExecuteEngine delayTaskExecuteEngine;
@Mock
private ClientManager clientManager;
@Mock
private ClientServiceIndexesManager indexesManager;
@Mock
private ServiceStorage serviceStorage;
@Mock
private Client client;
@Mock
private Subscriber subscriber;
@Before
public void setUp() {
when(indexesManager.getAllClientsSubscribeService(service)).thenReturn(Collections.singletonList(clientId));
when(clientManager.getClient(clientId)).thenReturn(client);
when(client.getSubscriber(service)).thenReturn(subscriber);
when(serviceStorage.getPushData(service)).thenReturn(new ServiceInfo("G@@S"));
when(delayTaskExecuteEngine.getClientManager()).thenReturn(clientManager);
when(delayTaskExecuteEngine.getIndexesManager()).thenReturn(indexesManager);
when(delayTaskExecuteEngine.getPushExecutor()).thenReturn(pushExecutor);
when(delayTaskExecuteEngine.getServiceStorage()).thenReturn(serviceStorage);
}
@After
public void tearDown() {
MetricsMonitor.resetAll();
}
@Test
public void testRunSuccessForPushAll() {
PushDelayTask delayTask = new PushDelayTask(service, 0L);
PushExecuteTask executeTask = new PushExecuteTask(service, delayTaskExecuteEngine, delayTask);
executeTask.run();
assertEquals(1, MetricsMonitor.getTotalPushMonitor().get());
}
@Test
public void testRunSuccessForPushSingle() {
PushDelayTask delayTask = new PushDelayTask(service, 0L, clientId);
PushExecuteTask executeTask = new PushExecuteTask(service, delayTaskExecuteEngine, delayTask);
executeTask.run();
assertEquals(1, MetricsMonitor.getTotalPushMonitor().get());
}
@Test
public void testRunFailedWithHandleException() {
PushDelayTask delayTask = new PushDelayTask(service, 0L);
PushExecuteTask executeTask = new PushExecuteTask(service, delayTaskExecuteEngine, delayTask);
when(delayTaskExecuteEngine.getServiceStorage()).thenThrow(new RuntimeException());
executeTask.run();
assertEquals(0, MetricsMonitor.getFailedPushMonitor().get());
verify(delayTaskExecuteEngine).addTask(eq(service), any(PushDelayTask.class));
}
@Test
public void testRunFailedWithNoRetry() {
PushDelayTask delayTask = new PushDelayTask(service, 0L);
PushExecuteTask executeTask = new PushExecuteTask(service, delayTaskExecuteEngine, delayTask);
pushExecutor.setShouldSuccess(false);
pushExecutor.setFailedException(new NoRequiredRetryException());
executeTask.run();
assertEquals(1, MetricsMonitor.getFailedPushMonitor().get());
verify(delayTaskExecuteEngine, never()).addTask(eq(service), any(PushDelayTask.class));
}
@Test
public void testRunFailedWithRetry() {
PushDelayTask delayTask = new PushDelayTask(service, 0L);
PushExecuteTask executeTask = new PushExecuteTask(service, delayTaskExecuteEngine, delayTask);
pushExecutor.setShouldSuccess(false);
pushExecutor.setFailedException(new RuntimeException());
executeTask.run();
assertEquals(1, MetricsMonitor.getFailedPushMonitor().get());
verify(delayTaskExecuteEngine).addTask(eq(service), any(PushDelayTask.class));
}
}