#502 Fix concurrent problem.
This commit is contained in:
parent
c9ed06107d
commit
7ca6143550
@ -71,3 +71,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
|
||||
nacos.naming.distro.initDataRatio=0.9
|
||||
nacos.naming.distro.syncRetryDelay=5000
|
||||
nacos.naming.data.warmup=false
|
||||
nacos.naming.expireInstance=true
|
||||
|
@ -45,3 +45,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
|
||||
nacos.naming.distro.initDataRatio=0.9
|
||||
nacos.naming.distro.syncRetryDelay=5000
|
||||
nacos.naming.data.warmup=true
|
||||
nacos.naming.expireInstance=true
|
||||
|
@ -23,6 +23,7 @@ import com.alibaba.nacos.naming.cluster.ServerMode;
|
||||
import com.alibaba.nacos.naming.cluster.ServerStatus;
|
||||
import com.alibaba.nacos.naming.cluster.servers.Server;
|
||||
import com.alibaba.nacos.naming.cluster.transport.Serializer;
|
||||
import com.alibaba.nacos.naming.consistency.ApplyAction;
|
||||
import com.alibaba.nacos.naming.consistency.Datum;
|
||||
import com.alibaba.nacos.naming.consistency.KeyBuilder;
|
||||
import com.alibaba.nacos.naming.consistency.RecordListener;
|
||||
@ -32,13 +33,15 @@ import com.alibaba.nacos.naming.core.Instances;
|
||||
import com.alibaba.nacos.naming.core.Service;
|
||||
import com.alibaba.nacos.naming.misc.*;
|
||||
import com.alibaba.nacos.naming.pojo.Record;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.javatuples.Pair;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* A consistency protocol algorithm called <b>Partition</b>
|
||||
@ -57,6 +60,18 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
@org.springframework.stereotype.Service("distroConsistencyService")
|
||||
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
|
||||
|
||||
private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
|
||||
t.setDaemon(true);
|
||||
t.setName("com.alibaba.nacos.naming.distro.notifier");
|
||||
|
||||
return t;
|
||||
}
|
||||
});
|
||||
|
||||
@Autowired
|
||||
private DistroMapper distroMapper;
|
||||
|
||||
@ -83,7 +98,9 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
|
||||
private boolean initialized = false;
|
||||
|
||||
private volatile Map<String, List<RecordListener>> listeners = new ConcurrentHashMap<>();
|
||||
public volatile Notifier notifier = new Notifier();
|
||||
|
||||
private volatile Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Exception {
|
||||
@ -97,6 +114,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
executor.submit(notifier);
|
||||
}
|
||||
|
||||
public void load() throws Exception {
|
||||
@ -154,13 +173,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
if (!listeners.containsKey(key)) {
|
||||
return;
|
||||
}
|
||||
for (RecordListener listener : listeners.get(key)) {
|
||||
try {
|
||||
listener.onChange(key, value);
|
||||
} catch (Exception e) {
|
||||
Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
notifier.addTask(key, ApplyAction.CHANGE);
|
||||
}
|
||||
|
||||
public void onRemove(String key) {
|
||||
@ -170,13 +184,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
if (!listeners.containsKey(key)) {
|
||||
return;
|
||||
}
|
||||
for (RecordListener listener : listeners.get(key)) {
|
||||
try {
|
||||
listener.onDelete(key);
|
||||
} catch (Exception e) {
|
||||
Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
notifier.addTask(key, ApplyAction.DELETE);
|
||||
}
|
||||
|
||||
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
|
||||
@ -288,7 +297,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
@Override
|
||||
public void listen(String key, RecordListener listener) throws NacosException {
|
||||
if (!listeners.containsKey(key)) {
|
||||
listeners.put(key, new ArrayList<>());
|
||||
listeners.put(key, new CopyOnWriteArrayList<>());
|
||||
}
|
||||
listeners.get(key).add(listener);
|
||||
}
|
||||
@ -314,4 +323,78 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
public boolean isInitialized() {
|
||||
return initialized || !globalConfig.isDataWarmup();
|
||||
}
|
||||
|
||||
public class Notifier implements Runnable {
|
||||
|
||||
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
|
||||
|
||||
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
|
||||
|
||||
public void addTask(String datumKey, ApplyAction action) {
|
||||
|
||||
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
|
||||
return;
|
||||
}
|
||||
if (action == ApplyAction.CHANGE) {
|
||||
services.put(datumKey, StringUtils.EMPTY);
|
||||
}
|
||||
tasks.add(Pair.with(datumKey, action));
|
||||
}
|
||||
|
||||
public int getTaskSize() {
|
||||
return tasks.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Loggers.EPHEMERAL.info("distro notifier started");
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
|
||||
Pair pair = tasks.take();
|
||||
|
||||
if (pair == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String datumKey = (String) pair.getValue0();
|
||||
ApplyAction action = (ApplyAction) pair.getValue1();
|
||||
|
||||
services.remove(datumKey);
|
||||
|
||||
int count = 0;
|
||||
|
||||
if (!listeners.containsKey(datumKey)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (RecordListener listener : listeners.get(datumKey)) {
|
||||
|
||||
count++;
|
||||
|
||||
try {
|
||||
if (action == ApplyAction.CHANGE) {
|
||||
listener.onChange(datumKey, dataStore.get(datumKey).value);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (action == ApplyAction.DELETE) {
|
||||
listener.onDelete(datumKey);
|
||||
continue;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {} {}", datumKey, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled()) {
|
||||
Loggers.EPHEMERAL.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}", datumKey, count);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
Loggers.EPHEMERAL.error("[NACOS-DISTRO] Error while handling notifying task", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -557,7 +557,7 @@ public class RaftCore {
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Loggers.RAFT.error("VIPSRV error while sending heart-beat to peer: {} {}", server, e);
|
||||
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
|
||||
MetricsMonitor.getLeaderSendBeatFailedException().increment();
|
||||
}
|
||||
}
|
||||
|
@ -154,7 +154,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
|
||||
@Override
|
||||
public void onChange(String key, Instances value) throws Exception {
|
||||
|
||||
Loggers.RAFT.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
|
||||
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
|
||||
|
||||
for (Instance ip : value.getInstanceList()) {
|
||||
|
||||
|
@ -22,10 +22,7 @@ import com.alibaba.nacos.naming.boot.SpringContext;
|
||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||
import com.alibaba.nacos.naming.core.Instance;
|
||||
import com.alibaba.nacos.naming.core.Service;
|
||||
import com.alibaba.nacos.naming.misc.HttpClient;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.misc.NamingProxy;
|
||||
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
|
||||
import com.alibaba.nacos.naming.misc.*;
|
||||
import com.alibaba.nacos.naming.push.PushService;
|
||||
import com.ning.http.client.AsyncCompletionHandler;
|
||||
import com.ning.http.client.Response;
|
||||
@ -57,6 +54,10 @@ public class ClientBeatCheckTask implements Runnable {
|
||||
return SpringContext.getAppContext().getBean(DistroMapper.class);
|
||||
}
|
||||
|
||||
public GlobalConfig getGlobalConfig() {
|
||||
return SpringContext.getAppContext().getBean(GlobalConfig.class);
|
||||
}
|
||||
|
||||
public String taskKey() {
|
||||
return service.getName();
|
||||
}
|
||||
@ -85,6 +86,10 @@ public class ClientBeatCheckTask implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
if (!getGlobalConfig().isExpireInstance()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// then remove obsolete instances:
|
||||
for (Instance instance : instances) {
|
||||
if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) {
|
||||
|
@ -42,6 +42,9 @@ public class GlobalConfig {
|
||||
@Value("${nacos.naming.data.warmup}")
|
||||
private boolean dataWarmup = false;
|
||||
|
||||
@Value("${nacos.naming.expireInstance}")
|
||||
private boolean expireInstance = true;
|
||||
|
||||
public int getTaskDispatchPeriod() {
|
||||
return taskDispatchPeriod;
|
||||
}
|
||||
@ -61,4 +64,8 @@ public class GlobalConfig {
|
||||
public boolean isDataWarmup() {
|
||||
return dataWarmup;
|
||||
}
|
||||
|
||||
public boolean isExpireInstance() {
|
||||
return expireInstance;
|
||||
}
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ public class HttpClient {
|
||||
|
||||
return getResult(conn);
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.warn("[VIPSRV] Exception while request: {}, caused: {}", url, e);
|
||||
Loggers.SRV_LOG.warn("Exception while request: {}, caused: {}", url, e);
|
||||
return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
|
@ -139,10 +139,9 @@ public class SwitchDomain implements Record, Cloneable {
|
||||
// the followings are not implemented
|
||||
|
||||
public String getName() {
|
||||
return "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00";
|
||||
return UtilsAndCommons.SWITCH_DOMAIN_NAME;
|
||||
}
|
||||
|
||||
|
||||
public void update(SwitchDomain domain) {
|
||||
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ public class UtilsAndCommons {
|
||||
|
||||
public static final String IPADDRESS_DATA_ID_PRE = "com.alibaba.nacos.naming.iplist.";
|
||||
|
||||
public static final String SWITCH_DOMAIN_NAME = "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00";
|
||||
public static final String SWITCH_DOMAIN_NAME = "00-00---000-NACOS_SWITCH_DOMAIN-000---00-00";
|
||||
|
||||
public static final String CIDR_REGEX = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}/[0-9]+";
|
||||
|
||||
|
@ -33,3 +33,4 @@ nacos.naming.distro.initDataRatio=0.9
|
||||
nacos.naming.distro.syncRetryDelay=5000
|
||||
|
||||
nacos.naming.data.warmup=true
|
||||
nacos.naming.expireInstance=true
|
||||
|
@ -25,3 +25,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
|
||||
nacos.naming.distro.initDataRatio=0.9
|
||||
nacos.naming.distro.syncRetryDelay=5000
|
||||
nacos.naming.data.warmup=false
|
||||
nacos.naming.expireInstance=true
|
||||
|
Loading…
Reference in New Issue
Block a user