#634 Add push switch
This commit is contained in:
parent
4e6efbdde6
commit
9f5f8bb379
@ -88,6 +88,9 @@ public class DataSyncer implements ServerChangeListener {
|
||||
String key = iterator.next();
|
||||
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
|
||||
// associated key already exist:
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled()) {
|
||||
Loggers.EPHEMERAL.debug("sync already in process, key: {}", key);
|
||||
}
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
@ -213,6 +216,10 @@ public class DataSyncer implements ServerChangeListener {
|
||||
return;
|
||||
}
|
||||
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled()) {
|
||||
Loggers.EPHEMERAL.debug("sync checksums: {}", keyChecksums);
|
||||
}
|
||||
|
||||
for (Server member : servers) {
|
||||
if (NetUtils.localServer().equals(member.getKey())) {
|
||||
continue;
|
||||
|
@ -140,6 +140,11 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
|
||||
}
|
||||
|
||||
for (String key : dataStore.keys()) {
|
||||
|
||||
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!timestamps.containsKey(key)) {
|
||||
toRemoveKeys.add(key);
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
import com.alibaba.nacos.naming.pojo.Record;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.nio.charset.Charset;
|
||||
@ -36,7 +35,7 @@ import java.util.List;
|
||||
*/
|
||||
public class Instances implements Record {
|
||||
|
||||
private String checksum;
|
||||
private String cachedChecksum;
|
||||
|
||||
private long lastCalculateTime = 0L;
|
||||
|
||||
@ -57,11 +56,12 @@ public class Instances implements Record {
|
||||
|
||||
@Override
|
||||
public String getChecksum() {
|
||||
if (StringUtils.isBlank(checksum) ||
|
||||
(System.currentTimeMillis() - lastCalculateTime) > 5000L) {
|
||||
recalculateChecksum();
|
||||
return cachedChecksum;
|
||||
}
|
||||
return checksum;
|
||||
|
||||
public String getCachedChecksum() {
|
||||
return cachedChecksum;
|
||||
}
|
||||
|
||||
private void recalculateChecksum() {
|
||||
@ -76,11 +76,11 @@ public class Instances implements Record {
|
||||
MessageDigest md5;
|
||||
try {
|
||||
md5 = MessageDigest.getInstance("MD5");
|
||||
checksum =
|
||||
cachedChecksum =
|
||||
new BigInteger(1, md5.digest((sb.toString()).getBytes(Charset.forName("UTF-8")))).toString(16);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
Loggers.SRV_LOG.error("error while calculating checksum(md5) for instances", e);
|
||||
checksum = RandomStringUtils.randomAscii(32);
|
||||
cachedChecksum = RandomStringUtils.randomAscii(32);
|
||||
}
|
||||
lastCalculateTime = System.currentTimeMillis();
|
||||
}
|
||||
|
@ -30,13 +30,13 @@ public class ResponseExceptionHandler {
|
||||
|
||||
@ExceptionHandler(NacosException.class)
|
||||
private ResponseEntity<String> handleNacosException(NacosException e) {
|
||||
Loggers.SRV_LOG.error("got exception.", e);
|
||||
Loggers.SRV_LOG.error("got exception. {}", e.getErrorMsg(), e);
|
||||
return ResponseEntity.status(e.getErrorCode()).body(e.getMessage());
|
||||
}
|
||||
|
||||
@ExceptionHandler(IllegalArgumentException.class)
|
||||
public ResponseEntity<String> handleParameterError(IllegalArgumentException ex) {
|
||||
Loggers.SRV_LOG.error("got exception.", ex);
|
||||
Loggers.SRV_LOG.error("got exception. {}", ex.getMessage(), ex);
|
||||
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ public class ResponseExceptionHandler {
|
||||
}
|
||||
|
||||
@ExceptionHandler(Exception.class)
|
||||
private ResponseEntity<String> handleNacosException(Exception e) {
|
||||
private ResponseEntity<String> handleException(Exception e) {
|
||||
Loggers.SRV_LOG.error("got exception.", e);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.toString());
|
||||
}
|
||||
|
@ -50,6 +50,8 @@ public class SwitchDomain implements Record, Cloneable {
|
||||
|
||||
public boolean enableStandalone = true;
|
||||
|
||||
public boolean pushEnabled = true;
|
||||
|
||||
public int checkTimes = 3;
|
||||
|
||||
public HttpHealthParams httpHealthParams = new HttpHealthParams();
|
||||
@ -221,6 +223,14 @@ public class SwitchDomain implements Record, Cloneable {
|
||||
this.distroEnabled = distroEnabled;
|
||||
}
|
||||
|
||||
public boolean isPushEnabled() {
|
||||
return pushEnabled;
|
||||
}
|
||||
|
||||
public void setPushEnabled(boolean pushEnabled) {
|
||||
this.pushEnabled = pushEnabled;
|
||||
}
|
||||
|
||||
public int getCheckTimes() {
|
||||
return checkTimes;
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ public class SwitchEntry {
|
||||
public static final String MASTERS = "masters";
|
||||
public static final String DISTRO = "distro";
|
||||
public static final String CHECK = "check";
|
||||
public static final String PUSH_ENABLED = "pushEnabled";
|
||||
public static final String DEFAULT_HEALTH_CHECK_MODE = "defaultHealthCheckMode";
|
||||
public static final String SERVICE_STATUS_SYNC_PERIOD = "serviceStatusSynchronizationPeriodMillis";
|
||||
public static final String SERVER_STATUS_SYNC_PERIOD = "serverStatusSynchronizationPeriodMillis";
|
||||
|
@ -218,6 +218,16 @@ public class SwitchManager implements DataListener<SwitchDomain> {
|
||||
return;
|
||||
}
|
||||
|
||||
if (entry.equals(SwitchEntry.PUSH_ENABLED)) {
|
||||
boolean enabled = Boolean.parseBoolean(value);
|
||||
|
||||
switchDomain.setPushEnabled(enabled);
|
||||
if (!debug) {
|
||||
consistencyService.put(UtilsAndCommons.getSwitchDomainKey(), switchDomain);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (entry.equals(SwitchEntry.SERVICE_STATUS_SYNC_PERIOD)) {
|
||||
Long millis = Long.parseLong(value);
|
||||
|
||||
@ -362,6 +372,7 @@ public class SwitchManager implements DataListener<SwitchDomain> {
|
||||
switchDomain.setDistroThreshold(newSwitchDomain.getDistroThreshold());
|
||||
switchDomain.setHealthCheckEnabled(newSwitchDomain.isHealthCheckEnabled());
|
||||
switchDomain.setDistroEnabled(newSwitchDomain.isDistroEnabled());
|
||||
switchDomain.setPushEnabled(newSwitchDomain.isPushEnabled());
|
||||
switchDomain.setEnableStandalone(newSwitchDomain.isEnableStandalone());
|
||||
switchDomain.setCheckTimes(newSwitchDomain.getCheckTimes());
|
||||
switchDomain.setHttpHealthParams(newSwitchDomain.getHttpHealthParams());
|
||||
|
@ -285,6 +285,11 @@ public class PushService {
|
||||
}
|
||||
|
||||
public boolean canEnablePush(String agent) {
|
||||
|
||||
if (!switchDomain.isPushEnabled()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ClientInfo clientInfo = new ClientInfo(agent);
|
||||
|
||||
if (ClientInfo.ClientType.JAVA == clientInfo.type
|
||||
|
@ -58,8 +58,7 @@ public class ServiceListTest {
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
if (naming == null) {
|
||||
// naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port);
|
||||
naming = NamingFactory.createNamingService("11.239.112.161:8848,11.239.113.204:8848,11.239.114.187:8848");
|
||||
naming = NamingFactory.createNamingService("127.0.0.1" + ":" + port);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user