This commit is contained in:
nkorange 2019-12-27 17:47:54 +08:00
parent c52b6e392d
commit ca1d85c852
3 changed files with 45 additions and 27 deletions

View File

@ -59,18 +59,6 @@ import java.util.concurrent.*;
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.distro.notifier");
return t;
}
});
@Autowired
private DistroMapper distroMapper;
@ -80,9 +68,6 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
@Autowired
private TaskDispatcher taskDispatcher;
@Autowired
private DataSyncer dataSyncer;
@Autowired
private Serializer serializer;
@ -105,18 +90,23 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
@PostConstruct
public void init() {
GlobalExecutor.submit(new Runnable() {
GlobalExecutor.submit(new LoadDataTask());
GlobalExecutor.submitDistroNotifyTask(notifier);
}
private class LoadDataTask implements Runnable {
@Override
public void run() {
try {
load();
if (!initialized) {
GlobalExecutor.submit(this, globalConfig.getLoadDataRetryDelayMillis());
}
} catch (Exception e) {
Loggers.DISTRO.error("load data failed.", e);
}
}
});
executor.submit(notifier);
}
public void load() throws Exception {

View File

@ -42,6 +42,9 @@ public class GlobalConfig {
@Value("${nacos.naming.expireInstance:true}")
private boolean expireInstance = true;
@Value("${nacos.naming.distro.loadDataRetryDelayMillis:30000}")
private long loadDataRetryDelayMillis = 30000;
public int getTaskDispatchPeriod() {
return taskDispatchPeriod;
}
@ -61,4 +64,8 @@ public class GlobalConfig {
public boolean isExpireInstance() {
return expireInstance;
}
public long getLoadDataRetryDelayMillis() {
return loadDataRetryDelayMillis;
}
}

View File

@ -37,7 +37,7 @@ public class GlobalExecutor {
private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5);
private static ScheduledExecutorService executorService =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
@ -114,6 +114,19 @@ public class GlobalExecutor {
}
});
private static ScheduledExecutorService distroNotifyExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.distro.notifier");
return t;
}
});
public static void submitDataSync(Runnable runnable, long delay) {
dataSyncExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
@ -163,6 +176,14 @@ public class GlobalExecutor {
executorService.submit(runnable);
}
public static void submit(Runnable runnable, long delay) {
executorService.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void submitDistroNotifyTask(Runnable runnable) {
distroNotifyExecutor.submit(runnable);
}
public static void submitServiceUpdate(Runnable runnable) {
serviceUpdateExecutor.execute(runnable);
}