Nacos2.0 adapt batch instance metadata operate (#5141)

* Rebuild ui

* Nacos2.0 adapt batch instance metadata operate
This commit is contained in:
杨翊 SionYang 2021-03-19 11:48:06 +08:00 committed by GitHub
parent 9bb08d9f4d
commit fb9300d87f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 170 additions and 72 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -36,7 +36,6 @@ import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.SwitchEntry;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.InstanceOperationContext;
import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.web.CanDistro;
@ -58,16 +57,11 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.DEFAULT_CLUSTER_NAME;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.EPHEMERAL;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.PERSIST;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.UPDATE_INSTANCE_METADATA_ACTION_REMOVE;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.UPDATE_INSTANCE_METADATA_ACTION_UPDATE;
/**
* Instance operation controller.
@ -165,32 +159,24 @@ public class InstanceController {
@CanDistro
@PutMapping(value = "/metadata/batch")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode batchUpdateInstanceMatadata(HttpServletRequest request) throws Exception {
public ObjectNode batchUpdateInstanceMetadata(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String consistencyType = WebUtils.optional(request, "consistencyType", StringUtils.EMPTY);
String instances = WebUtils.optional(request, "instances", StringUtils.EMPTY);
List<Instance> targetInstances = parseBatchInstances(instances);
String metadata = WebUtils.required(request, "metadata");
Map<String, String> targetMetadata = UtilsAndCommons.parseMetadata(metadata);
InstanceOperationInfo instanceOperationInfo = buildOperationInfo(serviceName, consistencyType, targetInstances);
List<Instance> operatedInstances = batchOperateMetadata(namespaceId,
buildOperationInfo(serviceName, consistencyType, targetInstances), targetMetadata,
UPDATE_INSTANCE_METADATA_ACTION_UPDATE);
List<String> operatedInstances = getInstanceOperator()
.batchUpdateMetadata(namespaceId, instanceOperationInfo, targetMetadata);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
ArrayNode ipArray = JacksonUtils.createEmptyArrayNode();
for (Instance ip : operatedInstances) {
ipArray.add(ip.getDatumKey() + ":" + (ip.isEphemeral() ? EPHEMERAL : PERSIST));
for (String ip : operatedInstances) {
ipArray.add(ip);
}
result.replace("updated", ipArray);
return result;
}
@ -206,32 +192,24 @@ public class InstanceController {
@CanDistro
@DeleteMapping("/metadata/batch")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode batchDeleteInstanceMatadata(HttpServletRequest request) throws Exception {
public ObjectNode batchDeleteInstanceMetadata(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String consistencyType = WebUtils.optional(request, "consistencyType", StringUtils.EMPTY);
String instances = WebUtils.optional(request, "instances", StringUtils.EMPTY);
List<Instance> targetInstances = parseBatchInstances(instances);
String metadata = WebUtils.required(request, "metadata");
Map<String, String> targetMetadata = UtilsAndCommons.parseMetadata(metadata);
List<Instance> operatedInstances = batchOperateMetadata(namespaceId,
buildOperationInfo(serviceName, consistencyType, targetInstances), targetMetadata,
UPDATE_INSTANCE_METADATA_ACTION_REMOVE);
InstanceOperationInfo instanceOperationInfo = buildOperationInfo(serviceName, consistencyType, targetInstances);
List<String> operatedInstances = getInstanceOperator()
.batchDeleteMetadata(namespaceId, instanceOperationInfo, targetMetadata);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
ArrayNode ipArray = JacksonUtils.createEmptyArrayNode();
for (Instance ip : operatedInstances) {
ipArray.add(ip.getDatumKey() + ":" + (ip.isEphemeral() ? EPHEMERAL : PERSIST));
for (String ip : operatedInstances) {
ipArray.add(ip);
}
result.replace("updated", ipArray);
return result;
}
@ -255,23 +233,9 @@ public class InstanceController {
} catch (Exception e) {
Loggers.SRV_LOG.warn("UPDATE-METADATA: Param 'instances' is illegal, ignore this operation", e);
}
return null;
return Collections.emptyList();
}
private List<Instance> batchOperateMetadata(String namespace, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata, String action) {
Function<InstanceOperationContext, List<Instance>> operateFunction = instanceOperationContext -> {
try {
return serviceManager.updateMetadata(instanceOperationContext.getNamespace(),
instanceOperationContext.getServiceName(), instanceOperationContext.getEphemeral(), action,
instanceOperationContext.getAll(), instanceOperationContext.getInstances(), metadata);
} catch (NacosException e) {
Loggers.SRV_LOG.warn("UPDATE-METADATA: updateMetadata failed", e);
}
return new ArrayList<>();
};
return serviceManager.batchOperate(namespace, instanceOperationInfo, operateFunction);
}
/**
* Patch instance.

View File

@ -20,9 +20,11 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.alibaba.nacos.naming.pojo.Subscriber;
import java.util.List;
import java.util.Map;
/**
* Instance operator.
@ -141,4 +143,28 @@ public interface InstanceOperator {
* @throws NacosException nacos exception during query
*/
List<? extends Instance> listAllInstances(String namespaceId, String serviceName) throws NacosException;
/**
* Batch update metadata of instances.
*
* @param namespaceId namespace Id of instances
* @param instanceOperationInfo instance operation info
* @param metadata updated metadata
* @return updated instance
* @throws NacosException nacos exception during update
*/
List<String> batchUpdateMetadata(String namespaceId, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata) throws NacosException;
/**
* Batch delete metadata of instances.
*
* @param namespaceId namespace Id of instances
* @param instanceOperationInfo instance operation info
* @param metadata delete metadata
* @return updated instance
* @throws NacosException nacos exception during update
*/
List<String> batchDeleteMetadata(String namespaceId, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata) throws NacosException;
}

View File

@ -43,11 +43,15 @@ import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatProcessorV2;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
@ -113,7 +117,8 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + service);
}
String metadataId = InstancePublishInfo.genMetadataId(instance.getIp(), instance.getPort(), instance.getClusterName());
String metadataId = InstancePublishInfo
.genMetadataId(instance.getIp(), instance.getPort(), instance.getClusterName());
metadataOperateService.updateInstanceMetadata(service, metadataId, buildMetadata(instance));
}
@ -170,7 +175,8 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
}
ServiceInfo serviceInfo = serviceStorage.getData(service);
ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
ServiceInfo result = ServiceUtil.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, cluster, healthOnly, true);
ServiceInfo result = ServiceUtil
.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, cluster, healthOnly, true);
// adapt for v1.x sdk
result.setName(NamingUtils.getGroupedName(result.getName(), result.getGroupName()));
return result;
@ -180,10 +186,14 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
public Instance getInstance(String namespaceId, String serviceName, String cluster, String ip, int port)
throws NacosException {
Service service = getService(namespaceId, serviceName, true);
return getInstance0(service, cluster, ip, port);
}
private Instance getInstance0(Service service, String cluster, String ip, int port) throws NacosException {
ServiceInfo serviceInfo = serviceStorage.getData(service);
if (serviceInfo.getHosts().isEmpty()) {
throw new NacosException(NacosException.NOT_FOUND,
"no ips found for cluster " + cluster + " in service " + serviceName);
"no ips found for cluster " + cluster + " in service " + service.getGroupedServiceName());
}
for (Instance each : serviceInfo.getHosts()) {
if (cluster.equals(each.getClusterName()) && ip.equals(each.getIp()) && port == each.getPort()) {
@ -256,6 +266,61 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
return serviceStorage.getData(service).getHosts();
}
@Override
public List<String> batchUpdateMetadata(String namespaceId, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata) throws NacosException {
boolean isEphemeral = !UtilsAndCommons.PERSIST.equals(instanceOperationInfo.getConsistencyType());
String serviceName = instanceOperationInfo.getServiceName();
Service service = getService(namespaceId, serviceName, isEphemeral);
List<String> result = new LinkedList<>();
List<Instance> needUpdateInstance = findBatchUpdateInstance(instanceOperationInfo, service);
for (Instance each : needUpdateInstance) {
String metadataId = InstancePublishInfo.genMetadataId(each.getIp(), each.getPort(), each.getClusterName());
Optional<InstanceMetadata> instanceMetadata = metadataManager.getInstanceMetadata(service, metadataId);
InstanceMetadata newMetadata = instanceMetadata.map(this::cloneMetadata).orElseGet(InstanceMetadata::new);
newMetadata.getExtendData().putAll(metadata);
metadataOperateService.updateInstanceMetadata(service, metadataId, newMetadata);
result.add(each.toInetAddr() + ":" + UtilsAndCommons.LOCALHOST_SITE + ":" + each.getClusterName() + ":" + (
each.isEphemeral() ? UtilsAndCommons.EPHEMERAL : UtilsAndCommons.PERSIST));
}
return result;
}
@Override
public List<String> batchDeleteMetadata(String namespaceId, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata) throws NacosException {
boolean isEphemeral = !UtilsAndCommons.PERSIST.equals(instanceOperationInfo.getConsistencyType());
String serviceName = instanceOperationInfo.getServiceName();
Service service = getService(namespaceId, serviceName, isEphemeral);
List<String> result = new LinkedList<>();
List<Instance> needUpdateInstance = findBatchUpdateInstance(instanceOperationInfo, service);
for (Instance each : needUpdateInstance) {
String metadataId = InstancePublishInfo.genMetadataId(each.getIp(), each.getPort(), each.getClusterName());
Optional<InstanceMetadata> instanceMetadata = metadataManager.getInstanceMetadata(service, metadataId);
InstanceMetadata newMetadata = instanceMetadata.map(this::cloneMetadata).orElseGet(InstanceMetadata::new);
metadata.keySet().forEach(key -> newMetadata.getExtendData().remove(key));
metadataOperateService.updateInstanceMetadata(service, metadataId, newMetadata);
result.add(each.toInetAddr() + ":" + UtilsAndCommons.LOCALHOST_SITE + ":" + each.getClusterName() + ":" + (
each.isEphemeral() ? UtilsAndCommons.EPHEMERAL : UtilsAndCommons.PERSIST));
}
return result;
}
private List<Instance> findBatchUpdateInstance(InstanceOperationInfo instanceOperationInfo, Service service) {
if (null == instanceOperationInfo.getInstances() || instanceOperationInfo.getInstances().isEmpty()) {
return serviceStorage.getData(service).getHosts();
}
List<Instance> result = new LinkedList<>();
for (Instance each : instanceOperationInfo.getInstances()) {
try {
getInstance0(service, each.getClusterName(), each.getIp(), each.getPort());
result.add(each);
} catch (NacosException ignored) {
}
}
return result;
}
private void createIpPortClientIfAbsent(String clientId, boolean ephemeral) {
if (!clientManager.contains(clientId)) {
clientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
@ -267,4 +332,5 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
return Service.newService(namespaceId, groupName, serviceNameNoGrouped, ephemeral);
}
}

View File

@ -25,6 +25,8 @@ import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.InstanceOperationContext;
import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.naming.push.v1.ClientInfo;
@ -44,9 +46,15 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.EPHEMERAL;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.PERSIST;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.UPDATE_INSTANCE_METADATA_ACTION_REMOVE;
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.UPDATE_INSTANCE_METADATA_ACTION_UPDATE;
/**
* Implementation of {@link InstanceOperator} by service for v1.x.
*
@ -217,13 +225,10 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", result.getName());
result.setReachProtectionThreshold(true);
hosts = Stream.of(Boolean.TRUE, Boolean.FALSE)
.map(ipMap::get)
.flatMap(Collection::stream)
hosts = Stream.of(Boolean.TRUE, Boolean.FALSE).map(ipMap::get).flatMap(Collection::stream)
.map(InstanceUtil::deepCopy)
// set all to `healthy` state to protect
.peek(instance -> instance.setHealthy(true))
.collect(Collectors.toCollection(LinkedList::new));
.peek(instance -> instance.setHealthy(true)).collect(Collectors.toCollection(LinkedList::new));
} else {
result.setReachProtectionThreshold(false);
hosts = new LinkedList<>(ipMap.get(Boolean.TRUE));
@ -328,4 +333,41 @@ public class InstanceOperatorServiceImpl implements InstanceOperator {
}
return service.allIPs();
}
@Override
public List<String> batchUpdateMetadata(String namespaceId, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata) throws NacosException {
return batchOperate(namespaceId, instanceOperationInfo, metadata, UPDATE_INSTANCE_METADATA_ACTION_UPDATE);
}
@Override
public List<String> batchDeleteMetadata(String namespaceId, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata) throws NacosException {
return batchOperate(namespaceId, instanceOperationInfo, metadata, UPDATE_INSTANCE_METADATA_ACTION_REMOVE);
}
private List<String> batchOperate(String namespaceId, InstanceOperationInfo instanceOperationInfo,
Map<String, String> metadata, String updateInstanceMetadataAction) {
List<String> result = new LinkedList<>();
for (com.alibaba.nacos.naming.core.Instance each : batchOperateMetadata(namespaceId, instanceOperationInfo,
metadata, updateInstanceMetadataAction)) {
result.add(each.getDatumKey() + ":" + (each.isEphemeral() ? EPHEMERAL : PERSIST));
}
return result;
}
private List<com.alibaba.nacos.naming.core.Instance> batchOperateMetadata(String namespace,
InstanceOperationInfo instanceOperationInfo, Map<String, String> metadata, String action) {
Function<InstanceOperationContext, List<com.alibaba.nacos.naming.core.Instance>> operateFunction = instanceOperationContext -> {
try {
return serviceManager.updateMetadata(instanceOperationContext.getNamespace(),
instanceOperationContext.getServiceName(), instanceOperationContext.getEphemeral(), action,
instanceOperationContext.getAll(), instanceOperationContext.getInstances(), metadata);
} catch (NacosException e) {
Loggers.SRV_LOG.warn("UPDATE-METADATA: updateMetadata failed", e);
}
return new ArrayList<>();
};
return serviceManager.batchOperate(namespace, instanceOperationInfo, operateFunction);
}
}

View File

@ -746,7 +746,7 @@ public class ServiceManager implements RecordListener<Service> {
break;
}
} else {
List<Instance> instances = operationInfo.getInstances();
List<Instance> instances = (List<Instance>) operationInfo.getInstances();
if (!CollectionUtils.isEmpty(instances)) {
//ephemeral:instances or persist:instances
Map<Boolean, List<Instance>> instanceMap = instances.stream()

View File

@ -16,7 +16,7 @@
package com.alibaba.nacos.naming.pojo;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.api.naming.pojo.Instance;
import java.util.List;
@ -31,7 +31,7 @@ public class InstanceOperationInfo {
public InstanceOperationInfo() {
}
public InstanceOperationInfo(String serviceName, String consistencyType, List<Instance> instances) {
public InstanceOperationInfo(String serviceName, String consistencyType, List<? extends Instance> instances) {
this.serviceName = serviceName;
this.consistencyType = consistencyType;
this.instances = instances;
@ -54,7 +54,7 @@ public class InstanceOperationInfo {
/**
* instances which need operate.
*/
private List<Instance> instances;
private List<? extends Instance> instances;
public String getServiceName() {
return serviceName;
@ -64,7 +64,7 @@ public class InstanceOperationInfo {
return consistencyType;
}
public List<Instance> getInstances() {
public List<? extends Instance> getInstances() {
return instances;
}