Support update instance metadata and sync by raft (#4279)

This commit is contained in:
杨翊 SionYang 2020-11-20 10:35:33 +08:00 committed by GitHub
parent 89342da468
commit 1546ff6206
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 329 additions and 69 deletions

View File

@ -173,22 +173,11 @@ public class InstanceController {
@PutMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String update(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
String agent = WebUtils.getUserAgent(request);
ClientInfo clientInfo = new ClientInfo(agent);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
serviceManager.updateInstance(namespaceId, serviceName, instance);
} else {
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String groupName = WebUtils.optional(request, CommonParams.GROUP_NAME, Constants.DEFAULT_GROUP);
instanceService.updateInstance(namespaceId, serviceName, groupName, parseInstance(request));
return "ok";
}

View File

@ -49,6 +49,18 @@ public interface InstanceOperator {
*/
void removeInstance(String namespaceId, String serviceName, Instance instance) throws NacosException;
/**
* Update instance information. Due to the basic information can't be changed, so this update should only update
* metadata.
*
* @param namespaceId namespace
* @param serviceName service name
* @param groupName group name
* @param instance instance
* @throws NacosException nacos exception when update failed
*/
void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException;
/**
* Get all instance of input service.
*

View File

@ -18,22 +18,31 @@ package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingResponseCode;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.IpPortBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.metadata.InstanceMetadata;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.service.ClientOperationService;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatProcessorV2;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatProcessorV2;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import java.util.Optional;
/**
* Instance service.
*
@ -42,17 +51,27 @@ import com.alibaba.nacos.naming.utils.ServiceUtil;
@org.springframework.stereotype.Service
public class InstanceOperatorClientImpl implements InstanceOperator {
private final IpPortBasedClientManager ipPortBasedClientManager;
private final ClientManagerDelegate clientManager;
private final ClientOperationService clientOperationService;
private final ServiceStorage serviceStorage;
public InstanceOperatorClientImpl(IpPortBasedClientManager ipPortBasedClientManager,
ClientOperationService clientOperationService, ServiceStorage serviceStorage) {
this.ipPortBasedClientManager = ipPortBasedClientManager;
private final NamingMetadataOperateService metadataOperateService;
private final NamingMetadataManager metadataManager;
private final SwitchDomain switchDomain;
public InstanceOperatorClientImpl(ClientManagerDelegate clientManager,
ClientOperationService clientOperationService, ServiceStorage serviceStorage,
NamingMetadataOperateService metadataOperateService, NamingMetadataManager metadataManager, SwitchDomain switchDomain) {
this.clientManager = clientManager;
this.clientOperationService = clientOperationService;
this.serviceStorage = serviceStorage;
this.metadataOperateService = metadataOperateService;
this.metadataManager = metadataManager;
this.switchDomain = switchDomain;
}
/**
@ -71,7 +90,7 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
@Override
public void removeInstance(String namespaceId, String serviceName, Instance instance) {
String clientId = instance.toInetAddr();
if (!ipPortBasedClientManager.allClientId().contains(clientId)) {
if (!clientManager.allClientId().contains(clientId)) {
Loggers.SRV_LOG.warn("remove instance from non-exist client: {}", clientId);
return;
}
@ -81,6 +100,24 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
clientOperationService.deregisterInstance(service, instance, clientId);
}
@Override
public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException {
Service service = Service.newService(namespaceId, groupName, serviceName, instance.isEphemeral());
if (!ServiceManager.getInstance().containSingleton(service)) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + service);
}
metadataOperateService.updateInstanceMetadata(service, instance.getIp(), buildMetadata(instance));
}
private InstanceMetadata buildMetadata(Instance instance) {
InstanceMetadata result = new InstanceMetadata();
result.setEnabled(instance.isEnabled());
result.setWeight(instance.getWeight());
result.getExtendData().putAll(instance.getMetadata());
return result;
}
@Override
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
boolean healthOnly) {
@ -104,7 +141,7 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
String groupName = NamingUtils.getGroupName(serviceName);
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
String clientId = ip + ":" + port;
IpPortBasedClient client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId);
IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
if (null == client) {
if (null == clientBeat) {
return NamingResponseCode.RESOURCE_NOT_FOUND;
@ -119,7 +156,7 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
registerInstance(namespaceId, serviceName, instance);
client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId);
client = (IpPortBasedClient) clientManager.getClient(clientId);
}
Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped);
if (!ServiceManager.getInstance().containSingleton(service)) {
@ -141,13 +178,24 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
@Override
public long getHeartBeatInterval(String namespaceId, String serviceName, String ip, int port, String cluster) {
// TODO Get heart beat interval from CP metadata
return 5000L;
String groupName = NamingUtils.getGroupName(serviceName);
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped);
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, ip);
if (metadata.isPresent() && metadata.get().getExtendData().containsKey(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
return ConvertUtils.toLong(metadata.get().getExtendData().get(PreservedMetadataKeys.HEART_BEAT_INTERVAL));
}
String clientId = ip + ":" + port;
InstancePublishInfo instance = clientManager.getClient(clientId).getInstancePublishInfo(service);
if (null != instance && instance.getExtendDatum().containsKey(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
return ConvertUtils.toLong(instance.getExtendDatum().get(PreservedMetadataKeys.HEART_BEAT_INTERVAL));
}
return switchDomain.getClientBeatInterval();
}
private void createIpPortClientIfAbsent(String clientId, boolean ephemeral) {
if (!ipPortBasedClientManager.allClientId().contains(clientId)) {
ipPortBasedClientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
if (!clientManager.allClientId().contains(clientId)) {
clientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
}
}
}

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.naming.NamingResponseCode;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers;
@ -99,6 +100,13 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), coreInstance);
}
@Override
public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException {
com.alibaba.nacos.naming.core.Instance coreInstance = (com.alibaba.nacos.naming.core.Instance) instance;
String groupedServiceName = NamingUtils.getGroupedName(groupName, serviceName);
serviceManager.updateInstance(namespaceId, groupedServiceName, coreInstance);
}
@Override
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
boolean healthOnly) throws Exception {

View File

@ -21,9 +21,12 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.metadata.InstanceMetadata;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.SwitchDomain;
@ -54,6 +57,8 @@ public class ServiceStorage {
private final SwitchDomain switchDomain;
private final NamingMetadataManager metadataManager;
private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;
private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;
@ -61,42 +66,16 @@ public class ServiceStorage {
private final ConcurrentMap<String, Set<Service>> namespaceServiceIndex;
public ServiceStorage(ClientServiceIndexesManager serviceIndexesManager, ClientManagerDelegate clientManager,
SwitchDomain switchDomain) {
SwitchDomain switchDomain, NamingMetadataManager metadataManager) {
this.serviceIndexesManager = serviceIndexesManager;
this.clientManager = clientManager;
this.switchDomain = switchDomain;
this.metadataManager = metadataManager;
this.serviceDataIndexes = new ConcurrentHashMap<>();
this.serviceClusterIndex = new ConcurrentHashMap<>();
this.namespaceServiceIndex = new ConcurrentHashMap<>();
}
public ServiceInfo getData(Service service) {
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}
public ServiceInfo getPushData(Service service) {
ServiceInfo result = new ServiceInfo();
result.setName(service.getName());
result.setGroupName(service.getGroup());
result.setLastRefTime(System.currentTimeMillis());
result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
List<Instance> instances = new LinkedList<>();
Set<String> clusters = new HashSet<>();
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
Instance instance = parseInstance(service, instancePublishInfo.get());
instances.add(instance);
clusters.add(instance.getClusterName());
}
}
result.setHosts(instances);
serviceDataIndexes.put(service, result);
serviceClusterIndex.put(service, clusters);
updateNamespaceIndex(service);
return result;
}
public Set<String> getClusters(Service service) {
return serviceClusterIndex.getOrDefault(service, new HashSet<>());
}
@ -105,6 +84,46 @@ public class ServiceStorage {
return namespaceServiceIndex.getOrDefault(namespace, new ConcurrentHashSet<>());
}
public ServiceInfo getData(Service service) {
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}
public ServiceInfo getPushData(Service service) {
ServiceInfo result = emptyServiceInfo(service);
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
result.setHosts(getAllInstancesFromIndex(service));
serviceDataIndexes.put(service, result);
updateNamespaceIndex(service);
return result;
}
private ServiceInfo emptyServiceInfo(Service service) {
ServiceInfo result = new ServiceInfo();
result.setName(service.getName());
result.setGroupName(service.getGroup());
result.setLastRefTime(System.currentTimeMillis());
result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
return result;
}
private List<Instance> getAllInstancesFromIndex(Service service) {
List<Instance> result = new LinkedList<>();
Set<String> clusters = new HashSet<>();
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
Instance instance = parseInstance(service, instancePublishInfo.get());
result.add(instance);
clusters.add(instance.getClusterName());
}
}
// cache clusters of this service
serviceClusterIndex.put(service, clusters);
return result;
}
private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
Client client = clientManager.getClient(clientId);
if (null == client) {
@ -126,6 +145,14 @@ public class ServiceStorage {
instanceMetadata.put(entry.getKey(), entry.getValue().toString());
}
}
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, instancePublishInfo.getIp());
if (metadata.isPresent()) {
result.setEnabled(metadata.get().isEnabled());
result.setWeight(metadata.get().getWeight());
for (Map.Entry<String, Object> entry : metadata.get().getExtendData().entrySet()) {
instanceMetadata.put(entry.getKey(), entry.getValue().toString());
}
}
result.setMetadata(instanceMetadata);
result.setEphemeral(service.isEphemeral());
result.setHealthy(instancePublishInfo.isHealthy());

View File

@ -0,0 +1,100 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.metadata;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.LogProcessor4CP;
import com.alibaba.nacos.consistency.entity.GetRequest;
import com.alibaba.nacos.consistency.entity.Log;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.utils.Constants;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.reflect.TypeUtils;
import org.springframework.stereotype.Component;
import java.lang.reflect.Type;
import java.util.Collections;
/**
* Instance metadata processor.
*
* @author xiweng.yy
*/
@Component
public class InstanceMetadataProcessor extends LogProcessor4CP {
private final NamingMetadataManager namingMetadataManager;
private final Serializer serializer;
private final Type processType;
@SuppressWarnings("unchecked")
public InstanceMetadataProcessor(NamingMetadataManager namingMetadataManager, ProtocolManager protocolManager) {
this.namingMetadataManager = namingMetadataManager;
this.serializer = SerializeFactory.getSerializer("JSON");
this.processType = TypeUtils.parameterize(MetadataOperation.class, InstanceMetadata.class);
protocolManager.getCpProtocol().addLogProcessors(Collections.singletonList(this));
}
@Override
public Response onRequest(GetRequest request) {
return null;
}
@Override
public Response onApply(Log log) {
switch (DataOperation.valueOf(log.getOperation())) {
case ADD:
case CHANGE:
updateInstanceMetadata(log.getData());
break;
case DELETE:
deleteInstanceMetadata(log.getData());
break;
default:
return Response.newBuilder().setSuccess(false).setErrMsg("Unsupported operation " + log.getOperation())
.build();
}
return Response.newBuilder().setSuccess(true).build();
}
private void updateInstanceMetadata(ByteString data) {
MetadataOperation<InstanceMetadata> op = serializer.deserialize(data.toByteArray(), processType);
Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
namingMetadataManager.updateInstanceMetadata(service, op.getTag(), op.getMetadata());
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
}
private void deleteInstanceMetadata(ByteString data) {
MetadataOperation<InstanceMetadata> op = serializer.deserialize(data.toByteArray(), processType);
Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
namingMetadataManager.removeInstanceMetadata(service, op.getTag());
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
}
@Override
public String group() {
return Constants.INSTANCE_METADATA;
}
}

View File

@ -29,6 +29,11 @@ public class MetadataOperation<T> {
private String serviceName;
/**
* If the metadata is cluster or instance, the tag should be added with the identity of cluster or instance.
*/
private String tag;
private T metadata;
public String getNamespace() {
@ -55,6 +60,14 @@ public class MetadataOperation<T> {
this.serviceName = serviceName;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public T getMetadata() {
return metadata;
}

View File

@ -54,13 +54,11 @@ public class NamingMetadataOperateService {
* @param serviceMetadata metadata
*/
public void updateServiceMetadata(Service service, ServiceMetadata serviceMetadata) {
MetadataOperation<ServiceMetadata> operation = new MetadataOperation<>();
operation.setNamespace(service.getNamespace());
operation.setGroup(service.getGroup());
operation.setServiceName(service.getName());
MetadataOperation<ServiceMetadata> operation = buildMetadataOperation(service);
operation.setMetadata(serviceMetadata);
Log operationLog = Log.newBuilder().setGroup(Constants.SERVICE_METADATA).setOperation(DataOperation.CHANGE.name())
.setData(ByteString.copyFrom(serializer.serialize(operation))).build();
Log operationLog = Log.newBuilder().setGroup(Constants.SERVICE_METADATA)
.setOperation(DataOperation.CHANGE.name()).setData(ByteString.copyFrom(serializer.serialize(operation)))
.build();
submitMetadataOperation(operationLog);
}
@ -70,15 +68,53 @@ public class NamingMetadataOperateService {
* @param service service of metadata
*/
public void deleteServiceMetadata(Service service) {
MetadataOperation<ServiceMetadata> operation = new MetadataOperation<>();
operation.setNamespace(service.getNamespace());
operation.setGroup(service.getGroup());
operation.setServiceName(service.getGroupedServiceName());
Log operationLog = Log.newBuilder().setGroup(Constants.SERVICE_METADATA).setOperation(DataOperation.DELETE.name())
.setData(ByteString.copyFrom(serializer.serialize(operation))).build();
MetadataOperation<ServiceMetadata> operation = buildMetadataOperation(service);
Log operationLog = Log.newBuilder().setGroup(Constants.SERVICE_METADATA)
.setOperation(DataOperation.DELETE.name()).setData(ByteString.copyFrom(serializer.serialize(operation)))
.build();
submitMetadataOperation(operationLog);
}
/**
* Update instance metadata.
*
* @param service service of metadata
* @param instanceId instance Id
* @param instanceMetadata metadata
*/
public void updateInstanceMetadata(Service service, String instanceId, InstanceMetadata instanceMetadata) {
MetadataOperation<InstanceMetadata> operation = buildMetadataOperation(service);
operation.setTag(instanceId);
operation.setMetadata(instanceMetadata);
Log operationLog = Log.newBuilder().setGroup(Constants.INSTANCE_METADATA)
.setOperation(DataOperation.CHANGE.name()).setData(ByteString.copyFrom(serializer.serialize(operation)))
.build();
submitMetadataOperation(operationLog);
}
/**
* Delete instance metadata.
*
* @param service service of metadata
* @param instanceId instance Id
*/
public void deleteInstanceMetadata(Service service, String instanceId) {
MetadataOperation<InstanceMetadata> operation = buildMetadataOperation(service);
operation.setTag(instanceId);
Log operationLog = Log.newBuilder().setGroup(Constants.INSTANCE_METADATA)
.setOperation(DataOperation.DELETE.name()).setData(ByteString.copyFrom(serializer.serialize(operation)))
.build();
submitMetadataOperation(operationLog);
}
private <T> MetadataOperation<T> buildMetadataOperation(Service service) {
MetadataOperation<T> result = new MetadataOperation<>();
result.setNamespace(service.getNamespace());
result.setGroup(service.getGroup());
result.setServiceName(service.getName());
return result;
}
private void submitMetadataOperation(Log operationLog) {
try {
Response response = cpProtocol.submit(operationLog);

View File

@ -68,7 +68,6 @@ public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCh
@Override
public void doHealthCheck() {
// TODO add white list like v1 {@code marked}
try {
Collection<Service> services = client.getAllPublishedService();
for (Service each : services) {

View File

@ -18,9 +18,11 @@ package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.ServiceUtil;
/**
* Nacos naming push execute task.
@ -42,6 +44,7 @@ public class PushExecuteTask extends AbstractExecuteTask {
public void run() {
try {
ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
serviceInfo = ServiceUtil.filterInstances(serviceInfo, StringUtils.EMPTY, true);
for (String each : delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)) {
Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecuteService().doPush(each, subscriber, serviceInfo);

View File

@ -26,6 +26,8 @@ public final class Constants {
public static final String SERVICE_METADATA = "naming_service_metadata";
public static final String INSTANCE_METADATA = "naming_instance_metadata";
public static final String NAMING_PERSISTENT_SERVICE_GROUP = "naming_persistent_service";
public static final String NACOS_NAMING_USE_NEW_RAFT_FIRST = "nacos.naming.use-new-raft.first";

View File

@ -206,4 +206,27 @@ public class ServiceUtil {
private static boolean checkHealthy(boolean healthyOnly, com.alibaba.nacos.api.naming.pojo.Instance instance) {
return !healthyOnly || instance.isHealthy();
}
/**
* Filter instances which enabled.
*
* @param serviceInfo service info
* @return new service info
*/
public static ServiceInfo filterEnabledInstances(ServiceInfo serviceInfo) {
ServiceInfo result = new ServiceInfo();
result.setName(serviceInfo.getName());
result.setGroupName(serviceInfo.getGroupName());
result.setCacheMillis(serviceInfo.getCacheMillis());
result.setLastRefTime(System.currentTimeMillis());
result.setClusters(serviceInfo.getClusters());
List<com.alibaba.nacos.api.naming.pojo.Instance> filteredInstance = new LinkedList<>();
for (com.alibaba.nacos.api.naming.pojo.Instance each : serviceInfo.getHosts()) {
if (each.isEnabled()) {
filteredInstance.add(each);
}
}
result.setHosts(filteredInstance);
return result;
}
}