From 9f5f8bb3797be85986a69ca6c9746eded1e6368c Mon Sep 17 00:00:00 2001 From: nkorange Date: Mon, 18 Feb 2019 08:42:24 +0800 Subject: [PATCH] #634 Add push switch --- .../ephemeral/partition/DataSyncer.java | 7 +++++++ .../PartitionConsistencyServiceImpl.java | 5 +++++ .../alibaba/nacos/naming/core/Instances.java | 18 +++++++++--------- .../exception/ResponseExceptionHandler.java | 6 +++--- .../nacos/naming/misc/SwitchDomain.java | 10 ++++++++++ .../alibaba/nacos/naming/misc/SwitchEntry.java | 1 + .../nacos/naming/misc/SwitchManager.java | 11 +++++++++++ .../alibaba/nacos/naming/push/PushService.java | 5 +++++ .../nacos/test/naming/ServiceListTest.java | 3 +-- 9 files changed, 52 insertions(+), 14 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java index fcc182345..f1a011333 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java @@ -88,6 +88,9 @@ public class DataSyncer implements ServerChangeListener { String key = iterator.next(); if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) { // associated key already exist: + if (Loggers.EPHEMERAL.isDebugEnabled()) { + Loggers.EPHEMERAL.debug("sync already in process, key: {}", key); + } iterator.remove(); } } @@ -213,6 +216,10 @@ public class DataSyncer implements ServerChangeListener { return; } + if (Loggers.EPHEMERAL.isDebugEnabled()) { + Loggers.EPHEMERAL.debug("sync checksums: {}", keyChecksums); + } + for (Server member : servers) { if (NetUtils.localServer().equals(member.getKey())) { continue; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java index 8efa2fd35..9a5df6586 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java @@ -140,6 +140,11 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ } for (String key : dataStore.keys()) { + + if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) { + continue; + } + if (!timestamps.containsKey(key)) { toRemoveKeys.add(key); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java b/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java index d1e5ae779..6906d7002 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/Instances.java @@ -19,7 +19,6 @@ import com.alibaba.fastjson.JSON; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.pojo.Record; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; import java.math.BigInteger; import java.nio.charset.Charset; @@ -36,7 +35,7 @@ import java.util.List; */ public class Instances implements Record { - private String checksum; + private String cachedChecksum; private long lastCalculateTime = 0L; @@ -57,11 +56,12 @@ public class Instances implements Record { @Override public String getChecksum() { - if (StringUtils.isBlank(checksum) || - (System.currentTimeMillis() - lastCalculateTime) > 5000L) { - recalculateChecksum(); - } - return checksum; + recalculateChecksum(); + return cachedChecksum; + } + + public String getCachedChecksum() { + return cachedChecksum; } private void recalculateChecksum() { @@ -76,11 +76,11 @@ public class Instances implements Record { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); - checksum = + cachedChecksum = new BigInteger(1, md5.digest((sb.toString()).getBytes(Charset.forName("UTF-8")))).toString(16); } catch (NoSuchAlgorithmException e) { Loggers.SRV_LOG.error("error while calculating checksum(md5) for instances", e); - checksum = RandomStringUtils.randomAscii(32); + cachedChecksum = RandomStringUtils.randomAscii(32); } lastCalculateTime = System.currentTimeMillis(); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/exception/ResponseExceptionHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/exception/ResponseExceptionHandler.java index 0d9b27467..bd9ca3501 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/exception/ResponseExceptionHandler.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/exception/ResponseExceptionHandler.java @@ -30,13 +30,13 @@ public class ResponseExceptionHandler { @ExceptionHandler(NacosException.class) private ResponseEntity handleNacosException(NacosException e) { - Loggers.SRV_LOG.error("got exception.", e); + Loggers.SRV_LOG.error("got exception. {}", e.getErrorMsg(), e); return ResponseEntity.status(e.getErrorCode()).body(e.getMessage()); } @ExceptionHandler(IllegalArgumentException.class) public ResponseEntity handleParameterError(IllegalArgumentException ex) { - Loggers.SRV_LOG.error("got exception.", ex); + Loggers.SRV_LOG.error("got exception. {}", ex.getMessage(), ex); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage()); } @@ -48,7 +48,7 @@ public class ResponseExceptionHandler { } @ExceptionHandler(Exception.class) - private ResponseEntity handleNacosException(Exception e) { + private ResponseEntity handleException(Exception e) { Loggers.SRV_LOG.error("got exception.", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.toString()); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java index f52bd1596..50e2eed7f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchDomain.java @@ -50,6 +50,8 @@ public class SwitchDomain implements Record, Cloneable { public boolean enableStandalone = true; + public boolean pushEnabled = true; + public int checkTimes = 3; public HttpHealthParams httpHealthParams = new HttpHealthParams(); @@ -221,6 +223,14 @@ public class SwitchDomain implements Record, Cloneable { this.distroEnabled = distroEnabled; } + public boolean isPushEnabled() { + return pushEnabled; + } + + public void setPushEnabled(boolean pushEnabled) { + this.pushEnabled = pushEnabled; + } + public int getCheckTimes() { return checkTimes; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchEntry.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchEntry.java index e1ca12dda..212a01b03 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchEntry.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchEntry.java @@ -40,6 +40,7 @@ public class SwitchEntry { public static final String MASTERS = "masters"; public static final String DISTRO = "distro"; public static final String CHECK = "check"; + public static final String PUSH_ENABLED = "pushEnabled"; public static final String DEFAULT_HEALTH_CHECK_MODE = "defaultHealthCheckMode"; public static final String SERVICE_STATUS_SYNC_PERIOD = "serviceStatusSynchronizationPeriodMillis"; public static final String SERVER_STATUS_SYNC_PERIOD = "serverStatusSynchronizationPeriodMillis"; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java index 2e3e97d34..10b67fccb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/SwitchManager.java @@ -218,6 +218,16 @@ public class SwitchManager implements DataListener { return; } + if (entry.equals(SwitchEntry.PUSH_ENABLED)) { + boolean enabled = Boolean.parseBoolean(value); + + switchDomain.setPushEnabled(enabled); + if (!debug) { + consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain); + } + return; + } + if (entry.equals(SwitchEntry.SERVICE_STATUS_SYNC_PERIOD)) { Long millis = Long.parseLong(value); @@ -362,6 +372,7 @@ public class SwitchManager implements DataListener { switchDomain.setDistroThreshold(newSwitchDomain.getDistroThreshold()); switchDomain.setHealthCheckEnabled(newSwitchDomain.isHealthCheckEnabled()); switchDomain.setDistroEnabled(newSwitchDomain.isDistroEnabled()); + switchDomain.setPushEnabled(newSwitchDomain.isPushEnabled()); switchDomain.setEnableStandalone(newSwitchDomain.isEnableStandalone()); switchDomain.setCheckTimes(newSwitchDomain.getCheckTimes()); switchDomain.setHttpHealthParams(newSwitchDomain.getHttpHealthParams()); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java b/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java index 877c40875..7b0774885 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java @@ -285,6 +285,11 @@ public class PushService { } public boolean canEnablePush(String agent) { + + if (!switchDomain.isPushEnabled()) { + return false; + } + ClientInfo clientInfo = new ClientInfo(agent); if (ClientInfo.ClientType.JAVA == clientInfo.type diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/ServiceListTest.java b/test/src/test/java/com/alibaba/nacos/test/naming/ServiceListTest.java index 1b28d68be..5c82a5891 100644 --- a/test/src/test/java/com/alibaba/nacos/test/naming/ServiceListTest.java +++ b/test/src/test/java/com/alibaba/nacos/test/naming/ServiceListTest.java @@ -58,8 +58,7 @@ public class ServiceListTest { @Before public void init() throws Exception { if (naming == null) { -// naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port); - naming = NamingFactory.createNamingService("11.239.112.161:8848,11.239.113.204:8848,11.239.114.187:8848"); + naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port); } }