1. Optimize log printing
2. Improve the robustness and readability of your code
This commit is contained in:
parent
19a5ac632c
commit
f09909cd1f
@ -59,10 +59,11 @@ public class DataStore {
|
||||
public Map<String, Datum> batchGet(List<String> keys) {
|
||||
Map<String, Datum> map = new HashMap<>(128);
|
||||
for (String key : keys) {
|
||||
if (!dataMap.containsKey(key)) {
|
||||
Datum datum = dataMap.get(key);
|
||||
if (datum == null) {
|
||||
continue;
|
||||
}
|
||||
map.put(key, dataMap.get(key));
|
||||
map.put(key, datum);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
@ -88,52 +88,42 @@ public class DataSyncer {
|
||||
return;
|
||||
}
|
||||
|
||||
GlobalExecutor.submitDataSync(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
GlobalExecutor.submitDataSync(() -> {
|
||||
// 1. check the server
|
||||
if (getServers() == null || getServers().isEmpty()) {
|
||||
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (getServers() == null || getServers().isEmpty()) {
|
||||
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
|
||||
return;
|
||||
}
|
||||
List<String> keys = task.getKeys();
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled()) {
|
||||
Loggers.EPHEMERAL.debug("sync keys: {}", keys);
|
||||
}
|
||||
|
||||
List<String> keys = task.getKeys();
|
||||
// 2. get the datums by keys and check the datum is empty or not
|
||||
Map<String, Datum> datumMap = dataStore.batchGet(keys);
|
||||
if (datumMap == null || datumMap.isEmpty()) {
|
||||
// clear all flags of this task:
|
||||
for (String key : keys) {
|
||||
taskMap.remove(buildKey(key, task.getTargetServer()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled()) {
|
||||
Loggers.EPHEMERAL.debug("sync keys: {}", keys);
|
||||
}
|
||||
|
||||
Map<String, Datum> datumMap = dataStore.batchGet(keys);
|
||||
|
||||
if (datumMap == null || datumMap.isEmpty()) {
|
||||
// clear all flags of this task:
|
||||
for (String key : task.getKeys()) {
|
||||
taskMap.remove(buildKey(key, task.getTargetServer()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] data = serializer.serialize(datumMap);
|
||||
|
||||
long timestamp = System.currentTimeMillis();
|
||||
boolean success = NamingProxy.syncData(data, task.getTargetServer());
|
||||
if (!success) {
|
||||
SyncTask syncTask = new SyncTask();
|
||||
syncTask.setKeys(task.getKeys());
|
||||
syncTask.setRetryCount(task.getRetryCount() + 1);
|
||||
syncTask.setLastExecuteTime(timestamp);
|
||||
syncTask.setTargetServer(task.getTargetServer());
|
||||
retrySync(syncTask);
|
||||
} else {
|
||||
// clear all flags of this task:
|
||||
for (String key : task.getKeys()) {
|
||||
taskMap.remove(buildKey(key, task.getTargetServer()));
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
Loggers.EPHEMERAL.error("sync data failed.", e);
|
||||
byte[] data = serializer.serialize(datumMap);
|
||||
long timestamp = System.currentTimeMillis();
|
||||
boolean success = NamingProxy.syncData(data, task.getTargetServer());
|
||||
if (!success) {
|
||||
SyncTask syncTask = new SyncTask();
|
||||
syncTask.setKeys(task.getKeys());
|
||||
syncTask.setRetryCount(task.getRetryCount() + 1);
|
||||
syncTask.setLastExecuteTime(timestamp);
|
||||
syncTask.setTargetServer(task.getTargetServer());
|
||||
retrySync(syncTask);
|
||||
} else {
|
||||
// clear all flags of this task:
|
||||
for (String key : task.getKeys()) {
|
||||
taskMap.remove(buildKey(key, task.getTargetServer()));
|
||||
}
|
||||
}
|
||||
}, delay);
|
||||
@ -149,7 +139,6 @@ public class DataSyncer {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO may choose other retry policy.
|
||||
submit(syncTask, partitionConfig.getSyncRetryDelay());
|
||||
}
|
||||
|
||||
@ -164,10 +153,6 @@ public class DataSyncer {
|
||||
|
||||
try {
|
||||
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled()) {
|
||||
Loggers.EPHEMERAL.debug("server list is: {}", getServers());
|
||||
}
|
||||
|
||||
// send local timestamps to other servers:
|
||||
Map<String, String> keyChecksums = new HashMap<>(64);
|
||||
for (String key : dataStore.keys()) {
|
||||
@ -175,7 +160,11 @@ public class DataSyncer {
|
||||
continue;
|
||||
}
|
||||
|
||||
keyChecksums.put(key, dataStore.get(key).value.getChecksum());
|
||||
Datum datum = dataStore.get(key);
|
||||
if (datum == null) {
|
||||
continue;
|
||||
}
|
||||
keyChecksums.put(key, datum.value.getChecksum());
|
||||
}
|
||||
|
||||
if (keyChecksums.isEmpty()) {
|
||||
|
@ -211,6 +211,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
|
||||
// abort the procedure:
|
||||
return;
|
||||
}
|
||||
|
||||
if (!dataStore.contains(entry.getKey()) ||
|
||||
dataStore.get(entry.getKey()).value == null ||
|
||||
!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
|
||||
|
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.nacos.naming.cluster.servers.Server;
|
||||
import com.alibaba.nacos.naming.misc.*;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -125,7 +124,7 @@ public class TaskDispatcher {
|
||||
syncTask.setTargetServer(member.getKey());
|
||||
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) {
|
||||
Loggers.EPHEMERAL.debug("add sync task: {}", JSON.toJSONString(syncTask));
|
||||
Loggers.EPHEMERAL.debug("add sync task. task server is {}, and key size {}", syncTask.getTargetServer(), keys.size());
|
||||
}
|
||||
|
||||
dataSyncer.submit(syncTask, 0);
|
||||
|
@ -38,7 +38,6 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -69,7 +68,7 @@ public class DistroController {
|
||||
private SwitchDomain switchDomain;
|
||||
|
||||
@RequestMapping(value = "/datum", method = RequestMethod.PUT)
|
||||
public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) throws Exception {
|
||||
public String onSyncDatum(HttpServletRequest request) throws Exception {
|
||||
|
||||
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
|
||||
|
||||
@ -101,7 +100,7 @@ public class DistroController {
|
||||
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
|
||||
Map<String, String> dataMap =
|
||||
serializer.deserialize(entity.getBytes(), new TypeReference<Map<String, String>>() {
|
||||
});
|
||||
});
|
||||
consistencyService.onReceiveChecksums(dataMap, source);
|
||||
return "ok";
|
||||
}
|
||||
@ -114,7 +113,11 @@ public class DistroController {
|
||||
String keySplitter = ",";
|
||||
Map<String, Datum> datumMap = new HashMap<>(64);
|
||||
for (String key : keys.split(keySplitter)) {
|
||||
datumMap.put(key, consistencyService.get(key));
|
||||
Datum datum = consistencyService.get(key);
|
||||
if (datum == null) {
|
||||
continue;
|
||||
}
|
||||
datumMap.put(key, datum);
|
||||
}
|
||||
response.getWriter().write(new String(serializer.serialize(datumMap), StandardCharsets.UTF_8));
|
||||
}
|
||||
|
@ -39,9 +39,16 @@ import java.util.Map;
|
||||
*/
|
||||
public class Instances implements Record {
|
||||
|
||||
private String cachedChecksum;
|
||||
private static MessageDigest MESSAGE_DIGEST;
|
||||
|
||||
private long lastCalculateTime = 0L;
|
||||
static {
|
||||
try {
|
||||
MESSAGE_DIGEST = MessageDigest.getInstance("MD5");
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
Loggers.SRV_LOG.error("error while calculating checksum(md5) for instances", e);
|
||||
MESSAGE_DIGEST = null;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Instance> instanceList = new ArrayList<>();
|
||||
|
||||
@ -61,15 +68,12 @@ public class Instances implements Record {
|
||||
@Override
|
||||
@JSONField(serialize = false)
|
||||
public String getChecksum() {
|
||||
recalculateChecksum();
|
||||
return cachedChecksum;
|
||||
|
||||
return recalculateChecksum();
|
||||
}
|
||||
|
||||
public String getCachedChecksum() {
|
||||
return cachedChecksum;
|
||||
}
|
||||
|
||||
private void recalculateChecksum() {
|
||||
private String recalculateChecksum() {
|
||||
String checksum;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Collections.sort(instanceList);
|
||||
for (Instance ip : instanceList) {
|
||||
@ -78,16 +82,14 @@ public class Instances implements Record {
|
||||
sb.append(string);
|
||||
sb.append(",");
|
||||
}
|
||||
MessageDigest md5;
|
||||
try {
|
||||
md5 = MessageDigest.getInstance("MD5");
|
||||
cachedChecksum =
|
||||
new BigInteger(1, md5.digest((sb.toString()).getBytes(Charset.forName("UTF-8")))).toString(16);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
Loggers.SRV_LOG.error("error while calculating checksum(md5) for instances", e);
|
||||
cachedChecksum = RandomStringUtils.randomAscii(32);
|
||||
|
||||
if (MESSAGE_DIGEST != null) {
|
||||
checksum =
|
||||
new BigInteger(1, MESSAGE_DIGEST.digest((sb.toString()).getBytes(Charset.forName("UTF-8")))).toString(16);
|
||||
} else {
|
||||
checksum = RandomStringUtils.randomAscii(32);
|
||||
}
|
||||
lastCalculateTime = System.currentTimeMillis();
|
||||
return checksum;
|
||||
}
|
||||
|
||||
public String convertMap2String(Map<String, String> map) {
|
||||
|
@ -537,20 +537,17 @@ public class ServiceManager implements RecordListener<Service> {
|
||||
|
||||
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
|
||||
|
||||
Map<String, Instance> oldInstanceMap = new HashMap<>(16);
|
||||
List<Instance> currentIPs = service.allIPs(ephemeral);
|
||||
Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());
|
||||
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
|
||||
|
||||
for (Instance instance : currentIPs) {
|
||||
map.put(instance.toIPAddr(), instance);
|
||||
}
|
||||
if (datum != null) {
|
||||
oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
|
||||
currentInstances.put(instance.toIPAddr(), instance);
|
||||
}
|
||||
|
||||
// use HashMap for deep copy:
|
||||
HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
|
||||
instanceMap.putAll(oldInstanceMap);
|
||||
Map<String, Instance> instanceMap = null;
|
||||
if (datum != null) {
|
||||
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
|
||||
}
|
||||
|
||||
for (Instance instance : ips) {
|
||||
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
|
||||
|
@ -108,27 +108,24 @@ public class NamingProxy {
|
||||
}
|
||||
|
||||
|
||||
public static boolean syncData(byte[] data, String curServer) throws Exception {
|
||||
public static boolean syncData(byte[] data, String curServer) {
|
||||
Map<String, String> headers = new HashMap<>(128);
|
||||
|
||||
headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
|
||||
headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION);
|
||||
headers.put("Accept-Encoding", "gzip,deflate,sdch");
|
||||
headers.put("Connection", "Keep-Alive");
|
||||
headers.put("Content-Encoding", "gzip");
|
||||
|
||||
try {
|
||||
Map<String, String> headers = new HashMap<>(128);
|
||||
|
||||
headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
|
||||
headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION);
|
||||
headers.put("Accept-Encoding", "gzip,deflate,sdch");
|
||||
headers.put("Connection", "Keep-Alive");
|
||||
headers.put("Content-Encoding", "gzip");
|
||||
|
||||
HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath()
|
||||
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data);
|
||||
|
||||
if (HttpURLConnection.HTTP_OK == result.code) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
|
||||
return true;
|
||||
}
|
||||
|
||||
throw new IOException("failed to req API:" + "http://" + curServer
|
||||
+ RunningConfig.getContextPath()
|
||||
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:"
|
||||
|
Loading…
Reference in New Issue
Block a user