diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java index 8a8ab263c..4a49d7802 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatInfo.java @@ -32,7 +32,7 @@ public class BeatInfo { private Map metadata; private volatile boolean scheduled; private volatile long period; - private volatile long time; + private volatile boolean stop; @Override public String toString() { @@ -103,11 +103,11 @@ public class BeatInfo { this.period = period; } - public long getTime() { - return time; + public boolean isStop() { + return stop; } - public void setTime(long time) { - this.time = time; + public void setStop(boolean stop) { + this.stop = stop; } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java index 46453dc8f..e0844e625 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java @@ -30,8 +30,7 @@ import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER; */ public class BeatReactor { - private ScheduledExecutorService taskExecutorService; - private ExecutorService mainLoopExecutor; + private ScheduledExecutorService executorService; private volatile long clientBeatInterval = 5 * 1000; @@ -46,7 +45,7 @@ public class BeatReactor { public BeatReactor(NamingProxy serverProxy, int threadCount) { this.serverProxy = serverProxy; - taskExecutorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { + executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); @@ -55,60 +54,27 @@ public class BeatReactor { return thread; } }); - - mainLoopExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r); - thread.setDaemon(true); - thread.setName("com.alibaba.nacos.naming.beat.sender.mainloop"); - return thread; - } - }); - - mainLoopExecutor.execute(new BeatProcessor()); } public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo); + executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } public void removeBeatInfo(String serviceName, String ip, int port) { NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port); - dom2Beat.remove(buildKey(serviceName, ip, port)); + BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port)); + beatInfo.setStop(true); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } - public String buildKey(String serviceName, String ip, int port) { + private String buildKey(String serviceName, String ip, int port) { return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port; } - class BeatProcessor implements Runnable { - @Override - public void run() { - while (true) { - try { - for (Map.Entry entry : dom2Beat.entrySet()) { - BeatInfo beatInfo = entry.getValue(); - if (beatInfo.getTime() > System.currentTimeMillis()) { - continue; - } - if (beatInfo.isScheduled()) { - continue; - } - beatInfo.setScheduled(true); - taskExecutorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); - } - } catch (Exception e) { - NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e); - } - } - } - } - class BeatTask implements Runnable { BeatInfo beatInfo; @@ -120,9 +86,11 @@ public class BeatReactor { @Override public void run() { long result = serverProxy.sendBeat(beatInfo); - beatInfo.setScheduled(false); long nextTime = result > 0 ? System.currentTimeMillis() + result : System.currentTimeMillis() + beatInfo.getPeriod(); - beatInfo.setTime(nextTime); + if (beatInfo.isStop()) { + return; + } + executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } } }