Merge pull request #4438 from KomachiSion/feature_support_grpc_core

Adapt create service and delete service in ServiceController
This commit is contained in:
杨翊 SionYang 2020-12-09 15:10:26 +08:00 committed by GitHub
commit 81762c9817
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 218 additions and 134 deletions

View File

@ -205,7 +205,7 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen
NotifyCenter.registerToPublisher(ConfigDumpEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new DumpConfigHandler());
this.protocol.addLogProcessors(Collections.singletonList(this));
this.protocol.addRequestProcessors(Collections.singletonList(this));
LogUtil.DEFAULT_LOG.info("use DistributedTransactionServicesImpl");
}
@ -390,7 +390,7 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen
if (submit) {
List<ModifyRequest> requests = batchUpdate.stream().map(ModifyRequest::new)
.collect(Collectors.toList());
CompletableFuture<Response> future = protocol.submitAsync(WriteRequest.newBuilder().setGroup(group())
CompletableFuture<Response> future = protocol.writeAsync(WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(requests)))
.putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build());
futures.add(future);
@ -432,14 +432,14 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen
.putAllExtendInfo(EmbeddedStorageContextUtils.getCurrentExtendInfo())
.setType(sqlContext.getClass().getCanonicalName()).build();
if (Objects.isNull(consumer)) {
Response response = this.protocol.submit(request);
Response response = this.protocol.write(request);
if (response.getSuccess()) {
return true;
}
LogUtil.DEFAULT_LOG.error("execute sql modify operation failed : {}", response.getErrMsg());
return false;
} else {
this.protocol.submitAsync(request).whenComplete((BiConsumer<Response, Throwable>) (response, ex) -> {
this.protocol.writeAsync(request).whenComplete((BiConsumer<Response, Throwable>) (response, ex) -> {
String errMsg = Objects.isNull(ex) ? response.getErrMsg() : ExceptionUtil.getCause(ex).getMessage();
consumer.accept(response.getSuccess(),
StringUtils.isBlank(errMsg) ? null : new NJdbcException(errMsg));

View File

@ -48,11 +48,11 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
void init(T config);
/**
* Add a log handler.
* Add a request handler.
*
* @param processors {@link RequestProcessor}
*/
void addLogProcessors(Collection<P> processors);
void addRequestProcessors(Collection<P> processors);
/**
* Copy of metadata information for this consensus protocol.
@ -87,7 +87,7 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
* @return submit operation result {@link Response}
* @throws Exception {@link Exception}
*/
Response submit(WriteRequest request) throws Exception;
Response write(WriteRequest request) throws Exception;
/**
* Data submission operation, returning submission results asynchronously.
@ -97,7 +97,7 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
* @return {@link CompletableFuture} submit result
* @throws Exception when submit throw Exception
*/
CompletableFuture<Response> submitAsync(WriteRequest request);
CompletableFuture<Response> writeAsync(WriteRequest request);
/**
* New member list .

View File

@ -159,7 +159,7 @@ public class JRaftProtocol extends AbstractConsistencyProtocol<RaftConfig, Reque
}
@Override
public void addLogProcessors(Collection<RequestProcessor4CP> processors) {
public void addRequestProcessors(Collection<RequestProcessor4CP> processors) {
raftServer.createMultiRaftGroup(processors);
}
@ -175,14 +175,14 @@ public class JRaftProtocol extends AbstractConsistencyProtocol<RaftConfig, Reque
}
@Override
public Response submit(WriteRequest request) throws Exception {
CompletableFuture<Response> future = submitAsync(request);
public Response write(WriteRequest request) throws Exception {
CompletableFuture<Response> future = writeAsync(request);
// Here you wait for 10 seconds, as long as possible, for the request to complete
return future.get(10_000L, TimeUnit.MILLISECONDS);
}
@Override
public CompletableFuture<Response> submitAsync(WriteRequest request) {
public CompletableFuture<Response> writeAsync(WriteRequest request) {
return raftServer.commit(request.getGroup(), request, new CompletableFuture<>());
}

View File

@ -28,6 +28,7 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@Deprecated
public class NacosGetRequestProcessor extends AbstractProcessor implements RpcProcessor<GetRequest> {
private static final String INTEREST_NAME = GetRequest.class.getName();

View File

@ -28,6 +28,7 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@Deprecated
public class NacosLogProcessor extends AbstractProcessor implements RpcProcessor<Log> {
private static final String INTEREST_NAME = Log.class.getName();

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.core.distributed.raft.processor;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
@ -28,17 +29,22 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
*/
public class NacosReadRequestProcessor extends AbstractProcessor implements RpcProcessor<ReadRequest> {
public NacosReadRequestProcessor(Serializer serializer) {
private static final String INTEREST_NAME = ReadRequest.class.getName();
private final JRaftServer server;
public NacosReadRequestProcessor(JRaftServer server, Serializer serializer) {
super(serializer);
this.server = server;
}
@Override
public void handleRequest(RpcContext rpcCtx, ReadRequest request) {
handleRequest(server, request.getGroup(), rpcCtx, request);
}
@Override
public String interest() {
return null;
return INTEREST_NAME;
}
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.core.distributed.raft.processor;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
@ -28,17 +29,22 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
*/
public class NacosWriteRequestProcessor extends AbstractProcessor implements RpcProcessor<WriteRequest> {
public NacosWriteRequestProcessor(Serializer serializer) {
private static final String INTEREST_NAME = WriteRequest.class.getName();
private final JRaftServer server;
public NacosWriteRequestProcessor(JRaftServer server, Serializer serializer) {
super(serializer);
this.server = server;
}
@Override
public void handleRequest(RpcContext rpcCtx, WriteRequest request) {
handleRequest(server, request.getGroup(), rpcCtx, request);
}
@Override
public String interest() {
return null;
return INTEREST_NAME;
}
}

View File

@ -27,6 +27,8 @@ import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alibaba.nacos.core.distributed.raft.processor.NacosGetRequestProcessor;
import com.alibaba.nacos.core.distributed.raft.processor.NacosLogProcessor;
import com.alibaba.nacos.core.distributed.raft.processor.NacosReadRequestProcessor;
import com.alibaba.nacos.core.distributed.raft.processor.NacosWriteRequestProcessor;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.utils.DiskUtils;
@ -79,9 +81,14 @@ public class JRaftUtils {
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(),
RaftExecutor.getRaftCliServiceExecutor());
// Deprecated
rpcServer.registerProcessor(new NacosLogProcessor(server, SerializeFactory.getDefault()));
// Deprecated
rpcServer.registerProcessor(new NacosGetRequestProcessor(server, SerializeFactory.getDefault()));
rpcServer.registerProcessor(new NacosWriteRequestProcessor(server, SerializeFactory.getDefault()));
rpcServer.registerProcessor(new NacosReadRequestProcessor(server, SerializeFactory.getDefault()));
return rpcServer;
}

View File

@ -65,7 +65,7 @@ public class PersistentServiceProcessor extends BasePersistentServiceProcessor {
@Override
public void afterConstruct() {
super.afterConstruct();
this.protocol.addLogProcessors(Collections.singletonList(this));
this.protocol.addRequestProcessors(Collections.singletonList(this));
this.protocol.protocolMetaData()
.subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA,
(o, arg) -> hasLeader = StringUtils.isNotBlank(String.valueOf(arg)));
@ -95,7 +95,7 @@ public class PersistentServiceProcessor extends BasePersistentServiceProcessor {
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try {
protocol.submit(request);
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
@ -108,7 +108,7 @@ public class PersistentServiceProcessor extends BasePersistentServiceProcessor {
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build();
try {
protocol.submit(request);
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}

View File

@ -94,7 +94,7 @@ public class ServiceController {
private ServiceOperatorV2Impl serviceOperatorV2;
/**
* Create a new service.
* Create a new service. This API will create a persistence service.
*
* @param namespaceId namespace id
* @param serviceName service name
@ -110,30 +110,12 @@ public class ServiceController {
@RequestParam String serviceName, @RequestParam(required = false) float protectThreshold,
@RequestParam(defaultValue = StringUtils.EMPTY) String metadata,
@RequestParam(defaultValue = StringUtils.EMPTY) String selector) throws Exception {
if (serviceManager.getService(namespaceId, serviceName) != null) {
throw new IllegalArgumentException("specified service already exists, serviceName : " + serviceName);
}
Map<String, String> metadataMap = new HashMap<>(16);
if (StringUtils.isNotBlank(metadata)) {
metadataMap = UtilsAndCommons.parseMetadata(metadata);
}
Service service = new Service(serviceName);
service.setProtectThreshold(protectThreshold);
service.setEnabled(true);
service.setMetadata(metadataMap);
service.setSelector(parseSelector(selector));
service.setNamespaceId(namespaceId);
// now valid the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
service.validate();
serviceManager.addOrReplaceService(service);
ServiceMetadata serviceMetadata = new ServiceMetadata();
serviceMetadata.setProtectThreshold(protectThreshold);
serviceMetadata.setSelector(parseSelector(selector));
serviceMetadata.setExtendData(UtilsAndCommons.parseMetadata(metadata));
serviceMetadata.setEphemeral(false);
serviceOperatorV2.create(namespaceId, serviceName, serviceMetadata);
return "ok";
}
@ -150,8 +132,7 @@ public class ServiceController {
public String remove(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId,
@RequestParam String serviceName) throws Exception {
serviceManager.easyRemoveService(namespaceId, serviceName);
serviceOperatorV2.delete(namespaceId, serviceName);
return "ok";
}

View File

@ -431,13 +431,13 @@ public class ServiceManager implements RecordListener<Service> {
*
* @param namespaceId namespace
* @param serviceName service name
* @throws Exception exception
* @throws NacosException exception
*/
public void easyRemoveService(String namespaceId, String serviceName) throws Exception {
public void easyRemoveService(String namespaceId, String serviceName) throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new IllegalArgumentException("specified service not exist, serviceName : " + serviceName);
throw new NacosException(NacosException.INVALID_PARAM, "specified service not exist, serviceName : " + serviceName);
}
consistencyService.remove(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName));

View File

@ -29,6 +29,16 @@ import java.util.List;
*/
public interface ServiceOperator {
/**
* Create new service.
*
* @param namespaceId namespace id of service
* @param serviceName grouped service name format like 'groupName@@serviceName'
* @param metadata new metadata of service
* @throws NacosException nacos exception during creating
*/
void create(String namespaceId, String serviceName, ServiceMetadata metadata) throws NacosException;
/**
* Update service information. Due to service basic information can't be changed, so update should only update the
* metadata of service.
@ -39,6 +49,15 @@ public interface ServiceOperator {
*/
void update(Service service, ServiceMetadata metadata) throws NacosException;
/**
* Delete service.
*
* @param namespaceId namespace id of service
* @param serviceName grouped service name format like 'groupName@@serviceName'
* @throws NacosException nacos exception during delete
*/
void delete(String namespaceId, String serviceName) throws NacosException;
/**
* Page list service name.
*

View File

@ -41,10 +41,31 @@ public class ServiceOperatorV1Impl implements ServiceOperator {
this.serviceManager = serviceManager;
}
@Override
public void create(String namespaceId, String serviceName, ServiceMetadata metadata) throws NacosException {
if (serviceManager.getService(namespaceId, serviceName) != null) {
throw new IllegalArgumentException("specified service already exists, serviceName : " + serviceName);
}
com.alibaba.nacos.naming.core.Service service = new com.alibaba.nacos.naming.core.Service(serviceName);
service.setProtectThreshold(metadata.getProtectThreshold());
service.setEnabled(true);
service.setMetadata(metadata.getExtendData());
service.setSelector(metadata.getSelector());
service.setNamespaceId(namespaceId);
// now valid the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
service.validate();
serviceManager.addOrReplaceService(service);
}
@Override
public void update(Service service, ServiceMetadata metadata) throws NacosException {
String serviceName = service.getGroupedServiceName();
com.alibaba.nacos.naming.core.Service serviceV1 = serviceManager.getService(service.getNamespace(), serviceName);
com.alibaba.nacos.naming.core.Service serviceV1 = serviceManager
.getService(service.getNamespace(), serviceName);
if (serviceV1 == null) {
throw new NacosException(NacosException.INVALID_PARAM, "service " + serviceName + " not found!");
}
@ -57,6 +78,11 @@ public class ServiceOperatorV1Impl implements ServiceOperator {
serviceManager.addOrReplaceService(serviceV1);
}
@Override
public void delete(String namespaceId, String serviceName) throws NacosException {
serviceManager.easyRemoveService(namespaceId, serviceName);
}
@Override
public List<String> listService(String namespaceId, String groupName, String selector, int pageSize, int pageNo)
throws NacosException {

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService;
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
@ -44,6 +45,16 @@ public class ServiceOperatorV2Impl implements ServiceOperator {
this.metadataOperateService = metadataOperateService;
}
@Override
public void create(String namespaceId, String serviceName, ServiceMetadata metadata) throws NacosException {
Service service = getServiceFromGroupedServiceName(namespaceId, serviceName, metadata.isEphemeral());
if (ServiceManager.getInstance().containSingleton(service)) {
throw new NacosException(NacosException.INVALID_PARAM,
String.format("specified service %s already exists!", service.getGroupedServiceName()));
}
metadataOperateService.updateServiceMetadata(service, metadata);
}
@Override
public void update(Service service, ServiceMetadata metadata) throws NacosException {
if (!ServiceManager.getInstance().containSingleton(service)) {
@ -53,6 +64,11 @@ public class ServiceOperatorV2Impl implements ServiceOperator {
metadataOperateService.updateServiceMetadata(service, metadata);
}
@Override
public void delete(String namespaceId, String serviceName) throws NacosException {
metadataOperateService.deleteServiceMetadata(getServiceFromGroupedServiceName(namespaceId, serviceName, true));
}
@Override
@SuppressWarnings("unchecked")
public List<String> listService(String namespaceId, String groupName, String selector, int pageSize, int pageNo)
@ -75,4 +91,10 @@ public class ServiceOperatorV2Impl implements ServiceOperator {
}
return result;
}
private Service getServiceFromGroupedServiceName(String namespaceId, String groupedServiceName, boolean ephemeral) {
String groupName = NamingUtils.getGroupName(groupedServiceName);
String serviceName = NamingUtils.getServiceName(groupedServiceName);
return Service.newService(namespaceId, groupName, serviceName, ephemeral);
}
}

View File

@ -66,11 +66,11 @@ public class ExpiredMetadataCleaner extends AbstractNamingCleaner {
private void removeExpiredMetadata(ExpiredMetadataInfo expiredInfo) {
Loggers.SRV_LOG.info("Remove expired metadata {}", expiredInfo);
if (null == expiredInfo.getInstanceId()) {
if (metadataManager.containInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId())) {
if (metadataManager.containServiceMetadata(expiredInfo.getService())) {
metadataOperateService.deleteServiceMetadata(expiredInfo.getService());
}
} else {
if (metadataManager.containServiceMetadata(expiredInfo.getService())) {
if (metadataManager.containInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId())) {
metadataOperateService.deleteInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId());
}
}

View File

@ -99,7 +99,7 @@ public class ServiceStorage {
}
private List<Instance> getAllInstancesFromIndex(Service service) {
List<Instance> result = new LinkedList<>();
Set<Instance> result = new HashSet<>();
Set<String> clusters = new HashSet<>();
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
@ -111,7 +111,7 @@ public class ServiceStorage {
}
// cache clusters of this service
serviceClusterIndex.put(service, clusters);
return result;
return new LinkedList<>(result);
}
private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
@ -135,7 +135,8 @@ public class ServiceStorage {
instanceMetadata.put(entry.getKey(), entry.getValue().toString());
}
}
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, instanceInfo.getInstanceId());
Optional<InstanceMetadata> metadata = metadataManager
.getInstanceMetadata(service, instanceInfo.getInstanceId());
if (metadata.isPresent()) {
result.setEnabled(metadata.get().isEnabled());
result.setWeight(metadata.get().getWeight());

View File

@ -62,7 +62,7 @@ public class InstanceMetadataProcessor extends RequestProcessor4CP {
this.processType = TypeUtils.parameterize(MetadataOperation.class, InstanceMetadata.class);
this.lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
protocolManager.getCpProtocol().addLogProcessors(Collections.singletonList(this));
protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this));
}
@Override

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.core.v2.event.metadata.MetadataEvent;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
@ -41,12 +42,12 @@ import java.util.concurrent.ConcurrentMap;
@Component
public class NamingMetadataManager extends SmartSubscriber {
private final ConcurrentMap<Service, ServiceMetadata> serviceMetadataMap;
private final ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>> instanceMetadataMap;
private final Set<ExpiredMetadataInfo> expiredMetadataInfos;
private ConcurrentMap<Service, ServiceMetadata> serviceMetadataMap;
private ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>> instanceMetadataMap;
public NamingMetadataManager() {
serviceMetadataMap = new ConcurrentHashMap<>(1 << 10);
instanceMetadataMap = new ConcurrentHashMap<>(1 << 10);
@ -75,7 +76,9 @@ public class NamingMetadataManager extends SmartSubscriber {
}
/**
* Get service metadata for {@link Service}.
* Get service metadata for {@link Service}, which is the original metadata object.
*
* <p>This method should use only query, can't modified metadata.
*
* @param service service
* @return service metadata
@ -85,7 +88,9 @@ public class NamingMetadataManager extends SmartSubscriber {
}
/**
* Get instance metadata for instance of {@link Service}.
* Get instance metadata for instance of {@link Service}, which is the original metadata object.
*
* <p>This method should use only query, can't modified metadata.
*
* @param service service
* @param instanceId instance id
@ -106,6 +111,7 @@ public class NamingMetadataManager extends SmartSubscriber {
* @param serviceMetadata new service metadata
*/
public void updateServiceMetadata(Service service, ServiceMetadata serviceMetadata) {
service.incrementRevision();
serviceMetadataMap.put(service, serviceMetadata);
}
@ -171,12 +177,31 @@ public class NamingMetadataManager extends SmartSubscriber {
return result;
}
public void loadServiceMetadataSnapshot(Map<Service, ServiceMetadata> snapshot) {
serviceMetadataMap.putAll(snapshot);
/**
* Load service metadata snapshot.
*
* <p>Service metadata need load back the service.
*
* @param snapshot snapshot
*/
public void loadServiceMetadataSnapshot(ConcurrentMap<Service, ServiceMetadata> snapshot) {
for (Service each : snapshot.keySet()) {
ServiceManager.getInstance().getSingleton(each);
}
ConcurrentMap<Service, ServiceMetadata> oldSnapshot = serviceMetadataMap;
serviceMetadataMap = snapshot;
oldSnapshot.clear();
}
public void loadInstanceMetadataSnapshot(Map<Service, ConcurrentMap<String, InstanceMetadata>> snapshot) {
instanceMetadataMap.putAll(snapshot);
/**
* Load instance metadata snapshot.
*
* @param snapshot snapshot
*/
public void loadInstanceMetadataSnapshot(ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>> snapshot) {
ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>> oldSnapshot = instanceMetadataMap;
instanceMetadataMap = snapshot;
oldSnapshot.clear();
}
public Set<ExpiredMetadataInfo> getExpiredMetadataInfos() {

View File

@ -117,7 +117,7 @@ public class NamingMetadataOperateService {
private void submitMetadataOperation(WriteRequest operationLog) {
try {
Response response = cpProtocol.submit(operationLog);
Response response = cpProtocol.write(operationLog);
if (!response.getSuccess()) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR,
"do metadata operation failed " + response.getErrMsg());

View File

@ -33,6 +33,11 @@ public class ServiceMetadata implements Serializable {
private static final long serialVersionUID = -6605609934135069566L;
/**
* Service is ephemeral or persistence.
*/
private boolean ephemeral = true;
/**
* protect threshold.
*/
@ -47,6 +52,14 @@ public class ServiceMetadata implements Serializable {
private Map<String, ClusterMetadata> clusters = new ConcurrentHashMap<>(1);
public boolean isEphemeral() {
return ephemeral;
}
public void setEphemeral(boolean ephemeral) {
this.ephemeral = ephemeral;
}
public float getProtectThreshold() {
return protectThreshold;
}

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.utils.Constants;
import org.apache.commons.lang3.reflect.TypeUtils;
@ -33,6 +34,7 @@ import org.springframework.stereotype.Component;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@ -60,7 +62,7 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP {
this.processType = TypeUtils.parameterize(MetadataOperation.class, ServiceMetadata.class);
this.lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
protocolManager.getCpProtocol().addLogProcessors(Collections.singletonList(this));
protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this));
}
@Override
@ -99,13 +101,42 @@ public class ServiceMetadataProcessor extends RequestProcessor4CP {
}
private void updateServiceMetadata(MetadataOperation<ServiceMetadata> op) {
Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
namingMetadataManager.updateServiceMetadata(service, op.getMetadata());
Service service = Service
.newService(op.getNamespace(), op.getGroup(), op.getServiceName(), op.getMetadata().isEphemeral());
Optional<ServiceMetadata> currentMetadata = namingMetadataManager.getServiceMetadata(service);
if (currentMetadata.isPresent()) {
ServiceMetadata newMetadata = mergeMetadata(currentMetadata.get(), op.getMetadata());
Service singleton = ServiceManager.getInstance().getSingleton(service);
namingMetadataManager.updateServiceMetadata(singleton, newMetadata);
} else {
Service singleton = ServiceManager.getInstance().getSingleton(service);
namingMetadataManager.updateServiceMetadata(singleton, op.getMetadata());
}
}
/**
* Do not modified old metadata directly to avoid read half status.
*
* <p>Ephemeral variable should only use the value the metadata create.
*
* @param oldMetadata old metadata
* @param newMetadata new metadata
* @return merged metadata
*/
private ServiceMetadata mergeMetadata(ServiceMetadata oldMetadata, ServiceMetadata newMetadata) {
ServiceMetadata result = new ServiceMetadata();
result.setEphemeral(oldMetadata.isEphemeral());
result.setClusters(oldMetadata.getClusters());
result.setProtectThreshold(newMetadata.getProtectThreshold());
result.setSelector(newMetadata.getSelector());
result.setExtendData(newMetadata.getExtendData());
return result;
}
private void deleteServiceMetadata(MetadataOperation<ServiceMetadata> op) {
Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
namingMetadataManager.removeServiceMetadata(service);
ServiceManager.getInstance().removeSingleton(service);
}
@Override

69
pom.xml
View File

@ -1141,71 +1141,16 @@
</dependencies>
</dependencyManagement>
<!--<distributionManagement>-->
<!--<snapshotRepository>-->
<!--&lt;!&ndash; 这里的ID一定要在maven setting文件中存在于server下的ID &ndash;&gt;-->
<!--<id>sona</id>-->
<!--<url>https://oss.sonatype.org/content/repositories/snapshots/</url>-->
<!--</snapshotRepository>-->
<!--<repository>-->
<!--<id>sona</id>-->
<!--<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>-->
<!--</repository>-->
<!--</distributionManagement>-->
<repositories>
<repository>
<id>central</id>
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>snapshots</id>
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>snapshots</id>
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<distributionManagement>
<repository>
<id>releases</id>
<url>http://mvnrepo.alibaba-inc.com/mvn/releases</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<url>http://mvnrepo.alibaba-inc.com/mvn/snapshots</url>
<!-- 这里的ID一定要在maven setting文件中存在于server下的ID -->
<id>sona</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</snapshotRepository>
<repository>
<id>sona</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>
</project>