From 79f83f3a67aa842667643f8b178d6a02641bcc80 Mon Sep 17 00:00:00 2001 From: nkorange Date: Wed, 6 Mar 2019 17:01:25 +0800 Subject: [PATCH] #502 Refactor warm up --- .../consistency/ConsistencyService.java | 18 --- .../DelegateConsistencyServiceImpl.java | 24 ++- .../ephemeral/partition/DataStore.java | 4 + .../ephemeral/partition/DataSyncer.java | 42 +---- .../PartitionConsistencyServiceImpl.java | 150 +++++++++++++----- .../raft/RaftConsistencyServiceImpl.java | 10 -- .../controllers/PartitionController.java | 25 ++- .../nacos/naming/misc/NamingProxy.java | 25 ++- 8 files changed, 161 insertions(+), 137 deletions(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java index c98f1174a..17c5d5dd9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java @@ -77,24 +77,6 @@ public interface ConsistencyService { */ void unlisten(String key, RecordListener listener) throws NacosException; - /** - * Is the local server responsible for a data. - *

- * Any write operation to a data in a server not responsible for the data is refused. - * - * @param key key of data - * @return true if the local server is responsible for the data - */ - boolean isResponsible(String key); - - /** - * Get the responsible server for a data - * - * @param key key of data - * @return responsible server for the data - */ - String getResponsibleServer(String key); - /** * Tell the status of this consistency service * diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java index 1aa0825a9..7ae64a929 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/DelegateConsistencyServiceImpl.java @@ -18,13 +18,12 @@ package com.alibaba.nacos.naming.consistency; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService; import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; -import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.pojo.Record; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Publish execution delegate + * Consistency delegate * * @author nkorange * @since 1.0.0 @@ -35,9 +34,6 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService { @Autowired private PersistentConsistencyService persistentConsistencyService; - @Autowired - private DistroMapper distroMapper; - @Autowired private EphemeralConsistencyService ephemeralConsistencyService; @@ -58,6 +54,14 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService { @Override public void listen(String key, RecordListener listener) throws NacosException { + + // this special key is listened by both: + if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { + persistentConsistencyService.listen(key, listener); + ephemeralConsistencyService.listen(key, listener); + return; + } + mapConsistencyService(key).listen(key, listener); } @@ -66,16 +70,6 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService { mapConsistencyService(key).unlisten(key, listener); } - @Override - public boolean isResponsible(String key) { - return distroMapper.responsible(KeyBuilder.getServiceName(key)); - } - - @Override - public String getResponsibleServer(String key) { - return distroMapper.mapSrv(KeyBuilder.getServiceName(key)); - } - @Override public boolean isAvailable() { return ephemeralConsistencyService.isAvailable() && persistentConsistencyService.isAvailable(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataStore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataStore.java index eb0281587..45ec94d11 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataStore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataStore.java @@ -80,4 +80,8 @@ public class DataStore { } return count; } + + public Map getDataMap() { + return dataMap; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java index 20bf3dfb7..2fa27c389 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/DataSyncer.java @@ -15,7 +15,6 @@ */ package com.alibaba.nacos.naming.consistency.ephemeral.partition; -import com.alibaba.nacos.common.util.IoUtils; import com.alibaba.nacos.naming.cluster.ServerListManager; import com.alibaba.nacos.naming.cluster.servers.Server; import com.alibaba.nacos.naming.cluster.servers.ServerChangeListener; @@ -30,18 +29,12 @@ import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.commons.lang3.CharEncoding.UTF_8; - /** * Data replicator * @@ -71,8 +64,6 @@ public class DataSyncer implements ServerChangeListener { private List servers; - private boolean initialized = false; - @PostConstruct public void init() { serverListManager.listen(this); @@ -175,33 +166,6 @@ public class DataSyncer implements ServerChangeListener { @Override public void run() { - try { - - File metaFile = new File(UtilsAndCommons.DATA_BASE_DIR + File.separator + "ephemeral.properties"); - if (initialized) { - // write the current instance count to disk: - IoUtils.writeStringToFile(metaFile, "instanceCount=" + dataStore.getInstanceCount(), "UTF-8"); - } else { - // check if most of the data are loaded: - List lines = IoUtils.readLines(new InputStreamReader(new FileInputStream(metaFile), UTF_8)); - if (lines == null || lines.isEmpty()) { - initialized = true; - } else { - int desiredInstanceCount = Integer.parseInt(lines.get(0).split("=")[1]); - if (desiredInstanceCount <= 0 || - desiredInstanceCount * partitionConfig.getInitDataRatio() < dataStore.keys().size()) { - initialized = true; - } - } - } - - } catch (IOException ioe) { - initialized = true; - Loggers.EPHEMERAL.error("operate on meta file failed.", ioe); - } catch (Exception e) { - Loggers.EPHEMERAL.error("operate on meta file failed.", e); - } - try { // send local timestamps to other servers: Map keyChecksums = new HashMap<>(64); @@ -225,7 +189,7 @@ public class DataSyncer implements ServerChangeListener { if (NetUtils.localServer().equals(member.getKey())) { continue; } - NamingProxy.syncTimestamps(keyChecksums, member.getKey()); + NamingProxy.syncChecksums(keyChecksums, member.getKey()); } } catch (Exception e) { Loggers.EPHEMERAL.error("timed sync task failed.", e); @@ -241,10 +205,6 @@ public class DataSyncer implements ServerChangeListener { return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer; } - public boolean isInitialized() { - return initialized; - } - @Override public void onChangeServerList(List latestMembers) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java index dedb6d52b..0796a5feb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/partition/PartitionConsistencyServiceImpl.java @@ -15,20 +15,28 @@ */ package com.alibaba.nacos.naming.consistency.ephemeral.partition; +import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.core.utils.SystemUtils; +import com.alibaba.nacos.naming.cluster.ServerListManager; +import com.alibaba.nacos.naming.cluster.ServerMode; +import com.alibaba.nacos.naming.cluster.servers.Server; import com.alibaba.nacos.naming.cluster.transport.Serializer; -import com.alibaba.nacos.naming.consistency.RecordListener; import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.KeyBuilder; +import com.alibaba.nacos.naming.consistency.RecordListener; import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService; import com.alibaba.nacos.naming.core.DistroMapper; import com.alibaba.nacos.naming.core.Instances; +import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.NamingProxy; +import com.alibaba.nacos.naming.misc.NetUtils; +import com.alibaba.nacos.naming.misc.SwitchDomain; import com.alibaba.nacos.naming.pojo.Record; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -48,7 +56,7 @@ import java.util.concurrent.ConcurrentHashMap; * @author nkorange * @since 1.0.0 */ -@Service("partitionConsistencyService") +@org.springframework.stereotype.Service("partitionConsistencyService") public class PartitionConsistencyServiceImpl implements EphemeralConsistencyService { @Autowired @@ -66,8 +74,46 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ @Autowired private Serializer serializer; + @Autowired + private ServerListManager serverListManager; + + @Autowired + private SwitchDomain switchDomain; + + private boolean initialized = false; + private volatile Map> listeners = new ConcurrentHashMap<>(); + @PostConstruct + public void init() throws Exception { + + if (SystemUtils.STANDALONE_MODE) { + initialized = true; + return; + } + while (serverListManager.getHealthyServers().isEmpty()) { + Thread.sleep(1000L); + Loggers.EPHEMERAL.info("waiting server list init..."); + } + + for (Server server : serverListManager.getHealthyServers()) { + if (NetUtils.localServer().equals(server.getKey())) { + continue; + } + // try sync data from remote server: + if (syncAllDataFromRemote(server)) { + initialized = true; + break; + } + } + + if (!initialized) { + // init failed, exit: + throw new RuntimeException("init local server failed! Abort."); + } + + } + @Override public void put(String key, Record value) throws NacosException { onPut(key, value); @@ -122,12 +168,12 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ } } - public void onReceiveTimestamps(Map timestamps, String server) { + public void onReceiveChecksums(Map checksumMap, String server) { List toUpdateKeys = new ArrayList<>(); List toRemoveKeys = new ArrayList<>(); - for (Map.Entry entry : timestamps.entrySet()) { - if (isResponsible(entry.getKey())) { + for (Map.Entry entry : checksumMap.entrySet()) { + if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) { // this key should not be sent from remote server: Loggers.EPHEMERAL.error("receive responsible key timestamp of " + entry.getKey() + " from " + server); // abort the procedure: @@ -146,7 +192,7 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ continue; } - if (!timestamps.containsKey(key)) { + if (!checksumMap.containsKey(key)) { toRemoveKeys.add(key); } } @@ -163,31 +209,71 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ try { byte[] result = NamingProxy.getData(toUpdateKeys, server); - if (result.length > 0) { - Map> datumMap = - serializer.deserializeMap(result, Instances.class); - - for (Map.Entry> entry : datumMap.entrySet()) { - dataStore.put(entry.getKey(), entry.getValue()); - - if (!listeners.containsKey(entry.getKey())) { - return; - } - for (RecordListener listener : listeners.get(entry.getKey())) { - try { - listener.onChange(entry.getKey(), entry.getValue().value); - } catch (Exception e) { - Loggers.EPHEMERAL.error("notify " + listener + ", key: " + entry.getKey() + " failed.", e); - } - } - } - } + processData(result); } catch (Exception e) { Loggers.EPHEMERAL.error("get data from " + server + " failed!", e); } } + public boolean syncAllDataFromRemote(Server server) { + + try { + byte[] data = NamingProxy.getAllData(server.getKey()); + processData(data); + return true; + } catch (Exception e) { + Loggers.EPHEMERAL.error("sync full data from " + server + " failed!"); + return false; + } + } + + public void processData(byte[] data) throws Exception { + if (data.length > 0) { + Map> datumMap = + serializer.deserializeMap(data, Instances.class); + + + for (Map.Entry> entry : datumMap.entrySet()) { + dataStore.put(entry.getKey(), entry.getValue()); + + if (!listeners.containsKey(entry.getKey())) { + // pretty sure the service not exist: + if (ServerMode.AP.name().equals(switchDomain.getServerMode())) { + // create empty service + Service service = new Service(); + String serviceName = KeyBuilder.getServiceName(entry.getKey()); + String namespaceId = KeyBuilder.getNamespace(entry.getKey()); + service.setName(serviceName); + service.setNamespaceId(namespaceId); + service.setGroupName(Constants.DEFAULT_GROUP); + // now validate the service. if failed, exception will be thrown + service.setLastModifiedMillis(System.currentTimeMillis()); + service.recalculateChecksum(); + listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0) + .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); + } + } + } + + for (Map.Entry> entry : datumMap.entrySet()) { + dataStore.put(entry.getKey(), entry.getValue()); + + if (!listeners.containsKey(entry.getKey())) { + Loggers.EPHEMERAL.warn("listener not found: {}", entry.getKey()); + continue; + } + for (RecordListener listener : listeners.get(entry.getKey())) { + try { + listener.onChange(entry.getKey(), entry.getValue().value); + } catch (Exception e) { + Loggers.EPHEMERAL.error("notify " + listener + ", key: " + entry.getKey() + " failed.", e); + } + } + } + } + } + @Override public void listen(String key, RecordListener listener) throws NacosException { if (!listeners.containsKey(key)) { @@ -209,18 +295,8 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ } } - @Override - public boolean isResponsible(String key) { - return distroMapper.responsible(KeyBuilder.getServiceName(key)); - } - - @Override - public String getResponsibleServer(String key) { - return distroMapper.mapSrv(KeyBuilder.getServiceName(key)); - } - @Override public boolean isAvailable() { - return dataSyncer.isInitialized(); + return initialized; } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java index 0e3272229..0c6750940 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftConsistencyServiceImpl.java @@ -71,16 +71,6 @@ public class RaftConsistencyServiceImpl implements PersistentConsistencyService raftCore.unlisten(key, listener); } - @Override - public boolean isResponsible(String key) { - return false; - } - - @Override - public String getResponsibleServer(String key) { - return null; - } - @Override public boolean isAvailable() { return raftCore.isInitialized(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/PartitionController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/PartitionController.java index b9e91dc62..12315bbb1 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/PartitionController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/PartitionController.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.naming.cluster.ServerMode; import com.alibaba.nacos.naming.cluster.transport.Serializer; import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.KeyBuilder; +import com.alibaba.nacos.naming.consistency.ephemeral.partition.DataStore; import com.alibaba.nacos.naming.consistency.ephemeral.partition.PartitionConsistencyServiceImpl; import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.ServiceManager; @@ -56,6 +57,9 @@ public class PartitionController { @Autowired private PartitionConsistencyServiceImpl consistencyService; + @Autowired + private DataStore dataStore; + @Autowired private ServiceManager serviceManager; @@ -89,24 +93,14 @@ public class PartitionController { return "ok"; } - @RequestMapping(value = "/timestamps", method = RequestMethod.PUT) - public String syncTimestamps(HttpServletRequest request, HttpServletResponse response) throws Exception { + @RequestMapping(value = "/checksum", method = RequestMethod.PUT) + public String syncChecksum(HttpServletRequest request, HttpServletResponse response) throws Exception { String source = WebUtils.required(request, "source"); String entity = IOUtils.toString(request.getInputStream(), "UTF-8"); Map dataMap = serializer.deserialize(entity.getBytes(), new TypeReference>() { }); - - for (String key : dataMap.keySet()) { - String namespaceId = KeyBuilder.getNamespace(key); - String serviceName = KeyBuilder.getServiceName(key); - if (!serviceManager.containService(namespaceId, serviceName) - && ServerMode.AP.name().equals(switchDomain.getServerMode())) { - serviceManager.createEmptyService(namespaceId, serviceName); - } - } - - consistencyService.onReceiveTimestamps(dataMap, source); + consistencyService.onReceiveChecksums(dataMap, source); return "ok"; } @@ -120,4 +114,9 @@ public class PartitionController { } response.getWriter().write(new String(serializer.serialize(datumMap), "UTF-8")); } + + @RequestMapping(value = "/datums", method = RequestMethod.GET) + public void getAllDatums(HttpServletRequest request, HttpServletResponse response) throws Exception { + response.getWriter().write(new String(serializer.serialize(dataStore.getDataMap()), "UTF-8")); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java index 5f9e6d0dd..370d52113 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java @@ -35,9 +35,11 @@ public class NamingProxy { private static final String DATA_GET_URL = "/partition/datum"; - private static final String TIMESTAMP_SYNC_URL = "/partition/timestamps"; + private static final String ALL_DATA_GET_URL = "/partition/datums"; - public static void syncTimestamps(Map timestamps, String server) { + private static final String TIMESTAMP_SYNC_URL = "/partition/checksum"; + + public static void syncChecksums(Map checksumMap, String server) { try { Map headers = new HashMap<>(128); @@ -48,7 +50,7 @@ public class NamingProxy { HttpClient.asyncHttpPutLarge("http://" + server + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(), - headers, JSON.toJSONBytes(timestamps), + headers, JSON.toJSONBytes(checksumMap), new AsyncCompletionHandler() { @Override public Object onCompleted(Response response) throws Exception { @@ -90,6 +92,23 @@ public class NamingProxy { + result.code + " msg: " + result.content); } + public static byte[] getAllData(String server) throws Exception { + + Map params = new HashMap<>(8); + HttpClient.HttpResult result = HttpClient.httpGet("http://" + server + RunningConfig.getContextPath() + + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL, new ArrayList<>(), params); + + if (HttpURLConnection.HTTP_OK == result.code) { + return result.content.getBytes(); + } + + throw new IOException("failed to req API: " + "http://" + server + + RunningConfig.getContextPath() + + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: " + + result.code + " msg: " + result.content); + } + + public static boolean syncData(byte[] data, String curServer) throws Exception { try { Map headers = new HashMap<>(128);