Remove ClientBeatProcessor.java and ServiceStatusSynchronizer.java

This commit is contained in:
KomachiSion 2022-08-26 17:13:28 +08:00
parent 9b68e45c97
commit db11e0bf5a
6 changed files with 0 additions and 605 deletions

View File

@ -23,9 +23,6 @@ import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record;
@ -115,18 +112,6 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
this.ipDeleteTimeout = ipDeleteTimeout;
}
/**
* Process client beat.
*
* @param rsInfo metrics info of server
*/
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
public Boolean getEnabled() {
return enabled;
}

View File

@ -23,31 +23,22 @@ import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.Message;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.ServiceStatusSynchronizer;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.Synchronizer;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.InstanceOperationContext;
import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -58,8 +49,6 @@ import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -79,10 +68,6 @@ public class ServiceManager implements RecordListener<Service> {
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
private final Synchronizer synchronizer = new ServiceStatusSynchronizer();
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
@ -107,10 +92,6 @@ public class ServiceManager implements RecordListener<Service> {
*/
@PostConstruct
public void init() {
GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
try {
Loggers.SRV_LOG.info("listen for service meta change");
consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
@ -123,26 +104,6 @@ public class ServiceManager implements RecordListener<Service> {
return serviceMap.get(namespaceId);
}
/**
* Add a service into queue to update.
*
* @param namespaceId namespace
* @param serviceName service name
* @param serverIP target server ip
* @param checksum checksum of service
*/
public synchronized void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP,
String checksum) {
try {
toBeUpdatedServicesQueue
.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
} catch (Exception e) {
toBeUpdatedServicesQueue.poll();
toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);
}
}
@Override
public boolean interests(String key) {
return KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key);
@ -200,115 +161,6 @@ public class ServiceManager implements RecordListener<Service> {
chooseServiceMap(namespace).remove(name);
}
private class UpdatedServiceProcessor implements Runnable {
//get changed service from other server asynchronously
@Override
public void run() {
ServiceKey serviceKey = null;
try {
while (true) {
try {
serviceKey = toBeUpdatedServicesQueue.take();
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
}
if (serviceKey == null) {
continue;
}
GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
}
} catch (Exception e) {
Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
}
}
}
private class ServiceUpdater implements Runnable {
String namespaceId;
String serviceName;
String serverIP;
public ServiceUpdater(ServiceKey serviceKey) {
this.namespaceId = serviceKey.getNamespaceId();
this.serviceName = serviceKey.getServiceName();
this.serverIP = serviceKey.getServerIP();
}
@Override
public void run() {
try {
updatedHealthStatus(namespaceId, serviceName, serverIP);
} catch (Exception e) {
Loggers.SRV_LOG
.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
serverIP, e);
}
}
}
/**
* Update health status of instance in service.
*
* @param namespaceId namespace
* @param serviceName service name
* @param serverIP source server Ip
*/
public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
JsonNode serviceJson = JacksonUtils.toObj(msg.getData());
ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
Map<String, String> ipsMap = new HashMap<>(ipList.size());
for (int i = 0; i < ipList.size(); i++) {
String ip = ipList.get(i).asText();
String[] strings = ip.split("_");
ipsMap.put(strings[0], strings[1]);
}
Service service = getService(namespaceId, serviceName);
if (service == null) {
return;
}
boolean changed = false;
List<Instance> instances = service.allIPs();
for (Instance instance : instances) {
boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
if (valid != instance.isHealthy()) {
changed = true;
instance.setHealthy(valid);
Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
(instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
instance.getClusterName());
}
}
if (changed) {
pushService.serviceChanged(service);
if (Loggers.EVT_LOG.isDebugEnabled()) {
StringBuilder stringBuilder = new StringBuilder();
List<Instance> allIps = service.allIPs();
for (Instance instance : allIps) {
stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
}
Loggers.EVT_LOG
.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
service.getName(), stringBuilder);
}
}
}
public Set<String> getAllServiceNames(String namespaceId) {
return serviceMap.get(namespaceId).keySet();
}
@ -1006,95 +858,6 @@ public class ServiceManager implements RecordListener<Service> {
Loggers.SRV_LOG.info("[DEAD-SERVICE] {}", service.toJson());
}
public static class ServiceChecksum {
public String namespaceId;
public Map<String, String> serviceName2Checksum = new HashMap<String, String>();
public ServiceChecksum() {
this.namespaceId = Constants.DEFAULT_NAMESPACE_ID;
}
public ServiceChecksum(String namespaceId) {
this.namespaceId = namespaceId;
}
/**
* Add service checksum.
*
* @param serviceName service name
* @param checksum checksum of service
*/
public void addItem(String serviceName, String checksum) {
if (StringUtils.isEmpty(serviceName) || StringUtils.isEmpty(checksum)) {
Loggers.SRV_LOG.warn("[DOMAIN-CHECKSUM] serviceName or checksum is empty,serviceName: {}, checksum: {}",
serviceName, checksum);
return;
}
serviceName2Checksum.put(serviceName, checksum);
}
}
private class ServiceReporter implements Runnable {
@Override
public void run() {
try {
Map<String, Set<String>> allServiceNames = getAllServiceNames();
if (allServiceNames.size() <= 0) {
//ignore
return;
}
for (String namespaceId : allServiceNames.keySet()) {
ServiceChecksum checksum = new ServiceChecksum(namespaceId);
for (String serviceName : allServiceNames.get(namespaceId)) {
if (!distroMapper.responsible(serviceName)) {
continue;
}
Service service = getService(namespaceId, serviceName);
if (service == null || service.isEmpty()) {
continue;
}
service.recalculateChecksum();
checksum.addItem(serviceName, service.getChecksum());
}
Message msg = new Message();
msg.setData(JacksonUtils.toJson(checksum));
Collection<Member> sameSiteServers = memberManager.allMembers();
if (sameSiteServers == null || sameSiteServers.size() <= 0) {
return;
}
for (Member server : sameSiteServers) {
if (server.getAddress().equals(NetUtils.localServer())) {
continue;
}
synchronizer.send(server.getAddress(), msg);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
} finally {
GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
TimeUnit.MILLISECONDS);
}
}
}
private static class ServiceKey {
private String namespaceId;

View File

@ -1,98 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.healthcheck;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.trace.event.naming.HealthStateChangeTraceEvent;
import com.alibaba.nacos.naming.healthcheck.heartbeat.BeatProcessor;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
/**
* Thread to update ephemeral instance triggered by client beat for v1.x.
*
* @author nkorange
*/
public class ClientBeatProcessor implements BeatProcessor {
private RsInfo rsInfo;
private Service service;
@JsonIgnore
public UdpPushService getPushService() {
return ApplicationUtils.getBean(UdpPushService.class);
}
public RsInfo getRsInfo() {
return rsInfo;
}
public void setRsInfo(RsInfo rsInfo) {
this.rsInfo = rsInfo;
}
public Service getService() {
return service;
}
public void setService(Service service) {
this.service = service;
}
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked() && !instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
getPushService().serviceChanged(service);
NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
service.getNamespaceId(), service.getGroupName(), service.getName(), instance.getIp(),
instance.getPort(), true, "client_beat"));
}
}
}
}
}

View File

@ -39,14 +39,6 @@ import java.util.concurrent.TimeUnit;
@SuppressWarnings({"checkstyle:indentation", "PMD.ThreadPoolCreationRule"})
public class GlobalExecutor {
public static final long HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L);
public static final long LEADER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15L);
public static final long RANDOM_MS = TimeUnit.SECONDS.toMillis(5L);
public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);
private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5);
public static final int DEFAULT_THREAD_COUNT = EnvUtil.getAvailableProcessors(0.5);
@ -55,36 +47,6 @@ public class GlobalExecutor {
.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
EnvUtil.getAvailableProcessors(2), new NameThreadFactory("com.alibaba.nacos.naming.timer"));
/**
* Service synchronization executor.
*
* @deprecated will remove in v2.1.x.
*/
@Deprecated
private static final ScheduledExecutorService SERVICE_SYNCHRONIZATION_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.service.worker"));
/**
* Service update manager executor.
*
* @deprecated will remove in v2.1.x.
*/
@Deprecated
public static final ScheduledExecutorService SERVICE_UPDATE_MANAGER_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.service.update.processor"));
/**
* thread pool that processes getting service detail from other server asynchronously.
*
* @deprecated will remove in v2.1.x.
*/
@Deprecated
private static final ExecutorService SERVICE_UPDATE_EXECUTOR = ExecutorFactory.Managed
.newFixedExecutorService(ClassUtils.getCanonicalName(NamingApp.class), 2,
new NameThreadFactory("com.alibaba.nacos.naming.service.update.http.handler"));
private static final ExecutorService MYSQL_CHECK_EXECUTOR = ExecutorFactory.Managed
.newFixedExecutorService(ClassUtils.getCanonicalName(NamingApp.class), DEFAULT_THREAD_COUNT,
new NameThreadFactory("com.alibaba.nacos.naming.mysql.checker"));
@ -121,69 +83,11 @@ public class GlobalExecutor {
private static final ExecutorService PUSH_CALLBACK_EXECUTOR = ExecutorFactory.Managed
.newSingleExecutorService("Push", new NameThreadFactory("com.alibaba.nacos.naming.push.callback"));
/**
* Register raft leader election executor.
*
* @param runnable leader election executor
* @return future
* @deprecated will removed with old raft
*/
@Deprecated
public static ScheduledFuture registerMasterElection(Runnable runnable) {
return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}
public static void registerServerStatusUpdater(Runnable runnable) {
NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, SERVER_STATUS_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
}
/**
* Register raft heart beat executor.
*
* @param runnable heart beat executor
* @return future
* @deprecated will removed with old raft
*/
@Deprecated
public static ScheduledFuture registerHeartbeat(Runnable runnable) {
return NAMING_TIMER_EXECUTOR.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}
/**
* Submit service update for v1.x.
*
* @param runnable runnable
* @deprecated will remove in v2.1.x.
*/
@Deprecated
public static void submitServiceUpdate(Runnable runnable) {
SERVICE_UPDATE_EXECUTOR.execute(runnable);
}
/**
* submitServiceUpdateManager.
*
* @param runnable runnable
* @deprecated will remove in v2.1.x.
*/
@Deprecated
public static void submitServiceUpdateManager(Runnable runnable) {
SERVICE_UPDATE_MANAGER_EXECUTOR.submit(runnable);
}
/**
* scheduleServiceReporter.
*
* @param command command
* @param delay delay
* @param unit time unit
* @deprecated will remove in v2.1.x.
*/
@Deprecated
public static void scheduleServiceReporter(Runnable command, long delay, TimeUnit unit) {
SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(command, delay, unit);
}
public static void executeMysqlCheckTask(Runnable runnable) {
MYSQL_CHECK_EXECUTOR.execute(runnable);
}

View File

@ -1,117 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.misc;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.constants.FieldsConstants;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.HashMap;
import java.util.Map;
import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTP_PREFIX;
/**
* Service status ynchronizer.
*
* @author nacos
*/
public class ServiceStatusSynchronizer implements Synchronizer {
@Override
public void send(final String serverIp, Message msg) {
if (serverIp == null) {
return;
}
Map<String, String> params = new HashMap<String, String>(10);
params.put(FieldsConstants.STATUSES, msg.getData());
params.put(FieldsConstants.CLIENT_IP, NetUtils.localServer());
String url = HTTP_PREFIX + serverIp + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
if (InternetAddressUtil.containsPort(serverIp)) {
url = HTTP_PREFIX + serverIp + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ "/service/status";
}
try {
HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}",
serverIp);
}
}
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIp, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIp, e);
}
}
@Override
public Message get(String serverIp, String key) {
if (serverIp == null) {
return null;
}
Map<String, String> params = new HashMap<>(1);
String keyStr = "key";
params.put(keyStr, key);
String result;
try {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {}, service: {}", serverIp, key);
}
result = NamingProxy
.reqApi(EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/"
+ "statuses", params, serverIp);
} catch (Exception e) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIp, e);
return null;
}
if (result == null || result.equals(StringUtils.EMPTY)) {
return null;
}
Message msg = new Message();
msg.setData(result);
return msg;
}
}

View File

@ -1,42 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.misc;
/**
* Synchronizer.
*
* @author nacos
*/
public interface Synchronizer {
/**
* Send message to server.
*
* @param serverIp target server address
* @param msg message to send
*/
void send(String serverIp, Message msg);
/**
* Get message from server using message key.
*
* @param serverIp source server address
* @param key message key
* @return message
*/
Message get(String serverIp, String key);
}