refactor instance heart beat storage

This commit is contained in:
KomachiSion 2020-10-12 17:43:56 +08:00
parent 72f3476c57
commit 018e0bff8c
9 changed files with 34 additions and 489 deletions

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.naming.core.v2.client.impl;
import com.alibaba.nacos.naming.core.v2.client.AbstractClient;
import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants;
import com.alibaba.nacos.naming.core.v2.pojo.HeartBeatInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTaskV2;
@ -64,8 +64,7 @@ public class IpPortBasedClient extends AbstractClient {
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
instancePublishInfo.getExtendDatum().put(MetadataConstants.LAST_BEAT_TIME, System.currentTimeMillis());
return super.addServiceInstance(service, instancePublishInfo);
return super.addServiceInstance(service, parseToHeartBeatInstance(instancePublishInfo));
}
public Collection<InstancePublishInfo> getAllInstancePublishInfo() {
@ -75,4 +74,16 @@ public class IpPortBasedClient extends AbstractClient {
public void destroy() {
HealthCheckReactor.cancelCheck(beatCheckTask);
}
private InstancePublishInfo parseToHeartBeatInstance(InstancePublishInfo instancePublishInfo) {
if (instancePublishInfo instanceof HeartBeatInstancePublishInfo) {
return instancePublishInfo;
}
HeartBeatInstancePublishInfo result = new HeartBeatInstancePublishInfo();
result.setIp(instancePublishInfo.getIp());
result.setPort(instancePublishInfo.getPort());
result.setHealthy(instancePublishInfo.isHealthy());
result.setExtendDatum(instancePublishInfo.getExtendDatum());
return result;
}
}

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants;
import com.alibaba.nacos.naming.core.v2.pojo.HeartBeatInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
@ -100,7 +100,7 @@ public class IpPortBasedClientManager implements ClientManager {
long currentTime = System.currentTimeMillis();
if (null != client) {
for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
each.getExtendDatum().put(MetadataConstants.LAST_BEAT_TIME, currentTime);
((HeartBeatInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
}
client.setLastUpdatedTime();
return true;

View File

@ -14,14 +14,22 @@
* limitations under the License.
*/
package com.alibaba.nacos.naming.core.v2.metadata;
package com.alibaba.nacos.naming.core.v2.pojo;
/**
* Metadata constants.
* Instance publish info with heart beat time for v1.x.
*
* @author xiweng.yy
*/
public class MetadataConstants {
public class HeartBeatInstancePublishInfo extends InstancePublishInfo {
public static final String LAST_BEAT_TIME = "lastBeatTime";
private long lastHeartBeatTime = System.currentTimeMillis();
public long getLastHeartBeatTime() {
return lastHeartBeatTime;
}
public void setLastHeartBeatTime(long lastHeartBeatTime) {
this.lastHeartBeatTime = lastHeartBeatTime;
}
}

View File

@ -27,7 +27,7 @@ import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants;
import com.alibaba.nacos.naming.core.v2.pojo.HeartBeatInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.GlobalConfig;
@ -83,8 +83,8 @@ public class ClientBeatCheckTaskV2 implements BeatCheckTask {
boolean expireInstance = getGlobalConfig().isExpireInstance();
Collection<Service> services = client.getAllPublishedService();
for (Service each : services) {
InstancePublishInfo instance = client.getInstancePublishInfo(each);
long lastBeatTime = (long) instance.getExtendDatum().get(MetadataConstants.LAST_BEAT_TIME);
HeartBeatInstancePublishInfo instance = (HeartBeatInstancePublishInfo) client.getInstancePublishInfo(each);
long lastBeatTime = instance.getLastHeartBeatTime();
if (instance.isHealthy() && isUnhealthy(instance, lastBeatTime)) {
changeHealthyStatus(each, instance, lastBeatTime);
}

View File

@ -20,8 +20,7 @@ import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.HeartBeatInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
@ -55,12 +54,12 @@ public class ClientBeatProcessorV2 implements BeatProcessor {
String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
InstancePublishInfo instance = client.getInstancePublishInfo(service);
HeartBeatInstancePublishInfo instance = (HeartBeatInstancePublishInfo) client.getInstancePublishInfo(service);
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
instance.getExtendDatum().put(MetadataConstants.LAST_BEAT_TIME, System.currentTimeMillis());
instance.setLastHeartBeatTime(System.currentTimeMillis());
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",

View File

@ -1,129 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Remoting Connection Instance.
*
* @author xiweng.yy
*/
public class RemotingConnection {
private final ConcurrentMap<String, Set<Subscriber>> subscriberIndex = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Set<Instance>> instanceIndex = new ConcurrentHashMap<>();
private final Connection connection;
private long lastHeartBeatTime;
public RemotingConnection(Connection connection) {
this.connection = connection;
this.lastHeartBeatTime = System.currentTimeMillis();
}
public String getConnectionId() {
return connection.getMetaInfo().getConnectionId();
}
/**
* Add new instance.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param instance instance
*/
public void addNewInstance(String namespaceId, String fullServiceName, Instance instance) {
String key = KeyBuilder.buildServiceMetaKey(namespaceId, fullServiceName);
if (!instanceIndex.containsKey(key)) {
instanceIndex.put(key, new ConcurrentHashSet<>());
}
instanceIndex.get(key).add(instance);
}
/**
* Remove instance.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param instance instance
*/
public void removeInstance(String namespaceId, String fullServiceName, Instance instance) {
String key = KeyBuilder.buildServiceMetaKey(namespaceId, fullServiceName);
if (!instanceIndex.containsKey(key)) {
return;
}
instanceIndex.get(key).remove(instance);
}
/**
* Add new subscriber.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param subscriber subscriber
*/
public void addNewSubscriber(String namespaceId, String fullServiceName, Subscriber subscriber) {
String key = UtilsAndCommons.assembleFullServiceName(namespaceId, fullServiceName);
if (!subscriberIndex.containsKey(key)) {
subscriberIndex.put(key, new ConcurrentHashSet<>());
}
subscriberIndex.get(key).add(subscriber);
}
/**
* Remove subscriber.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param subscriber subscriber
*/
public void removeSubscriber(String namespaceId, String fullServiceName, Subscriber subscriber) {
String key = UtilsAndCommons.assembleFullServiceName(namespaceId, fullServiceName);
if (!subscriberIndex.containsKey(key)) {
return;
}
subscriberIndex.get(key).remove(subscriber);
}
public ConcurrentMap<String, Set<Subscriber>> getSubscriberIndex() {
return subscriberIndex;
}
public ConcurrentMap<String, Set<Instance>> getInstanceIndex() {
return instanceIndex;
}
public long getLastHeartBeatTime() {
return lastHeartBeatTime;
}
public void setLastHeartBeatTime(long lastHeartBeatTime) {
this.lastHeartBeatTime = lastHeartBeatTime;
}
}

View File

@ -1,137 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.push.RemotePushService;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* Remoting connection holder.
*
* @author xiweng.yy
*/
public class RemotingConnectionHolder extends ClientConnectionEventListener {
private final ConcurrentMap<String, RemotingConnection> connectionCache = new ConcurrentHashMap<>();
private final RemotePushService remotePushService;
private final ServiceManager serviceManager;
public RemotingConnectionHolder(RemotePushService remotePushService, ServiceManager serviceManager) {
this.remotePushService = remotePushService;
this.serviceManager = serviceManager;
GlobalExecutor.scheduleRemoteConnectionManager(new RemotingConnectionCleaner(this), 0,
Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
}
@Override
public void clientConnected(Connection connect) {
Loggers.SRV_LOG.info("Client connection {} connect", connect.getMetaInfo().getConnectionId());
if (!connectionCache.containsKey(connect.getMetaInfo().getConnectionId())) {
connectionCache.put(connect.getMetaInfo().getConnectionId(), new RemotingConnection(connect));
}
}
@Override
public void clientDisConnected(Connection connect) {
clientDisConnected(connect.getMetaInfo().getConnectionId());
}
private void clientDisConnected(String connectionId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", connectionId);
RemotingConnection remotingConnection = connectionCache.remove(connectionId);
if (null == remotingConnection) {
return;
}
for (String each : remotingConnection.getSubscriberIndex().keySet()) {
remotePushService.removeAllSubscribeForService(each);
}
for (Map.Entry<String, Set<Instance>> entry : remotingConnection.getInstanceIndex().entrySet()) {
String namespace = KeyBuilder.getNamespace(entry.getKey());
String serviceName = KeyBuilder.getServiceName(entry.getKey());
for (Instance each : entry.getValue()) {
try {
serviceManager.removeInstance(namespace, serviceName, true, each);
} catch (NacosException e) {
Loggers.SRV_LOG.error("Remove instance {} for service {}##{} failed. ", each.toIpAddr(), namespace,
serviceName, e);
}
}
}
}
public RemotingConnection getRemotingConnection(String connectionId) {
return connectionCache.get(connectionId);
}
public Collection<String> getAllConnectionId() {
return connectionCache.keySet();
}
/**
* Renew remoting connection.
*
* @param connectionId connection id
*/
public void renewRemotingConnection(String connectionId) {
if (!connectionCache.containsKey(connectionId)) {
return;
}
RemotingConnection remotingConnection = connectionCache.get(connectionId);
remotingConnection.setLastHeartBeatTime(System.currentTimeMillis());
}
private static class RemotingConnectionCleaner implements Runnable {
private final RemotingConnectionHolder remotingConnectionHolder;
public RemotingConnectionCleaner(RemotingConnectionHolder remotingConnectionHolder) {
this.remotingConnectionHolder = remotingConnectionHolder;
}
@Override
public void run() {
long currentTime = System.currentTimeMillis();
for (String each : remotingConnectionHolder.getAllConnectionId()) {
RemotingConnection remotingConnection = remotingConnectionHolder.getRemotingConnection(each);
if (null != remotingConnection && isExpireConnection(currentTime, remotingConnection)) {
remotingConnectionHolder.clientDisConnected(each);
}
}
}
private boolean isExpireConnection(long currentTime, RemotingConnection remotingConnection) {
return currentTime - remotingConnection.getLastHeartBeatTime() > Constants.DEFAULT_IP_DELETE_TIMEOUT * 2;
}
}
}

View File

@ -1,115 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.worker;
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Remoting worker.
*
* @author xiweng.yy
*/
public final class RemotingWorker implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemotingWorker.class);
private static final String SEPARATOR = "_";
private static final int QUEUE_CAPACITY = 50000;
private final BlockingQueue<Runnable> queue;
private final String name;
private final InnerWorker worker;
public RemotingWorker(final int mod, final int total) {
name = getClass().getName() + "_" + mod + "%" + total;
queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
worker = new InnerWorker(name);
worker.start();
}
public String getName() {
return name;
}
/**
* Execute task.
*/
public void execute(Runnable task) {
putTask(task);
}
private void putTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException ire) {
LOGGER.error(ire.toString(), ire);
}
}
public int pendingTaskCount() {
return queue.size();
}
@Override
public void shutdown() {
worker.shutdown();
queue.clear();
}
/**
* Real worker thread.
*/
private class InnerWorker extends Thread implements Closeable {
private volatile boolean start = true;
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (start) {
try {
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
LOGGER.warn("it takes {}ms to run task {}", duration, task);
}
} catch (Throwable e) {
LOGGER.error("[remoting-worker-error] " + e.toString(), e);
}
}
}
@Override
public void shutdown() {
start = false;
}
}
}

View File

@ -1,92 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.worker;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Remoting workers manager.
*
* @author xiweng.yy
*/
public final class RemotingWorkersManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemotingWorkersManager.class);
private static final int TIMES_FOR_CORE = 2;
/**
* power of 2.
*/
private static final RemotingWorker[] REMOTING_WORKERS;
private RemotingWorkersManager() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
shutdown();
} catch (NacosException nacosException) {
LOGGER.warn("shutdown RemotingWorkersManager failed", nacosException);
}
}));
}
static {
// Find a power of 2 >= cpuCores * 2.
final int coreCount = Runtime.getRuntime().availableProcessors();
int workerCount = 1;
while (workerCount < coreCount * TIMES_FOR_CORE) {
workerCount <<= 1;
}
REMOTING_WORKERS = new RemotingWorker[workerCount];
for (int mod = 0; mod < workerCount; ++mod) {
REMOTING_WORKERS[mod] = new RemotingWorker(mod, workerCount);
}
}
/**
* Dispatch task by connectionId.
*/
public static void dispatch(String connectionId, Runnable task) {
RemotingWorker worker = getWorker(connectionId);
worker.execute(task);
}
/**
* Get worker of connection id.
*
* @param connectionId connection Id
* @return remoting worker
*/
private static RemotingWorker getWorker(String connectionId) {
int idx = connectionId.hashCode() & (REMOTING_WORKERS.length - 1);
return REMOTING_WORKERS[idx];
}
public static int workersCount() {
return REMOTING_WORKERS.length;
}
@Override
public void shutdown() throws NacosException {
for (RemotingWorker each : REMOTING_WORKERS) {
each.shutdown();
}
}
}