diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java index 7eca60e85..e31f1e400 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java @@ -205,7 +205,7 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen NotifyCenter.registerToPublisher(ConfigDumpEvent.class, NotifyCenter.ringBufferSize); NotifyCenter.registerSubscriber(new DumpConfigHandler()); - this.protocol.addLogProcessors(Collections.singletonList(this)); + this.protocol.addRequestProcessors(Collections.singletonList(this)); LogUtil.DEFAULT_LOG.info("use DistributedTransactionServicesImpl"); } @@ -390,7 +390,7 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen if (submit) { List requests = batchUpdate.stream().map(ModifyRequest::new) .collect(Collectors.toList()); - CompletableFuture future = protocol.submitAsync(WriteRequest.newBuilder().setGroup(group()) + CompletableFuture future = protocol.writeAsync(WriteRequest.newBuilder().setGroup(group()) .setData(ByteString.copyFrom(serializer.serialize(requests))) .putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build()); futures.add(future); @@ -432,14 +432,14 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen .putAllExtendInfo(EmbeddedStorageContextUtils.getCurrentExtendInfo()) .setType(sqlContext.getClass().getCanonicalName()).build(); if (Objects.isNull(consumer)) { - Response response = this.protocol.submit(request); + Response response = this.protocol.write(request); if (response.getSuccess()) { return true; } LogUtil.DEFAULT_LOG.error("execute sql modify operation failed : {}", response.getErrMsg()); return false; } else { - this.protocol.submitAsync(request).whenComplete((BiConsumer) (response, ex) -> { + this.protocol.writeAsync(request).whenComplete((BiConsumer) (response, ex) -> { String errMsg = Objects.isNull(ex) ? response.getErrMsg() : ExceptionUtil.getCause(ex).getMessage(); consumer.accept(response.getSuccess(), StringUtils.isBlank(errMsg) ? null : new NJdbcException(errMsg)); diff --git a/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java b/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java index 534423809..a5a98fcb1 100644 --- a/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java +++ b/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java @@ -48,11 +48,11 @@ public interface ConsistencyProtocol processors); + void addRequestProcessors(Collection

processors); /** * Copy of metadata information for this consensus protocol. @@ -87,7 +87,7 @@ public interface ConsistencyProtocol submitAsync(WriteRequest request); + CompletableFuture writeAsync(WriteRequest request); /** * New member list . diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java index 5f0224ec9..c6f73d164 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java @@ -159,7 +159,7 @@ public class JRaftProtocol extends AbstractConsistencyProtocol processors) { + public void addRequestProcessors(Collection processors) { raftServer.createMultiRaftGroup(processors); } @@ -175,14 +175,14 @@ public class JRaftProtocol extends AbstractConsistencyProtocol future = submitAsync(request); + public Response write(WriteRequest request) throws Exception { + CompletableFuture future = writeAsync(request); // Here you wait for 10 seconds, as long as possible, for the request to complete return future.get(10_000L, TimeUnit.MILLISECONDS); } @Override - public CompletableFuture submitAsync(WriteRequest request) { + public CompletableFuture writeAsync(WriteRequest request) { return raftServer.commit(request.getGroup(), request, new CompletableFuture<>()); } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java index f87578083..59d826823 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java @@ -28,6 +28,7 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor; * * @author liaochuntao */ +@Deprecated public class NacosGetRequestProcessor extends AbstractProcessor implements RpcProcessor { private static final String INTEREST_NAME = GetRequest.class.getName(); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java index 0511db1e5..a57c6ab05 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java @@ -28,6 +28,7 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor; * * @author liaochuntao */ +@Deprecated public class NacosLogProcessor extends AbstractProcessor implements RpcProcessor { private static final String INTEREST_NAME = Log.class.getName(); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java index 05f2d32d3..7dc81750a 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.core.distributed.raft.processor; import com.alibaba.nacos.consistency.Serializer; import com.alibaba.nacos.consistency.entity.ReadRequest; +import com.alibaba.nacos.core.distributed.raft.JRaftServer; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; @@ -28,17 +29,22 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor; */ public class NacosReadRequestProcessor extends AbstractProcessor implements RpcProcessor { - public NacosReadRequestProcessor(Serializer serializer) { + private static final String INTEREST_NAME = ReadRequest.class.getName(); + + private final JRaftServer server; + + public NacosReadRequestProcessor(JRaftServer server, Serializer serializer) { super(serializer); + this.server = server; } @Override public void handleRequest(RpcContext rpcCtx, ReadRequest request) { - + handleRequest(server, request.getGroup(), rpcCtx, request); } @Override public String interest() { - return null; + return INTEREST_NAME; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java index e684ef0a6..6562db2d4 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.core.distributed.raft.processor; import com.alibaba.nacos.consistency.Serializer; import com.alibaba.nacos.consistency.entity.WriteRequest; +import com.alibaba.nacos.core.distributed.raft.JRaftServer; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; @@ -28,17 +29,22 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor; */ public class NacosWriteRequestProcessor extends AbstractProcessor implements RpcProcessor { - public NacosWriteRequestProcessor(Serializer serializer) { + private static final String INTEREST_NAME = WriteRequest.class.getName(); + + private final JRaftServer server; + + public NacosWriteRequestProcessor(JRaftServer server, Serializer serializer) { super(serializer); + this.server = server; } @Override public void handleRequest(RpcContext rpcCtx, WriteRequest request) { - + handleRequest(server, request.getGroup(), rpcCtx, request); } @Override public String interest() { - return null; + return INTEREST_NAME; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java index af5c6bc94..3fdba2cf9 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java @@ -27,6 +27,8 @@ import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.distributed.raft.JRaftServer; import com.alibaba.nacos.core.distributed.raft.processor.NacosGetRequestProcessor; import com.alibaba.nacos.core.distributed.raft.processor.NacosLogProcessor; +import com.alibaba.nacos.core.distributed.raft.processor.NacosReadRequestProcessor; +import com.alibaba.nacos.core.distributed.raft.processor.NacosWriteRequestProcessor; import com.alibaba.nacos.sys.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.sys.utils.DiskUtils; @@ -79,9 +81,14 @@ public class JRaftUtils { RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(), RaftExecutor.getRaftCliServiceExecutor()); + // Deprecated rpcServer.registerProcessor(new NacosLogProcessor(server, SerializeFactory.getDefault())); + // Deprecated rpcServer.registerProcessor(new NacosGetRequestProcessor(server, SerializeFactory.getDefault())); + rpcServer.registerProcessor(new NacosWriteRequestProcessor(server, SerializeFactory.getDefault())); + rpcServer.registerProcessor(new NacosReadRequestProcessor(server, SerializeFactory.getDefault())); + return rpcServer; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index 4b2cc3fa6..46a90c6dc 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -65,7 +65,7 @@ public class PersistentServiceProcessor extends BasePersistentServiceProcessor { @Override public void afterConstruct() { super.afterConstruct(); - this.protocol.addLogProcessors(Collections.singletonList(this)); + this.protocol.addRequestProcessors(Collections.singletonList(this)); this.protocol.protocolMetaData() .subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA, (o, arg) -> hasLeader = StringUtils.isNotBlank(String.valueOf(arg))); @@ -95,7 +95,7 @@ public class PersistentServiceProcessor extends BasePersistentServiceProcessor { final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build(); try { - protocol.submit(request); + protocol.write(request); } catch (Exception e) { throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); } @@ -108,7 +108,7 @@ public class PersistentServiceProcessor extends BasePersistentServiceProcessor { final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build(); try { - protocol.submit(request); + protocol.write(request); } catch (Exception e) { throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java index 2770016f2..ce687358a 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/ServiceController.java @@ -94,7 +94,7 @@ public class ServiceController { private ServiceOperatorV2Impl serviceOperatorV2; /** - * Create a new service. + * Create a new service. This API will create a persistence service. * * @param namespaceId namespace id * @param serviceName service name @@ -110,30 +110,12 @@ public class ServiceController { @RequestParam String serviceName, @RequestParam(required = false) float protectThreshold, @RequestParam(defaultValue = StringUtils.EMPTY) String metadata, @RequestParam(defaultValue = StringUtils.EMPTY) String selector) throws Exception { - - if (serviceManager.getService(namespaceId, serviceName) != null) { - throw new IllegalArgumentException("specified service already exists, serviceName : " + serviceName); - } - - Map metadataMap = new HashMap<>(16); - if (StringUtils.isNotBlank(metadata)) { - metadataMap = UtilsAndCommons.parseMetadata(metadata); - } - - Service service = new Service(serviceName); - service.setProtectThreshold(protectThreshold); - service.setEnabled(true); - service.setMetadata(metadataMap); - service.setSelector(parseSelector(selector)); - service.setNamespaceId(namespaceId); - - // now valid the service. if failed, exception will be thrown - service.setLastModifiedMillis(System.currentTimeMillis()); - service.recalculateChecksum(); - service.validate(); - - serviceManager.addOrReplaceService(service); - + ServiceMetadata serviceMetadata = new ServiceMetadata(); + serviceMetadata.setProtectThreshold(protectThreshold); + serviceMetadata.setSelector(parseSelector(selector)); + serviceMetadata.setExtendData(UtilsAndCommons.parseMetadata(metadata)); + serviceMetadata.setEphemeral(false); + serviceOperatorV2.create(namespaceId, serviceName, serviceMetadata); return "ok"; } @@ -150,8 +132,7 @@ public class ServiceController { public String remove(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId, @RequestParam String serviceName) throws Exception { - serviceManager.easyRemoveService(namespaceId, serviceName); - + serviceOperatorV2.delete(namespaceId, serviceName); return "ok"; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java index 2d1982361..29a42e742 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java @@ -431,13 +431,13 @@ public class ServiceManager implements RecordListener { * * @param namespaceId namespace * @param serviceName service name - * @throws Exception exception + * @throws NacosException exception */ - public void easyRemoveService(String namespaceId, String serviceName) throws Exception { + public void easyRemoveService(String namespaceId, String serviceName) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { - throw new IllegalArgumentException("specified service not exist, serviceName : " + serviceName); + throw new NacosException(NacosException.INVALID_PARAM, "specified service not exist, serviceName : " + serviceName); } consistencyService.remove(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName)); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperator.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperator.java index 107b066b2..f3136f61a 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperator.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperator.java @@ -29,6 +29,16 @@ import java.util.List; */ public interface ServiceOperator { + /** + * Create new service. + * + * @param namespaceId namespace id of service + * @param serviceName grouped service name format like 'groupName@@serviceName' + * @param metadata new metadata of service + * @throws NacosException nacos exception during creating + */ + void create(String namespaceId, String serviceName, ServiceMetadata metadata) throws NacosException; + /** * Update service information. Due to service basic information can't be changed, so update should only update the * metadata of service. @@ -39,6 +49,15 @@ public interface ServiceOperator { */ void update(Service service, ServiceMetadata metadata) throws NacosException; + /** + * Delete service. + * + * @param namespaceId namespace id of service + * @param serviceName grouped service name format like 'groupName@@serviceName' + * @throws NacosException nacos exception during delete + */ + void delete(String namespaceId, String serviceName) throws NacosException; + /** * Page list service name. * diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV1Impl.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV1Impl.java index f7d26be10..7094647cb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV1Impl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV1Impl.java @@ -41,10 +41,31 @@ public class ServiceOperatorV1Impl implements ServiceOperator { this.serviceManager = serviceManager; } + @Override + public void create(String namespaceId, String serviceName, ServiceMetadata metadata) throws NacosException { + if (serviceManager.getService(namespaceId, serviceName) != null) { + throw new IllegalArgumentException("specified service already exists, serviceName : " + serviceName); + } + com.alibaba.nacos.naming.core.Service service = new com.alibaba.nacos.naming.core.Service(serviceName); + service.setProtectThreshold(metadata.getProtectThreshold()); + service.setEnabled(true); + service.setMetadata(metadata.getExtendData()); + service.setSelector(metadata.getSelector()); + service.setNamespaceId(namespaceId); + + // now valid the service. if failed, exception will be thrown + service.setLastModifiedMillis(System.currentTimeMillis()); + service.recalculateChecksum(); + service.validate(); + + serviceManager.addOrReplaceService(service); + } + @Override public void update(Service service, ServiceMetadata metadata) throws NacosException { String serviceName = service.getGroupedServiceName(); - com.alibaba.nacos.naming.core.Service serviceV1 = serviceManager.getService(service.getNamespace(), serviceName); + com.alibaba.nacos.naming.core.Service serviceV1 = serviceManager + .getService(service.getNamespace(), serviceName); if (serviceV1 == null) { throw new NacosException(NacosException.INVALID_PARAM, "service " + serviceName + " not found!"); } @@ -57,6 +78,11 @@ public class ServiceOperatorV1Impl implements ServiceOperator { serviceManager.addOrReplaceService(serviceV1); } + @Override + public void delete(String namespaceId, String serviceName) throws NacosException { + serviceManager.easyRemoveService(namespaceId, serviceName); + } + @Override public List listService(String namespaceId, String groupName, String selector, int pageSize, int pageNo) throws NacosException { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV2Impl.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV2Impl.java index 54adca98b..45819595c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV2Impl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceOperatorV2Impl.java @@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.core; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.naming.core.v2.ServiceManager; import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService; import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata; @@ -44,6 +45,16 @@ public class ServiceOperatorV2Impl implements ServiceOperator { this.metadataOperateService = metadataOperateService; } + @Override + public void create(String namespaceId, String serviceName, ServiceMetadata metadata) throws NacosException { + Service service = getServiceFromGroupedServiceName(namespaceId, serviceName, metadata.isEphemeral()); + if (ServiceManager.getInstance().containSingleton(service)) { + throw new NacosException(NacosException.INVALID_PARAM, + String.format("specified service %s already exists!", service.getGroupedServiceName())); + } + metadataOperateService.updateServiceMetadata(service, metadata); + } + @Override public void update(Service service, ServiceMetadata metadata) throws NacosException { if (!ServiceManager.getInstance().containSingleton(service)) { @@ -53,6 +64,11 @@ public class ServiceOperatorV2Impl implements ServiceOperator { metadataOperateService.updateServiceMetadata(service, metadata); } + @Override + public void delete(String namespaceId, String serviceName) throws NacosException { + metadataOperateService.deleteServiceMetadata(getServiceFromGroupedServiceName(namespaceId, serviceName, true)); + } + @Override @SuppressWarnings("unchecked") public List listService(String namespaceId, String groupName, String selector, int pageSize, int pageNo) @@ -75,4 +91,10 @@ public class ServiceOperatorV2Impl implements ServiceOperator { } return result; } + + private Service getServiceFromGroupedServiceName(String namespaceId, String groupedServiceName, boolean ephemeral) { + String groupName = NamingUtils.getGroupName(groupedServiceName); + String serviceName = NamingUtils.getServiceName(groupedServiceName); + return Service.newService(namespaceId, groupName, serviceName, ephemeral); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java index 0c9d88b55..ef66547e6 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/cleaner/ExpiredMetadataCleaner.java @@ -66,11 +66,11 @@ public class ExpiredMetadataCleaner extends AbstractNamingCleaner { private void removeExpiredMetadata(ExpiredMetadataInfo expiredInfo) { Loggers.SRV_LOG.info("Remove expired metadata {}", expiredInfo); if (null == expiredInfo.getInstanceId()) { - if (metadataManager.containInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId())) { + if (metadataManager.containServiceMetadata(expiredInfo.getService())) { metadataOperateService.deleteServiceMetadata(expiredInfo.getService()); } } else { - if (metadataManager.containServiceMetadata(expiredInfo.getService())) { + if (metadataManager.containInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId())) { metadataOperateService.deleteInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId()); } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java index 482605c1d..05905930f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/index/ServiceStorage.java @@ -99,7 +99,7 @@ public class ServiceStorage { } private List getAllInstancesFromIndex(Service service) { - List result = new LinkedList<>(); + Set result = new HashSet<>(); Set clusters = new HashSet<>(); for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) { Optional instancePublishInfo = getInstanceInfo(each, service); @@ -111,7 +111,7 @@ public class ServiceStorage { } // cache clusters of this service serviceClusterIndex.put(service, clusters); - return result; + return new LinkedList<>(result); } private Optional getInstanceInfo(String clientId, Service service) { @@ -135,7 +135,8 @@ public class ServiceStorage { instanceMetadata.put(entry.getKey(), entry.getValue().toString()); } } - Optional metadata = metadataManager.getInstanceMetadata(service, instanceInfo.getInstanceId()); + Optional metadata = metadataManager + .getInstanceMetadata(service, instanceInfo.getInstanceId()); if (metadata.isPresent()) { result.setEnabled(metadata.get().isEnabled()); result.setWeight(metadata.get().getWeight()); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/InstanceMetadataProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/InstanceMetadataProcessor.java index 13eaf38f7..fd45ff4f9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/InstanceMetadataProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/InstanceMetadataProcessor.java @@ -62,7 +62,7 @@ public class InstanceMetadataProcessor extends RequestProcessor4CP { this.processType = TypeUtils.parameterize(MetadataOperation.class, InstanceMetadata.class); this.lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); - protocolManager.getCpProtocol().addLogProcessors(Collections.singletonList(this)); + protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this)); } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java index 5f6e64f0e..09d3a5c19 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataManager.java @@ -20,6 +20,7 @@ import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.SmartSubscriber; import com.alibaba.nacos.common.utils.ConcurrentHashSet; +import com.alibaba.nacos.naming.core.v2.ServiceManager; import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent; import com.alibaba.nacos.naming.core.v2.event.metadata.MetadataEvent; import com.alibaba.nacos.naming.core.v2.pojo.Service; @@ -41,12 +42,12 @@ import java.util.concurrent.ConcurrentMap; @Component public class NamingMetadataManager extends SmartSubscriber { - private final ConcurrentMap serviceMetadataMap; - - private final ConcurrentMap> instanceMetadataMap; - private final Set expiredMetadataInfos; + private ConcurrentMap serviceMetadataMap; + + private ConcurrentMap> instanceMetadataMap; + public NamingMetadataManager() { serviceMetadataMap = new ConcurrentHashMap<>(1 << 10); instanceMetadataMap = new ConcurrentHashMap<>(1 << 10); @@ -75,7 +76,9 @@ public class NamingMetadataManager extends SmartSubscriber { } /** - * Get service metadata for {@link Service}. + * Get service metadata for {@link Service}, which is the original metadata object. + * + *

This method should use only query, can't modified metadata. * * @param service service * @return service metadata @@ -85,7 +88,9 @@ public class NamingMetadataManager extends SmartSubscriber { } /** - * Get instance metadata for instance of {@link Service}. + * Get instance metadata for instance of {@link Service}, which is the original metadata object. + * + *

This method should use only query, can't modified metadata. * * @param service service * @param instanceId instance id @@ -106,6 +111,7 @@ public class NamingMetadataManager extends SmartSubscriber { * @param serviceMetadata new service metadata */ public void updateServiceMetadata(Service service, ServiceMetadata serviceMetadata) { + service.incrementRevision(); serviceMetadataMap.put(service, serviceMetadata); } @@ -171,12 +177,31 @@ public class NamingMetadataManager extends SmartSubscriber { return result; } - public void loadServiceMetadataSnapshot(Map snapshot) { - serviceMetadataMap.putAll(snapshot); + /** + * Load service metadata snapshot. + * + *

Service metadata need load back the service. + * + * @param snapshot snapshot + */ + public void loadServiceMetadataSnapshot(ConcurrentMap snapshot) { + for (Service each : snapshot.keySet()) { + ServiceManager.getInstance().getSingleton(each); + } + ConcurrentMap oldSnapshot = serviceMetadataMap; + serviceMetadataMap = snapshot; + oldSnapshot.clear(); } - public void loadInstanceMetadataSnapshot(Map> snapshot) { - instanceMetadataMap.putAll(snapshot); + /** + * Load instance metadata snapshot. + * + * @param snapshot snapshot + */ + public void loadInstanceMetadataSnapshot(ConcurrentMap> snapshot) { + ConcurrentMap> oldSnapshot = instanceMetadataMap; + instanceMetadataMap = snapshot; + oldSnapshot.clear(); } public Set getExpiredMetadataInfos() { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataOperateService.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataOperateService.java index 9d5523f24..0cce9b7c0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataOperateService.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/NamingMetadataOperateService.java @@ -117,7 +117,7 @@ public class NamingMetadataOperateService { private void submitMetadataOperation(WriteRequest operationLog) { try { - Response response = cpProtocol.submit(operationLog); + Response response = cpProtocol.write(operationLog); if (!response.getSuccess()) { throw new NacosRuntimeException(NacosException.SERVER_ERROR, "do metadata operation failed " + response.getErrMsg()); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadata.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadata.java index a9f084e78..121875010 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadata.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadata.java @@ -33,6 +33,11 @@ public class ServiceMetadata implements Serializable { private static final long serialVersionUID = -6605609934135069566L; + /** + * Service is ephemeral or persistence. + */ + private boolean ephemeral = true; + /** * protect threshold. */ @@ -47,6 +52,14 @@ public class ServiceMetadata implements Serializable { private Map clusters = new ConcurrentHashMap<>(1); + public boolean isEphemeral() { + return ephemeral; + } + + public void setEphemeral(boolean ephemeral) { + this.ephemeral = ephemeral; + } + public float getProtectThreshold() { return protectThreshold; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java index a360974be..fedbb4e5e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/ServiceMetadataProcessor.java @@ -25,6 +25,7 @@ import com.alibaba.nacos.consistency.entity.Response; import com.alibaba.nacos.consistency.entity.WriteRequest; import com.alibaba.nacos.consistency.snapshot.SnapshotOperation; import com.alibaba.nacos.core.distributed.ProtocolManager; +import com.alibaba.nacos.naming.core.v2.ServiceManager; import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.utils.Constants; import org.apache.commons.lang3.reflect.TypeUtils; @@ -33,6 +34,7 @@ import org.springframework.stereotype.Component; import java.lang.reflect.Type; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -60,7 +62,7 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP { this.processType = TypeUtils.parameterize(MetadataOperation.class, ServiceMetadata.class); this.lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); - protocolManager.getCpProtocol().addLogProcessors(Collections.singletonList(this)); + protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this)); } @Override @@ -99,13 +101,42 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP { } private void updateServiceMetadata(MetadataOperation op) { - Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName()); - namingMetadataManager.updateServiceMetadata(service, op.getMetadata()); + Service service = Service + .newService(op.getNamespace(), op.getGroup(), op.getServiceName(), op.getMetadata().isEphemeral()); + Optional currentMetadata = namingMetadataManager.getServiceMetadata(service); + if (currentMetadata.isPresent()) { + ServiceMetadata newMetadata = mergeMetadata(currentMetadata.get(), op.getMetadata()); + Service singleton = ServiceManager.getInstance().getSingleton(service); + namingMetadataManager.updateServiceMetadata(singleton, newMetadata); + } else { + Service singleton = ServiceManager.getInstance().getSingleton(service); + namingMetadataManager.updateServiceMetadata(singleton, op.getMetadata()); + } + } + + /** + * Do not modified old metadata directly to avoid read half status. + * + *

Ephemeral variable should only use the value the metadata create. + * + * @param oldMetadata old metadata + * @param newMetadata new metadata + * @return merged metadata + */ + private ServiceMetadata mergeMetadata(ServiceMetadata oldMetadata, ServiceMetadata newMetadata) { + ServiceMetadata result = new ServiceMetadata(); + result.setEphemeral(oldMetadata.isEphemeral()); + result.setClusters(oldMetadata.getClusters()); + result.setProtectThreshold(newMetadata.getProtectThreshold()); + result.setSelector(newMetadata.getSelector()); + result.setExtendData(newMetadata.getExtendData()); + return result; } private void deleteServiceMetadata(MetadataOperation op) { Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName()); namingMetadataManager.removeServiceMetadata(service); + ServiceManager.getInstance().removeSingleton(service); } @Override diff --git a/pom.xml b/pom.xml index 59a731f05..1b8844e54 100644 --- a/pom.xml +++ b/pom.xml @@ -1141,71 +1141,16 @@ - - - - - - - - - - - - - - - central - http://mvnrepo.alibaba-inc.com/mvn/repository - - true - - - false - - - - snapshots - http://mvnrepo.alibaba-inc.com/mvn/repository - - false - - - true - - - - - - central - http://mvnrepo.alibaba-inc.com/mvn/repository - - true - - - false - - - - snapshots - http://mvnrepo.alibaba-inc.com/mvn/repository - - false - - - true - - - - - releases - http://mvnrepo.alibaba-inc.com/mvn/releases - - snapshots - http://mvnrepo.alibaba-inc.com/mvn/snapshots + + sona + https://oss.sonatype.org/content/repositories/snapshots/ + + sona + https://oss.sonatype.org/service/local/staging/deploy/maven2/ +