diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index 9ed4d998f..5560529c5 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -40,7 +40,10 @@ import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; /** * A consistency protocol algorithm called Distro @@ -59,18 +62,6 @@ import java.util.concurrent.*; @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { - private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - - t.setDaemon(true); - t.setName("com.alibaba.nacos.naming.distro.notifier"); - - return t; - } - }); - @Autowired private DistroMapper distroMapper; @@ -80,9 +71,6 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService @Autowired private TaskDispatcher taskDispatcher; - @Autowired - private DataSyncer dataSyncer; - @Autowired private Serializer serializer; @@ -97,7 +85,9 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService private boolean initialized = false; - public volatile Notifier notifier = new Notifier(); + private volatile Notifier notifier = new Notifier(); + + private LoadDataTask loadDataTask = new LoadDataTask(); private LoadDataTask loadDataTask = new LoadDataTask(); @@ -107,19 +97,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService @PostConstruct public void init() { - GlobalExecutor.submit(new Runnable() { - @Override - public void run() { - try { - load(); - } catch (Exception e) { - Loggers.DISTRO.error("load data failed.", e); - } - } - }); - - executor.submit(notifier); GlobalExecutor.submit(loadDataTask); + GlobalExecutor.submitDistroNotifyTask(notifier); } private class LoadDataTask implements Runnable { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java index ba9c965ad..ea7537e75 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java @@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.annotation.JSONField; import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.SpringContext; +import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Service; @@ -65,7 +66,7 @@ public class ClientBeatCheckTask implements Runnable { } public String taskKey() { - return service.getName(); + return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()); } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java index 39c0604bb..7907885cf 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java @@ -113,6 +113,19 @@ public class GlobalExecutor { } }); + + private static ScheduledExecutorService distroNotifyExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + + t.setDaemon(true); + t.setName("com.alibaba.nacos.naming.distro.notifier"); + + return t; + } + }); + public static void submitDataSync(Runnable runnable, long delay) { dataSyncExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS); } @@ -166,6 +179,10 @@ public class GlobalExecutor { executorService.schedule(runnable, delay, TimeUnit.MILLISECONDS); } + public static void submitDistroNotifyTask(Runnable runnable) { + distroNotifyExecutor.submit(runnable); + } + public static void submitServiceUpdate(Runnable runnable) { serviceUpdateExecutor.execute(runnable); }