Refactor auto cleaner to adapt clean metadata (#4354)
* Add Auto clean expired metadata * Refactor and move old EmptyServiceAutoCleaner
This commit is contained in:
parent
826450532e
commit
ba93573eb1
@ -29,6 +29,7 @@ import com.alibaba.nacos.naming.consistency.KeyBuilder;
|
||||
import com.alibaba.nacos.naming.consistency.RecordListener;
|
||||
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
|
||||
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
|
||||
import com.alibaba.nacos.naming.core.v2.cleaner.EmptyServiceAutoCleaner;
|
||||
import com.alibaba.nacos.naming.misc.GlobalExecutor;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.Message;
|
||||
@ -67,7 +68,6 @@ import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.UPDATE_INSTANCE_METADATA_ACTION_REMOVE;
|
||||
import static com.alibaba.nacos.naming.misc.UtilsAndCommons.UPDATE_INSTANCE_METADATA_ACTION_UPDATE;
|
||||
@ -104,8 +104,6 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
|
||||
private final RaftPeerSet raftPeerSet;
|
||||
|
||||
private int maxFinalizeCount = 3;
|
||||
|
||||
private final Object putServiceLock = new Object();
|
||||
|
||||
@Value("${nacos.naming.empty-service.auto-clean:false}")
|
||||
@ -146,7 +144,7 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
// the possibility that the service cache information may just be deleted
|
||||
// and then created due to the heartbeat mechanism
|
||||
|
||||
GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
|
||||
GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoCleaner(this, distroMapper), cleanEmptyServiceDelay,
|
||||
cleanEmptyServicePeriod);
|
||||
}
|
||||
|
||||
@ -1026,58 +1024,6 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
}
|
||||
}
|
||||
|
||||
private class EmptyServiceAutoClean implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
// Parallel flow opening threshold
|
||||
|
||||
int parallelSize = 100;
|
||||
|
||||
serviceMap.forEach((namespace, stringServiceMap) -> {
|
||||
Stream<Map.Entry<String, Service>> stream = null;
|
||||
if (stringServiceMap.size() > parallelSize) {
|
||||
stream = stringServiceMap.entrySet().parallelStream();
|
||||
} else {
|
||||
stream = stringServiceMap.entrySet().stream();
|
||||
}
|
||||
stream.filter(entry -> {
|
||||
final String serviceName = entry.getKey();
|
||||
return distroMapper.responsible(serviceName);
|
||||
}).forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
|
||||
if (service.isEmpty()) {
|
||||
|
||||
// To avoid violent Service removal, the number of times the Service
|
||||
// experiences Empty is determined by finalizeCnt, and if the specified
|
||||
// value is reached, it is removed
|
||||
|
||||
if (service.getFinalizeCount() > maxFinalizeCount) {
|
||||
Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", namespace,
|
||||
serviceName);
|
||||
try {
|
||||
easyRemoveService(namespace, serviceName);
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has "
|
||||
+ "error : {}", namespace, serviceName, e);
|
||||
}
|
||||
}
|
||||
|
||||
service.setFinalizeCount(service.getFinalizeCount() + 1);
|
||||
|
||||
Loggers.SRV_LOG
|
||||
.debug("namespace : {}, [{}] The number of times the current service experiences "
|
||||
+ "an empty instance is : {}", namespace, serviceName,
|
||||
service.getFinalizeCount());
|
||||
} else {
|
||||
service.setFinalizeCount(0);
|
||||
}
|
||||
return service;
|
||||
}));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private class ServiceReporter implements Runnable {
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.cleaner;
|
||||
|
||||
import com.alibaba.nacos.common.task.AbstractExecuteTask;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
|
||||
/**
|
||||
* Abstract Nacos naming cleaner.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public abstract class AbstractNamingCleaner extends AbstractExecuteTask implements NamingCleaner {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
doClean();
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.error("Clean {} fail. ", getType(), e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.cleaner;
|
||||
|
||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||
import com.alibaba.nacos.naming.core.Service;
|
||||
import com.alibaba.nacos.naming.core.ServiceManager;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Empty Service Auto Cleaner.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class EmptyServiceAutoCleaner extends AbstractNamingCleaner {
|
||||
|
||||
private static final int MAX_FINALIZE_COUNT = 3;
|
||||
|
||||
private final ServiceManager serviceManager;
|
||||
|
||||
private final DistroMapper distroMapper;
|
||||
|
||||
public EmptyServiceAutoCleaner(ServiceManager serviceManager, DistroMapper distroMapper) {
|
||||
this.serviceManager = serviceManager;
|
||||
this.distroMapper = distroMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
// Parallel flow opening threshold
|
||||
int parallelSize = 100;
|
||||
|
||||
for (String each : serviceManager.getAllNamespaces()) {
|
||||
Map<String, Service> serviceMap = serviceManager.chooseServiceMap(each);
|
||||
|
||||
Stream<Map.Entry<String, Service>> stream = null;
|
||||
if (serviceMap.size() > parallelSize) {
|
||||
stream = serviceMap.entrySet().parallelStream();
|
||||
} else {
|
||||
stream = serviceMap.entrySet().stream();
|
||||
}
|
||||
stream.filter(entry -> {
|
||||
final String serviceName = entry.getKey();
|
||||
return distroMapper.responsible(serviceName);
|
||||
}).forEach(entry -> serviceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
|
||||
if (service.isEmpty()) {
|
||||
|
||||
// To avoid violent Service removal, the number of times the Service
|
||||
// experiences Empty is determined by finalizeCnt, and if the specified
|
||||
// value is reached, it is removed
|
||||
|
||||
if (service.getFinalizeCount() > MAX_FINALIZE_COUNT) {
|
||||
Loggers.SRV_LOG
|
||||
.warn("namespace : {}, [{}] services are automatically cleaned", each, serviceName);
|
||||
try {
|
||||
serviceManager.easyRemoveService(each, serviceName);
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG
|
||||
.error("namespace : {}, [{}] services are automatically clean has " + "error : {}",
|
||||
each, serviceName, e);
|
||||
}
|
||||
}
|
||||
|
||||
service.setFinalizeCount(service.getFinalizeCount() + 1);
|
||||
|
||||
Loggers.SRV_LOG.debug("namespace : {}, [{}] The number of times the current service experiences "
|
||||
+ "an empty instance is : {}", each, serviceName, service.getFinalizeCount());
|
||||
} else {
|
||||
service.setFinalizeCount(0);
|
||||
}
|
||||
return service;
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doClean() {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.cleaner;
|
||||
|
||||
/**
|
||||
* Empty service auto cleaner for v2.x.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class EmptyServiceAutoCleanerV2 extends AbstractNamingCleaner {
|
||||
|
||||
private static final String EMPTY_SERVICE = "emptyService";
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return EMPTY_SERVICE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doClean() {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.cleaner;
|
||||
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.ExpiredMetadataInfo;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService;
|
||||
import com.alibaba.nacos.naming.misc.GlobalExecutor;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Expired metadata cleaner.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
@Component
|
||||
public class ExpiredMetadataCleaner extends AbstractNamingCleaner {
|
||||
|
||||
private static final String EXPIRED_METADATA = "expiredMetadata";
|
||||
|
||||
private final NamingMetadataManager metadataManager;
|
||||
|
||||
private final NamingMetadataOperateService metadataOperateService;
|
||||
|
||||
public ExpiredMetadataCleaner(NamingMetadataManager metadataManager,
|
||||
NamingMetadataOperateService metadataOperateService) {
|
||||
this.metadataManager = metadataManager;
|
||||
this.metadataOperateService = metadataOperateService;
|
||||
// TODO get internal from config
|
||||
GlobalExecutor.scheduleExpiredClientCleaner(this, 5000, 5000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return EXPIRED_METADATA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doClean() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
for (ExpiredMetadataInfo each : metadataManager.getExpiredMetadataInfos()) {
|
||||
// TODO get expired time from config
|
||||
if (currentTime - each.getCreateTime() > TimeUnit.MINUTES.toMillis(1)) {
|
||||
removeExpiredMetadata(each);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void removeExpiredMetadata(ExpiredMetadataInfo expiredInfo) {
|
||||
Loggers.SRV_LOG.info("Remove expired metadata {}", expiredInfo);
|
||||
if (null == expiredInfo.getInstanceId()) {
|
||||
metadataOperateService.deleteServiceMetadata(expiredInfo.getService());
|
||||
} else {
|
||||
metadataOperateService.deleteInstanceMetadata(expiredInfo.getService(), expiredInfo.getInstanceId());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.cleaner;
|
||||
|
||||
/**
|
||||
* Nacos naming cleaner.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public interface NamingCleaner {
|
||||
|
||||
/**
|
||||
* The type which be cleaned.
|
||||
*
|
||||
* @return cleaned type
|
||||
*/
|
||||
String getType();
|
||||
|
||||
/**
|
||||
* Do clean operation.
|
||||
*/
|
||||
void doClean();
|
||||
}
|
@ -63,10 +63,10 @@ public abstract class AbstractClient implements Client {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeServiceInstance(Service service) {
|
||||
publishers.remove(service);
|
||||
public InstancePublishInfo removeServiceInstance(Service service) {
|
||||
InstancePublishInfo result = publishers.remove(service);
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
|
||||
return true;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,9 +71,9 @@ public interface Client {
|
||||
* Remove service instance from client.
|
||||
*
|
||||
* @param service service of instance
|
||||
* @return true if remove successfully, otherwise false
|
||||
* @return instance info if exist, otherwise {@code null}
|
||||
*/
|
||||
boolean removeServiceInstance(Service service);
|
||||
InstancePublishInfo removeServiceInstance(Service service);
|
||||
|
||||
/**
|
||||
* Get instance info of service from client.
|
||||
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.event.metadata;
|
||||
|
||||
import com.alibaba.nacos.common.notify.SlowEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
|
||||
/**
|
||||
* Metadata event.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class MetadataEvent extends SlowEvent {
|
||||
|
||||
private static final long serialVersionUID = -5842659852664110805L;
|
||||
|
||||
private final Service service;
|
||||
|
||||
private final boolean expired;
|
||||
|
||||
public MetadataEvent(Service service, boolean expired) {
|
||||
this.service = service;
|
||||
this.expired = expired;
|
||||
}
|
||||
|
||||
public Service getService() {
|
||||
return service;
|
||||
}
|
||||
|
||||
public boolean isExpired() {
|
||||
return expired;
|
||||
}
|
||||
|
||||
public static class ServiceMetadataEvent extends MetadataEvent {
|
||||
|
||||
private static final long serialVersionUID = -2888112042649967804L;
|
||||
|
||||
public ServiceMetadataEvent(Service service, boolean expired) {
|
||||
super(service, expired);
|
||||
}
|
||||
}
|
||||
|
||||
public static class InstanceMetadataEvent extends MetadataEvent {
|
||||
|
||||
private static final long serialVersionUID = 5781016126117637520L;
|
||||
|
||||
private final String instanceId;
|
||||
|
||||
public InstanceMetadataEvent(Service service, String instanceId, boolean expired) {
|
||||
super(service, expired);
|
||||
this.instanceId = instanceId;
|
||||
}
|
||||
|
||||
public String getInstanceId() {
|
||||
return instanceId;
|
||||
}
|
||||
}
|
||||
}
|
@ -46,7 +46,14 @@ public class ServiceEvent extends SlowEvent {
|
||||
private static final long serialVersionUID = 2123694271992630822L;
|
||||
|
||||
public ServiceChangedEvent(Service service) {
|
||||
this(service, false);
|
||||
}
|
||||
|
||||
public ServiceChangedEvent(Service service, boolean incrementRevision) {
|
||||
super(service);
|
||||
if (incrementRevision) {
|
||||
service.incrementRevision();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ public class ClientServiceIndexesManager extends SmartSubscriber {
|
||||
publisherIndexes.putIfAbsent(service, new ConcurrentHashSet<>());
|
||||
}
|
||||
publisherIndexes.get(service).add(clientId);
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
|
||||
}
|
||||
|
||||
private void removePublisherIndexes(Service service, String clientId) {
|
||||
@ -118,7 +118,7 @@ public class ClientServiceIndexesManager extends SmartSubscriber {
|
||||
if (publisherIndexes.get(service).isEmpty()) {
|
||||
publisherIndexes.remove(service);
|
||||
}
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
|
||||
}
|
||||
|
||||
private void addSubscriberIndexes(Service service, String clientId) {
|
||||
|
@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.metadata;
|
||||
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Expired metadata information.
|
||||
* <p>
|
||||
* When an original object like service or instance be remove, the metadata need to be removed.
|
||||
* </p>
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class ExpiredMetadataInfo {
|
||||
|
||||
private final Service service;
|
||||
|
||||
private final String instanceId;
|
||||
|
||||
private final long createTime;
|
||||
|
||||
private ExpiredMetadataInfo(Service service, String instanceId) {
|
||||
this.service = service;
|
||||
this.instanceId = instanceId;
|
||||
createTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public static ExpiredMetadataInfo newExpiredServiceMetadata(Service service) {
|
||||
return new ExpiredMetadataInfo(service, null);
|
||||
}
|
||||
|
||||
public static ExpiredMetadataInfo newExpiredInstanceMetadata(Service service, String instanceId) {
|
||||
return new ExpiredMetadataInfo(service, instanceId);
|
||||
}
|
||||
|
||||
public Service getService() {
|
||||
return service;
|
||||
}
|
||||
|
||||
public String getInstanceId() {
|
||||
return instanceId;
|
||||
}
|
||||
|
||||
public long getCreateTime() {
|
||||
return createTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof ExpiredMetadataInfo)) {
|
||||
return false;
|
||||
}
|
||||
ExpiredMetadataInfo that = (ExpiredMetadataInfo) o;
|
||||
return Objects.equals(service, that.service) && Objects.equals(instanceId, that.instanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(service, instanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ExpiredMetadataInfo{" + "service=" + service + ", instanceId='" + instanceId + '\'' + ", createTime="
|
||||
+ new Date(createTime) + '}';
|
||||
}
|
||||
}
|
@ -83,14 +83,14 @@ public class InstanceMetadataProcessor extends RequestProcessor4CP {
|
||||
MetadataOperation<InstanceMetadata> op = serializer.deserialize(data.toByteArray(), processType);
|
||||
Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
|
||||
namingMetadataManager.updateInstanceMetadata(service, op.getTag(), op.getMetadata());
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
|
||||
}
|
||||
|
||||
private void deleteInstanceMetadata(ByteString data) {
|
||||
MetadataOperation<InstanceMetadata> op = serializer.deserialize(data.toByteArray(), processType);
|
||||
Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
|
||||
namingMetadataManager.removeInstanceMetadata(service, op.getTag());
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, false));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -16,10 +16,19 @@
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.metadata;
|
||||
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
|
||||
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
|
||||
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.event.metadata.MetadataEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@ -29,15 +38,19 @@ import java.util.concurrent.ConcurrentMap;
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
@Component
|
||||
public class NamingMetadataManager {
|
||||
public class NamingMetadataManager extends SmartSubscriber {
|
||||
|
||||
private final ConcurrentMap<Service, ServiceMetadata> serviceMetadataMap;
|
||||
|
||||
private final ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>> instanceMetadataMap;
|
||||
|
||||
private final Set<ExpiredMetadataInfo> expiredMetadataInfos;
|
||||
|
||||
public NamingMetadataManager() {
|
||||
serviceMetadataMap = new ConcurrentHashMap<>(1 << 10);
|
||||
instanceMetadataMap = new ConcurrentHashMap<>(1 << 10);
|
||||
expiredMetadataInfos = new ConcurrentHashSet<>();
|
||||
NotifyCenter.registerSubscriber(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -116,6 +129,7 @@ public class NamingMetadataManager {
|
||||
*/
|
||||
public void removeServiceMetadata(Service service) {
|
||||
serviceMetadataMap.remove(service);
|
||||
expiredMetadataInfos.remove(ExpiredMetadataInfo.newExpiredServiceMetadata(service));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -130,5 +144,63 @@ public class NamingMetadataManager {
|
||||
if (instanceMetadataMapForService.isEmpty()) {
|
||||
serviceMetadataMap.remove(service);
|
||||
}
|
||||
expiredMetadataInfos.remove(ExpiredMetadataInfo.newExpiredInstanceMetadata(service, instanceId));
|
||||
}
|
||||
|
||||
public Set<ExpiredMetadataInfo> getExpiredMetadataInfos() {
|
||||
return expiredMetadataInfos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Class<? extends Event>> subscribeTypes() {
|
||||
List<Class<? extends Event>> result = new LinkedList<>();
|
||||
result.add(MetadataEvent.InstanceMetadataEvent.class);
|
||||
result.add(MetadataEvent.ServiceMetadataEvent.class);
|
||||
result.add(ClientEvent.ClientDisconnectEvent.class);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
if (event instanceof MetadataEvent.InstanceMetadataEvent) {
|
||||
handleInstanceMetadataEvent((MetadataEvent.InstanceMetadataEvent) event);
|
||||
} else if (event instanceof MetadataEvent.ServiceMetadataEvent) {
|
||||
handleServiceMetadataEvent((MetadataEvent.ServiceMetadataEvent) event);
|
||||
} else {
|
||||
handleClientDisconnectEvent((ClientEvent.ClientDisconnectEvent) event);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleClientDisconnectEvent(ClientEvent.ClientDisconnectEvent event) {
|
||||
for (Service each : event.getClient().getAllPublishedService()) {
|
||||
String instanceId = event.getClient().getInstancePublishInfo(each).getIp();
|
||||
updateExpiredInfo(true, ExpiredMetadataInfo.newExpiredInstanceMetadata(each, instanceId));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleServiceMetadataEvent(MetadataEvent.ServiceMetadataEvent event) {
|
||||
Service service = event.getService();
|
||||
if (!containServiceMetadata(service)) {
|
||||
return;
|
||||
}
|
||||
updateExpiredInfo(event.isExpired(), ExpiredMetadataInfo.newExpiredServiceMetadata(service));
|
||||
}
|
||||
|
||||
private void handleInstanceMetadataEvent(MetadataEvent.InstanceMetadataEvent event) {
|
||||
Service service = event.getService();
|
||||
String instanceId = event.getInstanceId();
|
||||
if (!containInstanceMetadata(service, instanceId)) {
|
||||
return;
|
||||
}
|
||||
updateExpiredInfo(event.isExpired(),
|
||||
ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), event.getInstanceId()));
|
||||
}
|
||||
|
||||
private void updateExpiredInfo(boolean expired, ExpiredMetadataInfo expiredMetadataInfo) {
|
||||
if (expired) {
|
||||
expiredMetadataInfos.add(expiredMetadataInfo);
|
||||
} else {
|
||||
expiredMetadataInfos.remove(expiredMetadataInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
|
||||
import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.event.metadata.MetadataEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.service.ClientOperationService;
|
||||
@ -55,6 +56,7 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi
|
||||
client.addServiceInstance(singleton, instancePublishInfo);
|
||||
client.setLastUpdatedTime();
|
||||
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
|
||||
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getIp(), false));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -65,9 +67,12 @@ public class EphemeralClientOperationServiceImpl implements ClientOperationServi
|
||||
}
|
||||
Service singleton = ServiceManager.getInstance().getSingleton(service);
|
||||
Client client = clientManager.getClient(clientId);
|
||||
client.removeServiceInstance(singleton);
|
||||
InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
|
||||
client.setLastUpdatedTime();
|
||||
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
|
||||
if (null != removedInstance) {
|
||||
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getIp(), true));
|
||||
}
|
||||
}
|
||||
|
||||
private InstancePublishInfo getPublishInfo(Instance instance) {
|
||||
|
Loading…
Reference in New Issue
Block a user