This commit is contained in:
parent
8a4b885fd6
commit
e6402bc2a8
@ -25,6 +25,7 @@ import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
|
||||
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
|
||||
import com.alibaba.nacos.naming.misc.SwitchDomain;
|
||||
@ -60,12 +61,13 @@ public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements Na
|
||||
|
||||
public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager,
|
||||
ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage,
|
||||
NamingMetadataManager metadataManager,
|
||||
PushExecutorDelegate pushExecutor, UpgradeJudgement upgradeJudgement, SwitchDomain switchDomain) {
|
||||
this.clientManager = clientManager;
|
||||
this.indexesManager = indexesManager;
|
||||
this.upgradeJudgement = upgradeJudgement;
|
||||
this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage,
|
||||
pushExecutor, switchDomain);
|
||||
metadataManager, pushExecutor, switchDomain);
|
||||
NotifyCenter.registerSubscriber(this);
|
||||
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.NamingExecuteTaskDispatcher;
|
||||
@ -41,16 +42,20 @@ public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
|
||||
|
||||
private final ServiceStorage serviceStorage;
|
||||
|
||||
private final NamingMetadataManager metadataManager;
|
||||
|
||||
private final PushExecutor pushExecutor;
|
||||
|
||||
private final SwitchDomain switchDomain;
|
||||
|
||||
public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
|
||||
ServiceStorage serviceStorage, PushExecutor pushExecutor, SwitchDomain switchDomain) {
|
||||
ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
|
||||
PushExecutor pushExecutor, SwitchDomain switchDomain) {
|
||||
super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
|
||||
this.clientManager = clientManager;
|
||||
this.indexesManager = indexesManager;
|
||||
this.serviceStorage = serviceStorage;
|
||||
this.metadataManager = metadataManager;
|
||||
this.pushExecutor = pushExecutor;
|
||||
this.switchDomain = switchDomain;
|
||||
setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
|
||||
@ -68,6 +73,10 @@ public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
|
||||
return serviceStorage;
|
||||
}
|
||||
|
||||
public NamingMetadataManager getMetadataManager() {
|
||||
return metadataManager;
|
||||
}
|
||||
|
||||
public PushExecutor getPushExecutor() {
|
||||
return pushExecutor;
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.remote.PushCallBack;
|
||||
import com.alibaba.nacos.common.task.AbstractExecuteTask;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.pojo.Subscriber;
|
||||
@ -72,7 +73,8 @@ public class PushExecuteTask extends AbstractExecuteTask {
|
||||
|
||||
private PushDataWrapper generatePushData() {
|
||||
ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
|
||||
serviceInfo = ServiceUtil.selectInstances(serviceInfo, false, true);
|
||||
ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);
|
||||
serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, false, true);
|
||||
return new PushDataWrapper(serviceInfo);
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,8 @@ import com.alibaba.nacos.auth.common.ActionTypes;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.core.remote.RequestHandler;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl;
|
||||
import com.alibaba.nacos.naming.pojo.Subscriber;
|
||||
@ -46,11 +48,15 @@ public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServ
|
||||
|
||||
private final ServiceStorage serviceStorage;
|
||||
|
||||
private final NamingMetadataManager metadataManager;
|
||||
|
||||
private final EphemeralClientOperationServiceImpl clientOperationService;
|
||||
|
||||
public SubscribeServiceRequestHandler(ServiceStorage serviceStorage,
|
||||
NamingMetadataManager metadataManager,
|
||||
EphemeralClientOperationServiceImpl clientOperationService) {
|
||||
this.serviceStorage = serviceStorage;
|
||||
this.metadataManager = metadataManager;
|
||||
this.clientOperationService = clientOperationService;
|
||||
}
|
||||
|
||||
@ -64,7 +70,9 @@ public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServ
|
||||
Service service = Service.newService(namespaceId, groupName, serviceName, true);
|
||||
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), "unknown",
|
||||
meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
|
||||
ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service), subscriber);
|
||||
ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service),
|
||||
metadataManager.getServiceMetadata(service).orElse(null),
|
||||
subscriber);
|
||||
if (request.isSubscribe()) {
|
||||
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
|
||||
} else {
|
||||
@ -77,12 +85,13 @@ public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServ
|
||||
* For adapt push cluster feature. Will be remove after 2.1.x.
|
||||
*
|
||||
* @param data original data
|
||||
* @param metadata service metadata
|
||||
* @param subscriber subscriber information
|
||||
* @return cluster filtered data
|
||||
*/
|
||||
@Deprecated
|
||||
private ServiceInfo handleClusterData(ServiceInfo data, Subscriber subscriber) {
|
||||
private ServiceInfo handleClusterData(ServiceInfo data, ServiceMetadata metadata, Subscriber subscriber) {
|
||||
return StringUtils.isBlank(subscriber.getCluster()) ? data
|
||||
: ServiceUtil.selectInstances(data, subscriber.getCluster());
|
||||
: ServiceUtil.selectInstancesWithHealthyProtection(data, metadata, subscriber.getCluster());
|
||||
}
|
||||
}
|
||||
|
@ -244,6 +244,36 @@ public class ServiceUtil {
|
||||
return doSelectInstances(serviceInfo, cluster, healthyOnly, enableOnly, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select instance of service info with healthy protection.
|
||||
*
|
||||
* @param serviceInfo original service info
|
||||
* @param serviceMetadata service meta info
|
||||
* @param cluster cluster of instances
|
||||
* @return new service info
|
||||
*/
|
||||
public static ServiceInfo selectInstancesWithHealthyProtection(ServiceInfo serviceInfo,
|
||||
ServiceMetadata serviceMetadata,
|
||||
String cluster) {
|
||||
return selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, cluster, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select instance of service info with healthy protection.
|
||||
*
|
||||
* @param serviceInfo original service info
|
||||
* @param serviceMetadata service meta info
|
||||
* @param healthyOnly whether only select instance which healthy
|
||||
* @param enableOnly whether only select instance which enabled
|
||||
* @return new service info
|
||||
*/
|
||||
public static ServiceInfo selectInstancesWithHealthyProtection(ServiceInfo serviceInfo,
|
||||
ServiceMetadata serviceMetadata,
|
||||
boolean healthyOnly,
|
||||
boolean enableOnly) {
|
||||
return selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, StringUtils.EMPTY, healthyOnly, enableOnly);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select instance of service info with healthy protection.
|
||||
*
|
||||
|
@ -74,7 +74,7 @@ public class NamingSubscriberServiceV2ImplTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
subscriberService = new NamingSubscriberServiceV2Impl(clientManager, indexesManager, null, null,
|
||||
subscriberService = new NamingSubscriberServiceV2Impl(clientManager, indexesManager, null, null, null,
|
||||
upgradeJudgement, switchDomain);
|
||||
ReflectionTestUtils.setField(subscriberService, "delayTaskEngine", delayTaskEngine);
|
||||
when(indexesManager.getAllClientsSubscribeService(service)).thenReturn(Collections.singletonList(testClientId));
|
||||
|
@ -22,6 +22,7 @@ import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.misc.SwitchDomain;
|
||||
import com.alibaba.nacos.naming.pojo.Subscriber;
|
||||
@ -54,6 +55,9 @@ public class PushDelayTaskExecuteEngineTest {
|
||||
@Mock
|
||||
private ServiceStorage serviceStorage;
|
||||
|
||||
@Mock
|
||||
private NamingMetadataManager metadataManager;
|
||||
|
||||
@Mock
|
||||
private PushExecutor pushExecutor;
|
||||
|
||||
@ -79,7 +83,7 @@ public class PushDelayTaskExecuteEngineTest {
|
||||
when(clientManager.getClient(clientId)).thenReturn(client);
|
||||
when(client.getSubscriber(service)).thenReturn(subscriber);
|
||||
when(switchDomain.isPushEnabled()).thenReturn(true);
|
||||
executeEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage, pushExecutor,
|
||||
executeEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage, metadataManager, pushExecutor,
|
||||
switchDomain);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user