From f09909cd1fd2472fc8fc777738f6259ce8f72377 Mon Sep 17 00:00:00 2001 From: pbting <314226532@qq.com> Date: Fri, 9 Aug 2019 10:25:42 +0800 Subject: [PATCH] 1. Optimize log printing 2. Improve the robustness and readability of your code --- .../ephemeral/distro/DataStore.java | 5 +- .../ephemeral/distro/DataSyncer.java | 87 ++++++++----------- .../distro/DistroConsistencyServiceImpl.java | 1 + .../ephemeral/distro/TaskDispatcher.java | 3 +- .../naming/controllers/DistroController.java | 11 ++- .../alibaba/nacos/naming/core/Instances.java | 38 ++++---- .../nacos/naming/core/ServiceManager.java | 15 ++-- .../nacos/naming/misc/NamingProxy.java | 21 ++--- 8 files changed, 85 insertions(+), 96 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataStore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataStore.java index fe95237f7..b02f66c67 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataStore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataStore.java @@ -59,10 +59,11 @@ public class DataStore { public Map batchGet(List keys) { Map map = new HashMap<>(128); for (String key : keys) { - if (!dataMap.containsKey(key)) { + Datum datum = dataMap.get(key); + if (datum == null) { continue; } - map.put(key, dataMap.get(key)); + map.put(key, datum); } return map; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java index c5de7497a..abd430553 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java @@ -88,52 +88,42 @@ public class DataSyncer { return; } - GlobalExecutor.submitDataSync(new Runnable() { - @Override - public void run() { + GlobalExecutor.submitDataSync(() -> { + // 1. check the server + if (getServers() == null || getServers().isEmpty()) { + Loggers.SRV_LOG.warn("try to sync data but server list is empty."); + return; + } - try { - if (getServers() == null || getServers().isEmpty()) { - Loggers.SRV_LOG.warn("try to sync data but server list is empty."); - return; - } + List keys = task.getKeys(); + if (Loggers.EPHEMERAL.isDebugEnabled()) { + Loggers.EPHEMERAL.debug("sync keys: {}", keys); + } - List keys = task.getKeys(); + // 2. get the datums by keys and check the datum is empty or not + Map datumMap = dataStore.batchGet(keys); + if (datumMap == null || datumMap.isEmpty()) { + // clear all flags of this task: + for (String key : keys) { + taskMap.remove(buildKey(key, task.getTargetServer())); + } + return; + } - if (Loggers.EPHEMERAL.isDebugEnabled()) { - Loggers.EPHEMERAL.debug("sync keys: {}", keys); - } - - Map datumMap = dataStore.batchGet(keys); - - if (datumMap == null || datumMap.isEmpty()) { - // clear all flags of this task: - for (String key : task.getKeys()) { - taskMap.remove(buildKey(key, task.getTargetServer())); - } - return; - } - - byte[] data = serializer.serialize(datumMap); - - long timestamp = System.currentTimeMillis(); - boolean success = NamingProxy.syncData(data, task.getTargetServer()); - if (!success) { - SyncTask syncTask = new SyncTask(); - syncTask.setKeys(task.getKeys()); - syncTask.setRetryCount(task.getRetryCount() + 1); - syncTask.setLastExecuteTime(timestamp); - syncTask.setTargetServer(task.getTargetServer()); - retrySync(syncTask); - } else { - // clear all flags of this task: - for (String key : task.getKeys()) { - taskMap.remove(buildKey(key, task.getTargetServer())); - } - } - - } catch (Exception e) { - Loggers.EPHEMERAL.error("sync data failed.", e); + byte[] data = serializer.serialize(datumMap); + long timestamp = System.currentTimeMillis(); + boolean success = NamingProxy.syncData(data, task.getTargetServer()); + if (!success) { + SyncTask syncTask = new SyncTask(); + syncTask.setKeys(task.getKeys()); + syncTask.setRetryCount(task.getRetryCount() + 1); + syncTask.setLastExecuteTime(timestamp); + syncTask.setTargetServer(task.getTargetServer()); + retrySync(syncTask); + } else { + // clear all flags of this task: + for (String key : task.getKeys()) { + taskMap.remove(buildKey(key, task.getTargetServer())); } } }, delay); @@ -149,7 +139,6 @@ public class DataSyncer { return; } - // TODO may choose other retry policy. submit(syncTask, partitionConfig.getSyncRetryDelay()); } @@ -164,10 +153,6 @@ public class DataSyncer { try { - if (Loggers.EPHEMERAL.isDebugEnabled()) { - Loggers.EPHEMERAL.debug("server list is: {}", getServers()); - } - // send local timestamps to other servers: Map keyChecksums = new HashMap<>(64); for (String key : dataStore.keys()) { @@ -175,7 +160,11 @@ public class DataSyncer { continue; } - keyChecksums.put(key, dataStore.get(key).value.getChecksum()); + Datum datum = dataStore.get(key); + if (datum == null) { + continue; + } + keyChecksums.put(key, datum.value.getChecksum()); } if (keyChecksums.isEmpty()) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java index 9024addcb..a23c4a65d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java @@ -211,6 +211,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService // abort the procedure: return; } + if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).value == null || !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java index 23574beea..249d811f7 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java @@ -15,7 +15,6 @@ */ package com.alibaba.nacos.naming.consistency.ephemeral.distro; -import com.alibaba.fastjson.JSON; import com.alibaba.nacos.naming.cluster.servers.Server; import com.alibaba.nacos.naming.misc.*; import org.apache.commons.lang3.StringUtils; @@ -125,7 +124,7 @@ public class TaskDispatcher { syncTask.setTargetServer(member.getKey()); if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) { - Loggers.EPHEMERAL.debug("add sync task: {}", JSON.toJSONString(syncTask)); + Loggers.EPHEMERAL.debug("add sync task. task server is {}, and key size {}", syncTask.getTargetServer(), keys.size()); } dataSyncer.submit(syncTask, 0); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/DistroController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/DistroController.java index 080b98f6e..e74024710 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/DistroController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/DistroController.java @@ -38,7 +38,6 @@ import org.springframework.web.bind.annotation.RestController; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -69,7 +68,7 @@ public class DistroController { private SwitchDomain switchDomain; @RequestMapping(value = "/datum", method = RequestMethod.PUT) - public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) throws Exception { + public String onSyncDatum(HttpServletRequest request) throws Exception { String entity = IOUtils.toString(request.getInputStream(), "UTF-8"); @@ -101,7 +100,7 @@ public class DistroController { String entity = IOUtils.toString(request.getInputStream(), "UTF-8"); Map dataMap = serializer.deserialize(entity.getBytes(), new TypeReference>() { - }); + }); consistencyService.onReceiveChecksums(dataMap, source); return "ok"; } @@ -114,7 +113,11 @@ public class DistroController { String keySplitter = ","; Map datumMap = new HashMap<>(64); for (String key : keys.split(keySplitter)) { - datumMap.put(key, consistencyService.get(key)); + Datum datum = consistencyService.get(key); + if (datum == null) { + continue; + } + datumMap.put(key, datum); } response.getWriter().write(new String(serializer.serialize(datumMap), StandardCharsets.UTF_8)); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java b/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java index ef4e560e3..49ad3b913 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java @@ -39,9 +39,16 @@ import java.util.Map; */ public class Instances implements Record { - private String cachedChecksum; + private static MessageDigest MESSAGE_DIGEST; - private long lastCalculateTime = 0L; + static { + try { + MESSAGE_DIGEST = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + Loggers.SRV_LOG.error("error while calculating checksum(md5) for instances", e); + MESSAGE_DIGEST = null; + } + } private List instanceList = new ArrayList<>(); @@ -61,15 +68,12 @@ public class Instances implements Record { @Override @JSONField(serialize = false) public String getChecksum() { - recalculateChecksum(); - return cachedChecksum; + + return recalculateChecksum(); } - public String getCachedChecksum() { - return cachedChecksum; - } - - private void recalculateChecksum() { + private String recalculateChecksum() { + String checksum; StringBuilder sb = new StringBuilder(); Collections.sort(instanceList); for (Instance ip : instanceList) { @@ -78,16 +82,14 @@ public class Instances implements Record { sb.append(string); sb.append(","); } - MessageDigest md5; - try { - md5 = MessageDigest.getInstance("MD5"); - cachedChecksum = - new BigInteger(1, md5.digest((sb.toString()).getBytes(Charset.forName("UTF-8")))).toString(16); - } catch (NoSuchAlgorithmException e) { - Loggers.SRV_LOG.error("error while calculating checksum(md5) for instances", e); - cachedChecksum = RandomStringUtils.randomAscii(32); + + if (MESSAGE_DIGEST != null) { + checksum = + new BigInteger(1, MESSAGE_DIGEST.digest((sb.toString()).getBytes(Charset.forName("UTF-8")))).toString(16); + } else { + checksum = RandomStringUtils.randomAscii(32); } - lastCalculateTime = System.currentTimeMillis(); + return checksum; } public String convertMap2String(Map map) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java index 74fe9cdbc..27c1ab78c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java @@ -537,20 +537,17 @@ public class ServiceManager implements RecordListener { Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); - Map oldInstanceMap = new HashMap<>(16); List currentIPs = service.allIPs(ephemeral); - Map map = new ConcurrentHashMap<>(currentIPs.size()); + Map currentInstances = new HashMap<>(currentIPs.size()); for (Instance instance : currentIPs) { - map.put(instance.toIPAddr(), instance); - } - if (datum != null) { - oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map); + currentInstances.put(instance.toIPAddr(), instance); } - // use HashMap for deep copy: - HashMap instanceMap = new HashMap<>(oldInstanceMap.size()); - instanceMap.putAll(oldInstanceMap); + Map instanceMap = null; + if (datum != null) { + instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); + } for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java index eb14827a9..f2485a1d5 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java @@ -108,27 +108,24 @@ public class NamingProxy { } - public static boolean syncData(byte[] data, String curServer) throws Exception { + public static boolean syncData(byte[] data, String curServer) { + Map headers = new HashMap<>(128); + + headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION); + headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION); + headers.put("Accept-Encoding", "gzip,deflate,sdch"); + headers.put("Connection", "Keep-Alive"); + headers.put("Content-Encoding", "gzip"); + try { - Map headers = new HashMap<>(128); - - headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION); - headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION); - headers.put("Accept-Encoding", "gzip,deflate,sdch"); - headers.put("Connection", "Keep-Alive"); - headers.put("Content-Encoding", "gzip"); - HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data); - if (HttpURLConnection.HTTP_OK == result.code) { return true; } - if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) { return true; } - throw new IOException("failed to req API:" + "http://" + curServer + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:"