Merge pull request #2420 from chuntaojun/fea_auto_clean_service
Fea auto clean service
This commit is contained in:
commit
fe4946a1e3
1
.gitignore
vendored
1
.gitignore
vendored
@ -12,3 +12,4 @@ target
|
||||
node_modules
|
||||
test/derby.log
|
||||
derby.log
|
||||
work
|
||||
|
@ -232,7 +232,9 @@ public class ServerHttpAgent implements HttpAgent {
|
||||
}
|
||||
|
||||
private String getUrl(String serverAddr, String relativePath) {
|
||||
return serverAddr + "/" + serverListMgr.getContentPath() + relativePath;
|
||||
String contextPath = serverListMgr.getContentPath().startsWith("/") ?
|
||||
serverListMgr.getContentPath() : "/" + serverListMgr.getContentPath();
|
||||
return serverAddr + contextPath + relativePath;
|
||||
}
|
||||
|
||||
public static String getAppname() {
|
||||
|
@ -84,6 +84,7 @@ public class SecurityProxy {
|
||||
username = properties.getProperty(PropertyKeyConst.USERNAME, StringUtils.EMPTY);
|
||||
password = properties.getProperty(PropertyKeyConst.PASSWORD, StringUtils.EMPTY);
|
||||
contextPath = properties.getProperty(PropertyKeyConst.CONTEXT_PATH, "/nacos");
|
||||
contextPath = contextPath.startsWith("/") ? contextPath : "/" + contextPath;
|
||||
}
|
||||
|
||||
public boolean login(List<String> servers) {
|
||||
@ -99,7 +100,7 @@ public class SecurityProxy {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -36,14 +36,15 @@ public class NamingTest {
|
||||
public void testServiceList() throws Exception {
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.put(PropertyKeyConst.SERVER_ADDR, "11.160.165.126:8848");
|
||||
properties.put(PropertyKeyConst.NAMESPACE, "t1");
|
||||
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
|
||||
properties.put(PropertyKeyConst.USERNAME, "nacos");
|
||||
properties.put(PropertyKeyConst.PASSWORD, "nacos");
|
||||
|
||||
NamingService namingService = NacosFactory.createNamingService(properties);
|
||||
|
||||
Instance instance = new Instance();
|
||||
instance.setIp("1.1.1.1");
|
||||
instance.setPort(80);
|
||||
instance.setPort(800);
|
||||
instance.setWeight(2);
|
||||
Map<String, String> map = new HashMap<String, String>();
|
||||
map.put("netType", "external");
|
||||
@ -56,7 +57,6 @@ public class NamingTest {
|
||||
// expressionSelector.setExpression("INSTANCE.metadata.registerSource = 'dubbo'");
|
||||
// ListView<String> serviceList = namingService.getServicesOfServer(1, 10, expressionSelector);
|
||||
|
||||
Thread.sleep(1000000000L);
|
||||
}
|
||||
|
||||
|
||||
|
@ -36,5 +36,8 @@ nacos.core.auth.system.type=nacos
|
||||
nacos.core.auth.enabled=false
|
||||
nacos.core.auth.default.token.secret.key=SecretKey012345678901234567890123456789012345678901234567890123456789
|
||||
|
||||
nacos.naming.empty-service.auto-clean=true
|
||||
nacos.naming.empty-service.clean.initial-delay-ms=50000
|
||||
nacos.naming.empty-service.clean.period-time-ms=30000
|
||||
|
||||
tldSkipPatterns=derbyLocale_*.jar,jaxb-api.jar,jsr173_1.0_api.jar,jaxb1-impl.jar,activation.jar
|
||||
|
@ -43,6 +43,14 @@ server.port=8848
|
||||
# nacos.naming.expireInstance=true
|
||||
|
||||
|
||||
### If enable the empty service auto clean, services with an empty instance are automatically cleared
|
||||
nacos.naming.empty-service.auto-clean=false
|
||||
### The empty service cleanup task delays startup time in milliseconds
|
||||
nacos.naming.empty-service.clean.initial-delay-ms=60000
|
||||
### The empty service cleanup task cycle execution time in milliseconds
|
||||
nacos.naming.empty-service.clean.period-time-ms=20000
|
||||
|
||||
|
||||
#*************** CMDB Module Related Configurations ***************#
|
||||
### The interval to dump external CMDB in seconds:
|
||||
# nacos.cmdb.dumpTaskInterval=3600
|
||||
|
@ -89,8 +89,6 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
|
||||
private LoadDataTask loadDataTask = new LoadDataTask();
|
||||
|
||||
private LoadDataTask loadDataTask = new LoadDataTask();
|
||||
|
||||
private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
|
||||
|
||||
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
|
||||
|
@ -181,6 +181,10 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
|
||||
return cluster;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return ephemeralInstances.isEmpty() && persistentInstances.isEmpty();
|
||||
}
|
||||
|
||||
public void updateIPs(List<Instance> ips, boolean ephemeral) {
|
||||
|
||||
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
|
||||
|
@ -57,6 +57,11 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
@JSONField(serialize = false)
|
||||
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
|
||||
|
||||
/**
|
||||
* Identify the information used to determine how many isEmpty judgments the service has experienced
|
||||
*/
|
||||
private int finalizeCount = 0;
|
||||
|
||||
private String token;
|
||||
private List<String> owners = new ArrayList<>();
|
||||
private Boolean resetWeight = false;
|
||||
@ -268,6 +273,16 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
HealthCheckReactor.cancelCheck(clientBeatCheckTask);
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
|
||||
final Cluster cluster = entry.getValue();
|
||||
if (!cluster.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public List<Instance> allIPs() {
|
||||
List<Instance> allIPs = new ArrayList<>();
|
||||
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
|
||||
@ -502,6 +517,14 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
}
|
||||
}
|
||||
|
||||
public int getFinalizeCount() {
|
||||
return finalizeCount;
|
||||
}
|
||||
|
||||
public void setFinalizeCount(int finalizeCount) {
|
||||
this.finalizeCount = finalizeCount;
|
||||
}
|
||||
|
||||
public void addCluster(Cluster cluster) {
|
||||
clusterMap.put(cluster.getName(), cluster);
|
||||
}
|
||||
|
@ -29,26 +29,41 @@ import com.alibaba.nacos.naming.consistency.KeyBuilder;
|
||||
import com.alibaba.nacos.naming.consistency.RecordListener;
|
||||
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
|
||||
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeerSet;
|
||||
import com.alibaba.nacos.naming.misc.*;
|
||||
import com.alibaba.nacos.naming.misc.GlobalExecutor;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.Message;
|
||||
import com.alibaba.nacos.naming.misc.NamingProxy;
|
||||
import com.alibaba.nacos.naming.misc.NetUtils;
|
||||
import com.alibaba.nacos.naming.misc.ServiceStatusSynchronizer;
|
||||
import com.alibaba.nacos.naming.misc.SwitchDomain;
|
||||
import com.alibaba.nacos.naming.misc.Synchronizer;
|
||||
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
|
||||
import com.alibaba.nacos.naming.push.PushService;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
* Core manager storing all services in Nacos
|
||||
@ -88,8 +103,19 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
@Autowired
|
||||
private RaftPeerSet raftPeerSet;
|
||||
|
||||
@Value("${nacos.naming.empty-service.auto-clean:false}")
|
||||
private boolean emptyServiceAutoClean;
|
||||
|
||||
private int maxFinalizeCount = 3;
|
||||
|
||||
private final Object putServiceLock = new Object();
|
||||
|
||||
@Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}")
|
||||
private int cleanEmptyServiceDelay;
|
||||
|
||||
@Value("${nacos.naming.empty-service.clean.period-time-ms:20000}")
|
||||
private int cleanEmptyServicePeriod;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
|
||||
@ -97,6 +123,19 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
|
||||
UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());
|
||||
|
||||
if (emptyServiceAutoClean) {
|
||||
|
||||
Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms", cleanEmptyServiceDelay, cleanEmptyServicePeriod);
|
||||
|
||||
// delay 60s, period 20s;
|
||||
|
||||
// This task is not recommended to be performed frequently in order to avoid
|
||||
// the possibility that the service cache information may just be deleted
|
||||
// and then created due to the heartbeat mechanism
|
||||
|
||||
GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay, cleanEmptyServicePeriod);
|
||||
}
|
||||
|
||||
try {
|
||||
Loggers.SRV_LOG.info("listen for service meta change");
|
||||
consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
|
||||
@ -765,11 +804,63 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
serviceName, checksum);
|
||||
return;
|
||||
}
|
||||
|
||||
serviceName2Checksum.put(serviceName, checksum);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class EmptyServiceAutoClean implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
// Parallel flow opening threshold
|
||||
|
||||
int parallelSize = 100;
|
||||
|
||||
serviceMap.forEach((namespace, stringServiceMap) -> {
|
||||
Stream<Map.Entry<String, Service>> stream = null;
|
||||
if (stringServiceMap.size() > parallelSize) {
|
||||
stream = stringServiceMap.entrySet().parallelStream();
|
||||
} else {
|
||||
stream = stringServiceMap.entrySet().stream();
|
||||
}
|
||||
stream
|
||||
.filter(entry -> {
|
||||
final String serviceName = entry.getKey();
|
||||
return distroMapper.responsible(serviceName);
|
||||
})
|
||||
.forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
|
||||
if (service.isEmpty()) {
|
||||
|
||||
// To avoid violent Service removal, the number of times the Service
|
||||
// experiences Empty is determined by finalizeCnt, and if the specified
|
||||
// value is reached, it is removed
|
||||
|
||||
if (service.getFinalizeCount() > maxFinalizeCount) {
|
||||
Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned",
|
||||
namespace, serviceName);
|
||||
try {
|
||||
easyRemoveService(namespace, serviceName);
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has " +
|
||||
"error : {}", namespace, serviceName, e);
|
||||
}
|
||||
}
|
||||
|
||||
service.setFinalizeCount(service.getFinalizeCount() + 1);
|
||||
|
||||
Loggers.SRV_LOG.debug("namespace : {}, [{}] The number of times the current service experiences " +
|
||||
"an empty instance is : {}", namespace, serviceName, service.getFinalizeCount());
|
||||
} else {
|
||||
service.setFinalizeCount(0);
|
||||
}
|
||||
return service;
|
||||
}));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private class ServiceReporter implements Runnable {
|
||||
|
||||
@Override
|
||||
@ -794,7 +885,7 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
|
||||
Service service = getService(namespaceId, serviceName);
|
||||
|
||||
if (service == null) {
|
||||
if (service == null || service.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -113,6 +113,17 @@ public class GlobalExecutor {
|
||||
}
|
||||
});
|
||||
|
||||
private static ScheduledExecutorService emptyServiceAutoCleanExecutor = Executors.newSingleThreadScheduledExecutor(
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setName("com.alibaba.nacos.naming.service.empty.auto-clean");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
private static ScheduledExecutorService distroNotifyExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
|
||||
@Override
|
||||
@ -186,4 +197,8 @@ public class GlobalExecutor {
|
||||
public static void submitServiceUpdate(Runnable runnable) {
|
||||
serviceUpdateExecutor.execute(runnable);
|
||||
}
|
||||
|
||||
public static void scheduleServiceAutoClean(Runnable runnable, long initialDelay, long period) {
|
||||
emptyServiceAutoCleanExecutor.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
@ -71,6 +71,7 @@ public class ConfigAPI_ITCase {
|
||||
public void setUp() throws Exception {
|
||||
Properties properties = new Properties();
|
||||
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1"+":"+port);
|
||||
properties.put(PropertyKeyConst.CONTEXT_PATH, "/nacos");
|
||||
iconfig = NacosFactory.createConfigService(properties);
|
||||
|
||||
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
|
||||
|
Loading…
Reference in New Issue
Block a user