From ca1d85c85223c9de8b6dc74a51263fb3cfbd5d65 Mon Sep 17 00:00:00 2001 From: nkorange Date: Fri, 27 Dec 2019 17:47:54 +0800 Subject: [PATCH 1/3] Fix #2232 --- .../distro/DistroConsistencyServiceImpl.java | 42 +++++++------------ .../nacos/naming/misc/GlobalConfig.java | 7 ++++ .../nacos/naming/misc/GlobalExecutor.java | 23 +++++++++- 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index 84bfd5401..744d76fd7 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -59,18 +59,6 @@ import java.util.concurrent.*; @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { - private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - - t.setDaemon(true); - t.setName("com.alibaba.nacos.naming.distro.notifier"); - - return t; - } - }); - @Autowired private DistroMapper distroMapper; @@ -80,9 +68,6 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService @Autowired private TaskDispatcher taskDispatcher; - @Autowired - private DataSyncer dataSyncer; - @Autowired private Serializer serializer; @@ -105,18 +90,23 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService @PostConstruct public void init() { - GlobalExecutor.submit(new Runnable() { - @Override - public void run() { - try { - load(); - } catch (Exception e) { - Loggers.DISTRO.error("load data failed.", e); - } - } - }); + GlobalExecutor.submit(new LoadDataTask()); + GlobalExecutor.submitDistroNotifyTask(notifier); + } - executor.submit(notifier); + private class LoadDataTask implements Runnable { + + @Override + public void run() { + try { + load(); + if (!initialized) { + GlobalExecutor.submit(this, globalConfig.getLoadDataRetryDelayMillis()); + } + } catch (Exception e) { + Loggers.DISTRO.error("load data failed.", e); + } + } } public void load() throws Exception { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java index c5703f9d4..c5e4eb25b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalConfig.java @@ -42,6 +42,9 @@ public class GlobalConfig { @Value("${nacos.naming.expireInstance:true}") private boolean expireInstance = true; + @Value("${nacos.naming.distro.loadDataRetryDelayMillis:30000}") + private long loadDataRetryDelayMillis = 30000; + public int getTaskDispatchPeriod() { return taskDispatchPeriod; } @@ -61,4 +64,8 @@ public class GlobalConfig { public boolean isExpireInstance() { return expireInstance; } + + public long getLoadDataRetryDelayMillis() { + return loadDataRetryDelayMillis; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java index 0f2722428..ea1e1eeef 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java @@ -37,7 +37,7 @@ public class GlobalExecutor { private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5); private static ScheduledExecutorService executorService = - new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); @@ -114,6 +114,19 @@ public class GlobalExecutor { } }); + + private static ScheduledExecutorService distroNotifyExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + + t.setDaemon(true); + t.setName("com.alibaba.nacos.naming.distro.notifier"); + + return t; + } + }); + public static void submitDataSync(Runnable runnable, long delay) { dataSyncExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS); } @@ -163,6 +176,14 @@ public class GlobalExecutor { executorService.submit(runnable); } + public static void submit(Runnable runnable, long delay) { + executorService.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + + public static void submitDistroNotifyTask(Runnable runnable) { + distroNotifyExecutor.submit(runnable); + } + public static void submitServiceUpdate(Runnable runnable) { serviceUpdateExecutor.execute(runnable); } From 95a9ba9a98d8a3fac2e2a3cc4b99d10db4097b6e Mon Sep 17 00:00:00 2001 From: nkorange Date: Mon, 6 Jan 2020 16:09:56 +0800 Subject: [PATCH 2/3] Fix #2258 --- .../alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java index 485d28214..7dfc57c06 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java @@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.annotation.JSONField; import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.SpringContext; +import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instance; import com.alibaba.nacos.naming.core.Service; @@ -61,7 +62,7 @@ public class ClientBeatCheckTask implements Runnable { } public String taskKey() { - return service.getName(); + return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()); } @Override From f32d6c864a1df77e571b07cbdddb53ad457e1b22 Mon Sep 17 00:00:00 2001 From: nkorange Date: Mon, 6 Jan 2020 16:27:01 +0800 Subject: [PATCH 3/3] #2232 Code consistency --- .../distro/DistroConsistencyServiceImpl.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index 744d76fd7..e4c027254 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -40,7 +40,10 @@ import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; /** * A consistency protocol algorithm called Distro @@ -82,7 +85,9 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService private boolean initialized = false; - public volatile Notifier notifier = new Notifier(); + private volatile Notifier notifier = new Notifier(); + + private LoadDataTask loadDataTask = new LoadDataTask(); private Map> listeners = new ConcurrentHashMap<>(); @@ -90,7 +95,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService @PostConstruct public void init() { - GlobalExecutor.submit(new LoadDataTask()); + GlobalExecutor.submit(loadDataTask); GlobalExecutor.submitDistroNotifyTask(notifier); }