#502 Refactor warm up

This commit is contained in:
nkorange 2019-03-06 17:01:25 +08:00
parent d477bbce7c
commit 79f83f3a67
8 changed files with 161 additions and 137 deletions

View File

@ -77,24 +77,6 @@ public interface ConsistencyService {
*/
void unlisten(String key, RecordListener listener) throws NacosException;
/**
* Is the local server responsible for a data.
* <p>
* Any write operation to a data in a server not responsible for the data is refused.
*
* @param key key of data
* @return true if the local server is responsible for the data
*/
boolean isResponsible(String key);
/**
* Get the responsible server for a data
*
* @param key key of data
* @return responsible server for the data
*/
String getResponsibleServer(String key);
/**
* Tell the status of this consistency service
*

View File

@ -18,13 +18,12 @@ package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.pojo.Record;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Publish execution delegate
* Consistency delegate
*
* @author nkorange
* @since 1.0.0
@ -35,9 +34,6 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Autowired
private PersistentConsistencyService persistentConsistencyService;
@Autowired
private DistroMapper distroMapper;
@Autowired
private EphemeralConsistencyService ephemeralConsistencyService;
@ -58,6 +54,14 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Override
public void listen(String key, RecordListener listener) throws NacosException {
// this special key is listened by both:
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
persistentConsistencyService.listen(key, listener);
ephemeralConsistencyService.listen(key, listener);
return;
}
mapConsistencyService(key).listen(key, listener);
}
@ -66,16 +70,6 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
mapConsistencyService(key).unlisten(key, listener);
}
@Override
public boolean isResponsible(String key) {
return distroMapper.responsible(KeyBuilder.getServiceName(key));
}
@Override
public String getResponsibleServer(String key) {
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}
@Override
public boolean isAvailable() {
return ephemeralConsistencyService.isAvailable() && persistentConsistencyService.isAvailable();

View File

@ -80,4 +80,8 @@ public class DataStore {
}
return count;
}
public Map<String, Datum> getDataMap() {
return dataMap;
}
}

View File

@ -15,7 +15,6 @@
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.nacos.common.util.IoUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.servers.ServerChangeListener;
@ -30,18 +29,12 @@ import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.commons.lang3.CharEncoding.UTF_8;
/**
* Data replicator
*
@ -71,8 +64,6 @@ public class DataSyncer implements ServerChangeListener {
private List<Server> servers;
private boolean initialized = false;
@PostConstruct
public void init() {
serverListManager.listen(this);
@ -175,33 +166,6 @@ public class DataSyncer implements ServerChangeListener {
@Override
public void run() {
try {
File metaFile = new File(UtilsAndCommons.DATA_BASE_DIR + File.separator + "ephemeral.properties");
if (initialized) {
// write the current instance count to disk:
IoUtils.writeStringToFile(metaFile, "instanceCount=" + dataStore.getInstanceCount(), "UTF-8");
} else {
// check if most of the data are loaded:
List<String> lines = IoUtils.readLines(new InputStreamReader(new FileInputStream(metaFile), UTF_8));
if (lines == null || lines.isEmpty()) {
initialized = true;
} else {
int desiredInstanceCount = Integer.parseInt(lines.get(0).split("=")[1]);
if (desiredInstanceCount <= 0 ||
desiredInstanceCount * partitionConfig.getInitDataRatio() < dataStore.keys().size()) {
initialized = true;
}
}
}
} catch (IOException ioe) {
initialized = true;
Loggers.EPHEMERAL.error("operate on meta file failed.", ioe);
} catch (Exception e) {
Loggers.EPHEMERAL.error("operate on meta file failed.", e);
}
try {
// send local timestamps to other servers:
Map<String, String> keyChecksums = new HashMap<>(64);
@ -225,7 +189,7 @@ public class DataSyncer implements ServerChangeListener {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
NamingProxy.syncTimestamps(keyChecksums, member.getKey());
NamingProxy.syncChecksums(keyChecksums, member.getKey());
}
} catch (Exception e) {
Loggers.EPHEMERAL.error("timed sync task failed.", e);
@ -241,10 +205,6 @@ public class DataSyncer implements ServerChangeListener {
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}
public boolean isInitialized() {
return initialized;
}
@Override
public void onChangeServerList(List<Server> latestMembers) {

View File

@ -15,20 +15,28 @@
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.utils.SystemUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.ServerMode;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -48,7 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @author nkorange
* @since 1.0.0
*/
@Service("partitionConsistencyService")
@org.springframework.stereotype.Service("partitionConsistencyService")
public class PartitionConsistencyServiceImpl implements EphemeralConsistencyService {
@Autowired
@ -66,8 +74,46 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
@Autowired
private Serializer serializer;
@Autowired
private ServerListManager serverListManager;
@Autowired
private SwitchDomain switchDomain;
private boolean initialized = false;
private volatile Map<String, List<RecordListener>> listeners = new ConcurrentHashMap<>();
@PostConstruct
public void init() throws Exception {
if (SystemUtils.STANDALONE_MODE) {
initialized = true;
return;
}
while (serverListManager.getHealthyServers().isEmpty()) {
Thread.sleep(1000L);
Loggers.EPHEMERAL.info("waiting server list init...");
}
for (Server server : serverListManager.getHealthyServers()) {
if (NetUtils.localServer().equals(server.getKey())) {
continue;
}
// try sync data from remote server:
if (syncAllDataFromRemote(server)) {
initialized = true;
break;
}
}
if (!initialized) {
// init failed, exit:
throw new RuntimeException("init local server failed! Abort.");
}
}
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
@ -122,12 +168,12 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
}
}
public void onReceiveTimestamps(Map<String, String> timestamps, String server) {
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : timestamps.entrySet()) {
if (isResponsible(entry.getKey())) {
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.EPHEMERAL.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
// abort the procedure:
@ -146,7 +192,7 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
continue;
}
if (!timestamps.containsKey(key)) {
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
@ -163,31 +209,71 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
if (result.length > 0) {
Map<String, Datum<Instances>> datumMap =
serializer.deserializeMap(result, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
return;
}
for (RecordListener listener : listeners.get(entry.getKey())) {
try {
listener.onChange(entry.getKey(), entry.getValue().value);
} catch (Exception e) {
Loggers.EPHEMERAL.error("notify " + listener + ", key: " + entry.getKey() + " failed.", e);
}
}
}
}
processData(result);
} catch (Exception e) {
Loggers.EPHEMERAL.error("get data from " + server + " failed!", e);
}
}
public boolean syncAllDataFromRemote(Server server) {
try {
byte[] data = NamingProxy.getAllData(server.getKey());
processData(data);
return true;
} catch (Exception e) {
Loggers.EPHEMERAL.error("sync full data from " + server + " failed!");
return false;
}
}
public void processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap =
serializer.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {
// create empty service
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
Loggers.EPHEMERAL.warn("listener not found: {}", entry.getKey());
continue;
}
for (RecordListener listener : listeners.get(entry.getKey())) {
try {
listener.onChange(entry.getKey(), entry.getValue().value);
} catch (Exception e) {
Loggers.EPHEMERAL.error("notify " + listener + ", key: " + entry.getKey() + " failed.", e);
}
}
}
}
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
@ -209,18 +295,8 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
}
}
@Override
public boolean isResponsible(String key) {
return distroMapper.responsible(KeyBuilder.getServiceName(key));
}
@Override
public String getResponsibleServer(String key) {
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}
@Override
public boolean isAvailable() {
return dataSyncer.isInitialized();
return initialized;
}
}

View File

@ -71,16 +71,6 @@ public class RaftConsistencyServiceImpl implements PersistentConsistencyService
raftCore.unlisten(key, listener);
}
@Override
public boolean isResponsible(String key) {
return false;
}
@Override
public String getResponsibleServer(String key) {
return null;
}
@Override
public boolean isAvailable() {
return raftCore.isInitialized();

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.naming.cluster.ServerMode;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.partition.DataStore;
import com.alibaba.nacos.naming.consistency.ephemeral.partition.PartitionConsistencyServiceImpl;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.ServiceManager;
@ -56,6 +57,9 @@ public class PartitionController {
@Autowired
private PartitionConsistencyServiceImpl consistencyService;
@Autowired
private DataStore dataStore;
@Autowired
private ServiceManager serviceManager;
@ -89,24 +93,14 @@ public class PartitionController {
return "ok";
}
@RequestMapping(value = "/timestamps", method = RequestMethod.PUT)
public String syncTimestamps(HttpServletRequest request, HttpServletResponse response) throws Exception {
@RequestMapping(value = "/checksum", method = RequestMethod.PUT)
public String syncChecksum(HttpServletRequest request, HttpServletResponse response) throws Exception {
String source = WebUtils.required(request, "source");
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
Map<String, String> dataMap =
serializer.deserialize(entity.getBytes(), new TypeReference<Map<String, String>>() {
});
for (String key : dataMap.keySet()) {
String namespaceId = KeyBuilder.getNamespace(key);
String serviceName = KeyBuilder.getServiceName(key);
if (!serviceManager.containService(namespaceId, serviceName)
&& ServerMode.AP.name().equals(switchDomain.getServerMode())) {
serviceManager.createEmptyService(namespaceId, serviceName);
}
}
consistencyService.onReceiveTimestamps(dataMap, source);
consistencyService.onReceiveChecksums(dataMap, source);
return "ok";
}
@ -120,4 +114,9 @@ public class PartitionController {
}
response.getWriter().write(new String(serializer.serialize(datumMap), "UTF-8"));
}
@RequestMapping(value = "/datums", method = RequestMethod.GET)
public void getAllDatums(HttpServletRequest request, HttpServletResponse response) throws Exception {
response.getWriter().write(new String(serializer.serialize(dataStore.getDataMap()), "UTF-8"));
}
}

View File

@ -35,9 +35,11 @@ public class NamingProxy {
private static final String DATA_GET_URL = "/partition/datum";
private static final String TIMESTAMP_SYNC_URL = "/partition/timestamps";
private static final String ALL_DATA_GET_URL = "/partition/datums";
public static void syncTimestamps(Map<String, String> timestamps, String server) {
private static final String TIMESTAMP_SYNC_URL = "/partition/checksum";
public static void syncChecksums(Map<String, String> checksumMap, String server) {
try {
Map<String, String> headers = new HashMap<>(128);
@ -48,7 +50,7 @@ public class NamingProxy {
HttpClient.asyncHttpPutLarge("http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(),
headers, JSON.toJSONBytes(timestamps),
headers, JSON.toJSONBytes(checksumMap),
new AsyncCompletionHandler() {
@Override
public Object onCompleted(Response response) throws Exception {
@ -90,6 +92,23 @@ public class NamingProxy {
+ result.code + " msg: " + result.content);
}
public static byte[] getAllData(String server) throws Exception {
Map<String, String> params = new HashMap<>(8);
HttpClient.HttpResult result = HttpClient.httpGet("http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL, new ArrayList<>(), params);
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content.getBytes();
}
throw new IOException("failed to req API: " + "http://" + server
+ RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: "
+ result.code + " msg: " + result.content);
}
public static boolean syncData(byte[] data, String curServer) throws Exception {
try {
Map<String, String> headers = new HashMap<>(128);