This commit is contained in:
IanCao 2019-06-19 16:58:27 +08:00
parent d7908c651d
commit f2d22f181a
2 changed files with 15 additions and 47 deletions

View File

@ -32,7 +32,7 @@ public class BeatInfo {
private Map<String, String> metadata; private Map<String, String> metadata;
private volatile boolean scheduled; private volatile boolean scheduled;
private volatile long period; private volatile long period;
private volatile long time; private volatile boolean stop;
@Override @Override
public String toString() { public String toString() {
@ -103,11 +103,11 @@ public class BeatInfo {
this.period = period; this.period = period;
} }
public long getTime() { public boolean isStop() {
return time; return stop;
} }
public void setTime(long time) { public void setStop(boolean stop) {
this.time = time; this.stop = stop;
} }
} }

View File

@ -30,8 +30,7 @@ import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
*/ */
public class BeatReactor { public class BeatReactor {
private ScheduledExecutorService taskExecutorService; private ScheduledExecutorService executorService;
private ExecutorService mainLoopExecutor;
private volatile long clientBeatInterval = 5 * 1000; private volatile long clientBeatInterval = 5 * 1000;
@ -46,7 +45,7 @@ public class BeatReactor {
public BeatReactor(NamingProxy serverProxy, int threadCount) { public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy; this.serverProxy = serverProxy;
taskExecutorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread thread = new Thread(r); Thread thread = new Thread(r);
@ -55,60 +54,27 @@ public class BeatReactor {
return thread; return thread;
} }
}); });
mainLoopExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender.mainloop");
return thread;
}
});
mainLoopExecutor.execute(new BeatProcessor());
} }
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo); dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
} }
public void removeBeatInfo(String serviceName, String ip, int port) { public void removeBeatInfo(String serviceName, String ip, int port) {
NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port); NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);
dom2Beat.remove(buildKey(serviceName, ip, port)); BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));
beatInfo.setStop(true);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
} }
public String buildKey(String serviceName, String ip, int port) { private String buildKey(String serviceName, String ip, int port) {
return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER
+ ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port; + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
} }
class BeatProcessor implements Runnable {
@Override
public void run() {
while (true) {
try {
for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
BeatInfo beatInfo = entry.getValue();
if (beatInfo.getTime() > System.currentTimeMillis()) {
continue;
}
if (beatInfo.isScheduled()) {
continue;
}
beatInfo.setScheduled(true);
taskExecutorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
}
}
}
}
class BeatTask implements Runnable { class BeatTask implements Runnable {
BeatInfo beatInfo; BeatInfo beatInfo;
@ -120,9 +86,11 @@ public class BeatReactor {
@Override @Override
public void run() { public void run() {
long result = serverProxy.sendBeat(beatInfo); long result = serverProxy.sendBeat(beatInfo);
beatInfo.setScheduled(false);
long nextTime = result > 0 ? System.currentTimeMillis() + result : System.currentTimeMillis() + beatInfo.getPeriod(); long nextTime = result > 0 ? System.currentTimeMillis() + result : System.currentTimeMillis() + beatInfo.getPeriod();
beatInfo.setTime(nextTime); if (beatInfo.isStop()) {
return;
}
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
} }
} }
} }