Merge branch 'develop' into fea_auto_clean_service
This commit is contained in:
commit
aa9a0c045a
@ -40,7 +40,10 @@ import javax.annotation.PostConstruct;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A consistency protocol algorithm called <b>Distro</b>
|
* A consistency protocol algorithm called <b>Distro</b>
|
||||||
@ -59,18 +62,6 @@ import java.util.concurrent.*;
|
|||||||
@org.springframework.stereotype.Service("distroConsistencyService")
|
@org.springframework.stereotype.Service("distroConsistencyService")
|
||||||
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
|
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
|
||||||
|
|
||||||
private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
|
|
||||||
@Override
|
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
Thread t = new Thread(r);
|
|
||||||
|
|
||||||
t.setDaemon(true);
|
|
||||||
t.setName("com.alibaba.nacos.naming.distro.notifier");
|
|
||||||
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DistroMapper distroMapper;
|
private DistroMapper distroMapper;
|
||||||
|
|
||||||
@ -80,9 +71,6 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TaskDispatcher taskDispatcher;
|
private TaskDispatcher taskDispatcher;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private DataSyncer dataSyncer;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private Serializer serializer;
|
private Serializer serializer;
|
||||||
|
|
||||||
@ -97,7 +85,9 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
|||||||
|
|
||||||
private boolean initialized = false;
|
private boolean initialized = false;
|
||||||
|
|
||||||
public volatile Notifier notifier = new Notifier();
|
private volatile Notifier notifier = new Notifier();
|
||||||
|
|
||||||
|
private LoadDataTask loadDataTask = new LoadDataTask();
|
||||||
|
|
||||||
private LoadDataTask loadDataTask = new LoadDataTask();
|
private LoadDataTask loadDataTask = new LoadDataTask();
|
||||||
|
|
||||||
@ -107,19 +97,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
|||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
GlobalExecutor.submit(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
load();
|
|
||||||
} catch (Exception e) {
|
|
||||||
Loggers.DISTRO.error("load data failed.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
executor.submit(notifier);
|
|
||||||
GlobalExecutor.submit(loadDataTask);
|
GlobalExecutor.submit(loadDataTask);
|
||||||
|
GlobalExecutor.submitDistroNotifyTask(notifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class LoadDataTask implements Runnable {
|
private class LoadDataTask implements Runnable {
|
||||||
|
@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON;
|
|||||||
import com.alibaba.fastjson.annotation.JSONField;
|
import com.alibaba.fastjson.annotation.JSONField;
|
||||||
import com.alibaba.nacos.naming.boot.RunningConfig;
|
import com.alibaba.nacos.naming.boot.RunningConfig;
|
||||||
import com.alibaba.nacos.naming.boot.SpringContext;
|
import com.alibaba.nacos.naming.boot.SpringContext;
|
||||||
|
import com.alibaba.nacos.naming.consistency.KeyBuilder;
|
||||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||||
import com.alibaba.nacos.naming.core.Instance;
|
import com.alibaba.nacos.naming.core.Instance;
|
||||||
import com.alibaba.nacos.naming.core.Service;
|
import com.alibaba.nacos.naming.core.Service;
|
||||||
@ -65,7 +66,7 @@ public class ClientBeatCheckTask implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String taskKey() {
|
public String taskKey() {
|
||||||
return service.getName();
|
return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -125,6 +125,18 @@ public class GlobalExecutor {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
private static ScheduledExecutorService distroNotifyExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
Thread t = new Thread(r);
|
||||||
|
|
||||||
|
t.setDaemon(true);
|
||||||
|
t.setName("com.alibaba.nacos.naming.distro.notifier");
|
||||||
|
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
public static void submitDataSync(Runnable runnable, long delay) {
|
public static void submitDataSync(Runnable runnable, long delay) {
|
||||||
dataSyncExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
|
dataSyncExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
@ -178,6 +190,10 @@ public class GlobalExecutor {
|
|||||||
executorService.schedule(runnable, delay, TimeUnit.MILLISECONDS);
|
executorService.schedule(runnable, delay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void submitDistroNotifyTask(Runnable runnable) {
|
||||||
|
distroNotifyExecutor.submit(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
public static void submitServiceUpdate(Runnable runnable) {
|
public static void submitServiceUpdate(Runnable runnable) {
|
||||||
serviceUpdateExecutor.execute(runnable);
|
serviceUpdateExecutor.execute(runnable);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user