#502 tuning Serializer

This commit is contained in:
nkorange 2019-01-26 23:19:57 +08:00
parent 8d0c5ad8fb
commit e3440fe7f0
30 changed files with 279 additions and 150 deletions

View File

@ -16,7 +16,7 @@
package com.alibaba.nacos.console.controller;
import com.alibaba.nacos.config.server.service.PersistService;
import com.alibaba.nacos.naming.web.ApiCommands;
import com.alibaba.nacos.naming.controllers.OperatorController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -38,10 +38,10 @@ public class HealthController {
private static final Logger logger = LoggerFactory.getLogger(HealthController.class);
private final PersistService persistService;
private final ApiCommands apiCommands;
private final OperatorController apiCommands;
@Autowired
public HealthController(PersistService persistService, ApiCommands apiCommands) {
public HealthController(PersistService persistService, OperatorController apiCommands) {
this.persistService = persistService;
this.apiCommands = apiCommands;
}
@ -98,7 +98,7 @@ public class HealthController {
private boolean isNamingReadiness(HttpServletRequest request) {
try {
apiCommands.hello(request);
apiCommands.metrics(request);
return true;
} catch (Exception e) {
logger.error("Naming health check fail.", e);

View File

@ -17,7 +17,7 @@ package com.alibaba.nacos.console.controller;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.config.server.service.PersistService;
import com.alibaba.nacos.naming.web.ApiCommands;
import com.alibaba.nacos.naming.controllers.OperatorController;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -54,7 +54,7 @@ public class HealthControllerTest {
private PersistService persistService;
@Mock
private ApiCommands apiCommands;
private OperatorController apiCommands;
private MockMvc mockmvc;
@ -75,14 +75,14 @@ public class HealthControllerTest {
String url = "/v1/console/health/readiness";
Mockito.when(persistService.configInfoCount(any(String.class))).thenReturn(0);
Mockito.when(apiCommands.hello(any(HttpServletRequest.class))).thenReturn(new JSONObject());
Mockito.when(apiCommands.metrics(any(HttpServletRequest.class))).thenReturn(new JSONObject());
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders.get(url);
Assert.assertEquals(200, mockmvc.perform(builder).andReturn().getResponse().getStatus());
// Config and Naming are not in readiness
Mockito.when(persistService.configInfoCount(any(String.class))).thenThrow(
new RuntimeException("HealthControllerTest.testReadiness"));
Mockito.when(apiCommands.hello(any(HttpServletRequest.class))).thenThrow(
Mockito.when(apiCommands.metrics(any(HttpServletRequest.class))).thenThrow(
new RuntimeException("HealthControllerTest.testReadiness"));
builder = MockMvcRequestBuilders.get(url);
MockHttpServletResponse response = mockmvc.perform(builder).andReturn().getResponse();
@ -92,18 +92,18 @@ public class HealthControllerTest {
// Config is not in readiness
Mockito.when(persistService.configInfoCount(any(String.class))).thenThrow(
new RuntimeException("HealthControllerTest.testReadiness"));
Mockito.when(apiCommands.hello(any(HttpServletRequest.class))).thenReturn(new JSONObject());
Mockito.when(apiCommands.metrics(any(HttpServletRequest.class))).thenReturn(new JSONObject());
response = mockmvc.perform(builder).andReturn().getResponse();
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals("Config is not in readiness", response.getContentAsString());
// Naming is not in readiness
Mockito.when(persistService.configInfoCount(any(String.class))).thenReturn(0);
Mockito.when(apiCommands.hello(any(HttpServletRequest.class))).thenThrow(
Mockito.when(apiCommands.metrics(any(HttpServletRequest.class))).thenThrow(
new RuntimeException("HealthControllerTest.testReadiness"));
builder = MockMvcRequestBuilders.get(url);
response = mockmvc.perform(builder).andReturn().getResponse();
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals("Naming is not in readiness", response.getContentAsString());
}
}
}

View File

@ -16,12 +16,12 @@
package com.alibaba.nacos.naming.cluster.transport;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
/**
@ -38,15 +38,6 @@ public class FastJsonSerializer implements Serializer {
return JSON.toJSONBytes(data);
}
@Override
public <T> byte[] serializeMap(Map<String, T> dataMap) {
JSONObject json = new JSONObject();
for (Map.Entry<String, T> entry : dataMap.entrySet()) {
json.put(entry.getKey(), entry.getValue());
}
return json.toJSONString().getBytes();
}
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
try {
@ -57,15 +48,22 @@ public class FastJsonSerializer implements Serializer {
}
@Override
public <T> Map<String, T> deserializeMap(byte[] data, Class<T> clazz) {
public <T> T deserialize(byte[] data, TypeReference<T> clazz) {
try {
String dataString = new String(data, "UTF-8");
JSONObject json = JSON.parseObject(dataString);
Map<String, T> dataMap = new HashMap<>(16);
for (String key : json.keySet()) {
dataMap.put(key, JSON.parseObject(json.getString(key), clazz));
}
return dataMap;
return JSON.parseObject(dataString, clazz);
} catch (Exception e) {
Loggers.SRV_LOG.error("deserialize data failed.", e);
}
return null;
}
@Override
public <T> Map<String, Datum<T>> deserializeMap(byte[] data, Class<T> clazz) {
try {
String dataString = new String(data, "UTF-8");
return JSON.parseObject(dataString, new TypeReference<Map<String, Datum<T>>>() {
});
} catch (Exception e) {
Loggers.SRV_LOG.error("deserialize data failed.", e);
}

View File

@ -15,6 +15,9 @@
*/
package com.alibaba.nacos.naming.cluster.transport;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.consistency.Datum;
import java.util.Map;
/**
@ -34,15 +37,6 @@ public interface Serializer {
*/
<T> byte[] serialize(T data);
/**
* Serialize map data with some kind of serializing protocol
*
* @param data data to serialize
* @param <T> type of data
* @return byte array of serialized data
*/
<T> byte[] serializeMap(Map<String, T> data);
/**
* Deserialize byte array data to target type
*
@ -54,12 +48,22 @@ public interface Serializer {
<T> T deserialize(byte[] data, Class<T> clazz);
/**
* Deserialize byte array data to target type
* Deserialize byte array data to target generic type
*
* @param data data to deserialize
* @param clazz target type
* @param <T> target type
* @return deserialized data
*/
<T> T deserialize(byte[] data, TypeReference<T> clazz);
/**
* Deserialize byte array data to target type
*
* @param <T> target type
* @param data data to deserialize
* @param clazz target type
* @return deserialized data map
*/
<T> Map<String, T> deserializeMap(byte[] data, Class<T> clazz);
<T> Map<String, Datum<T>> deserializeMap(byte[] data, Class<T> clazz);
}

View File

@ -30,13 +30,13 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class DataStore {
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
private Map<String, Datum<?>> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
public void put(String key, Datum<?> value) {
dataMap.put(key, value);
}
public Datum remove(String key) {
public Datum<?> remove(String key) {
return dataMap.remove(key);
}
@ -44,7 +44,7 @@ public class DataStore {
return dataMap.keySet();
}
public Datum get(String key) {
public Datum<?> get(String key) {
return dataMap.get(key);
}
@ -52,8 +52,8 @@ public class DataStore {
return dataMap.containsKey(key);
}
public Map<String, Datum> batchGet(List<String> keys) {
Map<String, Datum> map = new HashMap<>();
public Map<String, Datum<?>> batchGet(List<String> keys) {
Map<String, Datum<?>> map = new HashMap<>(128);
for (String key : keys) {
if (!dataMap.containsKey(key)) {
continue;

View File

@ -26,6 +26,7 @@ import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -87,8 +88,10 @@ public class DataSyncer implements MemberChangeListener {
}
List<String> keys = task.getKeys();
Map<String, Datum> datumMap = dataStore.batchGet(keys);
byte[] data = serializer.serializeMap(datumMap);
Map<String, Datum<?>> datumMap = dataStore.batchGet(keys);
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
boolean success = NamingProxy.syncData(data, task.getTargetServer());
@ -122,7 +125,7 @@ public class DataSyncer implements MemberChangeListener {
@Override
public void run() {
Map<String, Long> keyTimestamps = new HashMap<>();
Map<String, Long> keyTimestamps = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(key)) {
// this key is no longer in our hands:

View File

@ -16,13 +16,14 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import org.springframework.beans.factory.annotation.Autowired;
@ -76,16 +77,16 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
}
@Override
public Datum get(String key) throws NacosException {
public Datum<?> get(String key) throws NacosException {
return dataStore.get(key);
}
public void onPut(String key, Object value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
List<Instance> instances = (List<Instance>) value;
Datum<List<Instance>> datum = new Datum<>();
datum.value = instances;
Instances instances = (Instances) value;
Datum<Map<String, Instance>> datum = new Datum<>();
datum.value = instances.getInstanceMap();
datum.key = key;
datum.timestamp.set(System.currentTimeMillis());
dataStore.put(key, datum);
@ -126,7 +127,7 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
for (Map.Entry<String, Long> entry : timestamps.entrySet()) {
if (isResponsible(entry.getKey())) {
// this key should not be sent from remote server:
Loggers.EPHEMERAL.error("receive timestamp of " + entry.getKey() + " from " + server);
Loggers.EPHEMERAL.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
continue;
}
if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).timestamp.get() < entry.getValue()) {
@ -149,9 +150,22 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
if (result.length > 0) {
Map<String, Datum> datumMap = serializer.deserializeMap(result, Datum.class);
for (Map.Entry<String, Datum> entry : datumMap.entrySet()) {
Map<String, Datum<Instances>> datumMap =
serializer.deserializeMap(result, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
return;
}
for (DataListener listener : listeners.get(entry.getKey())) {
try {
listener.onChange(entry.getKey(), entry.getValue());
} catch (Exception e) {
Loggers.EPHEMERAL.error("notify " + listener + ", key: " + entry.getKey() + " failed.", e);
}
}
}
}
} catch (Exception e) {

View File

@ -847,6 +847,19 @@ public class RaftCore {
notifier.addTask(datum, ApplyAction.CHANGE);
}
public void loadDatum(String key) {
try {
Datum datum = raftStore.load(key);
if (datum == null) {
return;
}
datums.put(key, datum);
} catch (Exception e) {
Loggers.RAFT.error("load datum failed: " + key, e);
}
}
private void deleteDatum(String key) {
Datum deleted = null;

View File

@ -20,6 +20,7 @@ import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
@ -32,9 +33,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static com.alibaba.nacos.common.util.SystemUtils.NACOS_HOME;
@ -101,7 +100,7 @@ public class RaftStore {
return meta;
}
public synchronized static Datum load(String key) throws Exception {
public synchronized Datum load(String key) throws Exception {
long start = System.currentTimeMillis();
// load data
for (File cache : listCaches()) {
@ -109,7 +108,7 @@ public class RaftStore {
Loggers.RAFT.warn("warning: encountered directory in cache dir: {}", cache.getAbsolutePath());
}
if (!StringUtils.equals(decodeFileName(cache.getName()), key)) {
if (!StringUtils.equals(cache.getName(), encodeFileName(key))) {
continue;
}
@ -136,13 +135,25 @@ public class RaftStore {
}
if (KeyBuilder.matchServiceMetaKey(file.getName())) {
return JSON.parseObject(json, new TypeReference<Datum<Service>>() {
return JSON.parseObject(json.replace("\\", ""), new TypeReference<Datum<Service>>() {
});
}
if (KeyBuilder.matchInstanceListKey(file.getName())) {
return JSON.parseObject(json, new TypeReference<Datum<List<Instance>>>() {
Datum<List<Instance>> datum = JSON.parseObject(json, new TypeReference<Datum<List<Instance>>>() {
});
Map<String, Instance> instanceMap = new HashMap<>(64);
if (datum.value == null || datum.value.isEmpty()) {
return datum;
}
for (Instance instance : datum.value) {
instanceMap.put(instance.getDatumKey(), instance);
}
Datum<Map<String, Instance>> mapDatum = new Datum<>();
mapDatum.value = instanceMap;
mapDatum.key = datum.key;
mapDatum.timestamp.set(datum.timestamp.get());
return mapDatum;
}
return JSON.parseObject(json, Datum.class);
@ -162,7 +173,6 @@ public class RaftStore {
String namespaceId = KeyBuilder.getNamespace(datum.key);
File cacheFile;
File oldCacheFile = null;
if (StringUtils.isNotBlank(namespaceId)) {
cacheFile = new File(CACHE_DIR + File.separator + namespaceId + File.separator + encodeFileName(datum.key));
@ -177,7 +187,17 @@ public class RaftStore {
}
FileChannel fc = null;
ByteBuffer data = ByteBuffer.wrap(JSON.toJSONString(datum).getBytes("UTF-8"));
ByteBuffer data;
if (KeyBuilder.matchInstanceListKey(datum.key)) {
Datum<Collection<Instance>> listDatum = new Datum<>();
listDatum.key = datum.key;
listDatum.value = ((Instances) datum.value).getInstanceMap().values();
listDatum.timestamp.set(datum.timestamp.get());
data = ByteBuffer.wrap(JSON.toJSONString(listDatum).getBytes("UTF-8"));
} else {
data = ByteBuffer.wrap(JSON.toJSONString(datum).getBytes("UTF-8"));
}
try {
fc = new FileOutputStream(cacheFile, false).getChannel();
@ -191,11 +211,6 @@ public class RaftStore {
fc.close();
}
}
if (oldCacheFile != null) {
oldCacheFile.delete();
}
}
private static File[] listCaches() throws Exception {

View File

@ -29,8 +29,8 @@ import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.ClusterInfo;
import com.alibaba.nacos.naming.pojo.IpAddressInfo;
import com.alibaba.nacos.naming.pojo.ServiceDetailInfo;
import com.alibaba.nacos.naming.view.ServiceDetailView;
import com.alibaba.nacos.naming.view.ServiceView;
import com.alibaba.nacos.naming.pojo.ServiceDetailView;
import com.alibaba.nacos.naming.pojo.ServiceView;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.lang3.StringUtils;

View File

@ -357,7 +357,7 @@ public class InstanceController {
String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
JSONObject result = new JSONObject();
Service domObj = (Service) serviceManager.getService(namespaceId, dom);
Service domObj = serviceManager.getService(namespaceId, dom);
if (domObj == null) {
throw new NacosException(NacosException.NOT_FOUND, "dom not found: " + dom);
@ -386,7 +386,7 @@ public class InstanceController {
List<Instance> srvedIPs;
srvedIPs = domObj.srvIPs(clientIP, Arrays.asList(StringUtils.split(clusters, ",")));
srvedIPs = domObj.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// filter ips using selector:
if (domObj.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
@ -394,14 +394,26 @@ public class InstanceController {
}
if (CollectionUtils.isEmpty(srvedIPs)) {
String msg = "no ip to serve for dom: " + dom;
Loggers.SRV_LOG.debug(msg);
if (Loggers.DEBUG_LOG.isDebugEnabled()) {
Loggers.DEBUG_LOG.debug("no instance to serve for service: " + dom);
}
result.put("hosts", new JSONArray());
result.put("dom", dom);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", domObj.getChecksum() + System.currentTimeMillis());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.put("metadata", domObj.getMetadata());
return result;
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<Instance>());
ipMap.put(Boolean.FALSE, new ArrayList<Instance>());
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
ipMap.get(ip.isValid()).add(ip);

View File

@ -15,6 +15,8 @@
*/
package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.util.IoUtils;
import com.alibaba.nacos.core.utils.WebUtils;
@ -22,6 +24,7 @@ import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.partition.PartitionConsistencyServiceImpl;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
@ -52,11 +55,12 @@ public class PartitionController {
@RequestMapping("/onSync")
public String onSync(HttpServletRequest request, HttpServletResponse response) throws Exception {
byte[] data = IoUtils.tryDecompress(request.getInputStream());
Map<String, Object> dataMap = serializer.deserializeMap(data, Object.class);
for (String key : dataMap.keySet()) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
List<Instance> list = (List<Instance>) dataMap.get(key);
consistencyService.onPut(key, list);
Map<String, Datum<Instances>> dataMap =
serializer.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
consistencyService.onPut(entry.getKey(), entry.getValue().value);
}
}
return "ok";
@ -66,7 +70,7 @@ public class PartitionController {
public String syncTimestamps(HttpServletRequest request, HttpServletResponse response) throws Exception {
String source = WebUtils.required(request, "source");
byte[] data = IoUtils.tryDecompress(request.getInputStream());
Map<String, Long> dataMap = serializer.deserializeMap(data, Long.class);
Map<String, Long> dataMap = serializer.deserialize(data, new TypeReference<Map<String, Long>>(){});
consistencyService.onReceiveTimestamps(dataMap, source);
return "ok";
}
@ -74,10 +78,11 @@ public class PartitionController {
@RequestMapping("/get")
public void get(HttpServletRequest request, HttpServletResponse response) throws Exception {
String keys = WebUtils.required(request, "keys");
Map<String, Datum> datumMap = new HashMap<>();
for (String key : keys.split(",")) {
datumMap.put(key, (Datum) consistencyService.get(key));
String keySplitter = ",";
Map<String, Datum<?>> datumMap = new HashMap<>(64);
for (String key : keys.split(keySplitter)) {
datumMap.put(key, consistencyService.get(key));
}
response.getWriter().write(new String(serializer.serializeMap(datumMap), "UTF-8"));
response.getWriter().write(new String(serializer.serialize(datumMap), "UTF-8"));
}
}

View File

@ -18,12 +18,17 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.common.util.IoUtils;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.persistent.raft.*;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.exception.NacosException;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.web.NeedAuth;
@ -75,8 +80,8 @@ public class RaftController {
public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
String entity = new String(IoUtils.tryDecompress(request.getInputStream()), "UTF-8");
String value = Arrays.asList(entity).toArray(new String[1])[0];
// String value = Arrays.asList(entity).toArray(new String[1])[0];
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JSONObject json = JSON.parseObject(value);
@ -111,7 +116,7 @@ public class RaftController {
@RequestMapping("/reloadDatum")
public String reloadDatum(HttpServletRequest request, HttpServletResponse response) throws Exception {
String key = WebUtils.required(request, "key");
RaftStore.load(key);
raftCore.loadDatum(key);
return "ok";
}
@ -124,14 +129,23 @@ public class RaftController {
response.setHeader("Content-Encode", "gzip");
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
String value = Arrays.asList(entity).toArray(new String[1])[0];
// String value = Arrays.asList(entity).toArray(new String[1])[0];
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JSONObject json = JSON.parseObject(value);
raftConsistencyService.put(json.getString("key"), json.getString("value"), String.class);
String key = json.getString("key");
if (KeyBuilder.matchInstanceListKey(key)) {
raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), new TypeReference<Map<String, Instance>>(){}));
return "ok";
}
return "ok";
if (KeyBuilder.matchServiceMetaKey(key)) {
raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), new TypeReference<Service>(){}));
return "ok";
}
throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
}
@NeedAuth
@ -188,9 +202,8 @@ public class RaftController {
response.setHeader("Content-Encode", "gzip");
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
String value = Arrays.asList(entity).toArray(new String[1])[0];
value = URLDecoder.decode(value, "UTF-8");
// String value = Arrays.asList(entity).toArray(new String[1])[0];
String value = URLDecoder.decode(entity, "UTF-8");
JSONObject jsonObject = JSON.parseObject(value);
Datum datum = JSON.parseObject(jsonObject.getString("datum"), Datum.class);
@ -209,7 +222,8 @@ public class RaftController {
response.setHeader("Content-Encode", "gzip");
String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
String value = Arrays.asList(entity).toArray(new String[1])[0];
// String value = Arrays.asList(entity).toArray(new String[1])[0];
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JSONObject jsonObject = JSON.parseObject(value);

View File

@ -131,10 +131,13 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
return cluster;
}
public void updateIPs(List<Instance> ips) {
HashMap<String, Instance> oldIPMap = new HashMap<>(persistentInstances.size());
public void updateIPs(List<Instance> ips, boolean ephemeral) {
for (Instance ip : this.persistentInstances) {
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIPMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIPMap.put(ip.getDatumKey(), ip);
}
@ -184,14 +187,13 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
}
}
this.persistentInstances = new HashSet<>(ips);
toUpdateInstances = new HashSet<>(ips);
ipContains.clear();
for (Instance instance : persistentInstances) {
ipContains.put(instance.toIPAddr(), true);
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
public List<Instance> updatedIPs(Collection<Instance> a, Collection<Instance> b) {
@ -343,7 +345,7 @@ public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implement
}
public boolean contains(Instance ip) {
return ipContains.containsKey(ip.toIPAddr());
return persistentInstances.contains(ip) || ephemeralInstances.contains(ip);
}
public void validate() {

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core;
import com.alibaba.fastjson.JSON;
import java.util.Map;
/**
* @author nkorange
*/
public class Instances {
private Map<String, Instance> instanceMap;
public Map<String, Instance> getInstanceMap() {
return instanceMap;
}
public void setInstanceMap(Map<String, Instance> instanceMap) {
this.instanceMap = instanceMap;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -49,7 +49,7 @@ import java.util.*;
*
* @author nkorange
*/
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements DataListener<List<Instance>> {
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements DataListener<Instances> {
private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z\\.:_-]+";
@ -142,11 +142,11 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
}
@Override
public void onChange(String key, List<Instance> value) throws Exception {
public void onChange(String key, Instances value) throws Exception {
Loggers.RAFT.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance ip : value) {
for (Instance ip : value.getInstanceMap().values()) {
if (ip.getWeight() > 10000.0D) {
ip.setWeight(10000.0D);
@ -157,7 +157,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
}
}
updateIPs(value);
updateIPs(value.getInstanceMap().values(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
@ -167,12 +167,11 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
// ignore
}
public void updateIPs(List<Instance> ips) {
public void updateIPs(Collection<Instance> ips, boolean ephemeral) {
if (CollectionUtils.isEmpty(ips) && allIPs().size() > 1) {
return;
}
Map<String, List<Instance>> ipMap = new HashMap<String, List<Instance>>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<Instance>());
@ -191,13 +190,13 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
// put wild ip into DEFAULT cluster
if (!clusterMap.containsKey(ip.getClusterName())) {
Loggers.SRV_LOG.warn("cluster of IP not found: {}", ip.toJSON());
Loggers.SRV_LOG.warn("cluster of IP not found: {}", ip);
continue;
}
List<Instance> clusterIPs = ipMap.get(ip.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<Instance>();
clusterIPs = new LinkedList<>();
ipMap.put(ip.getClusterName(), clusterIPs);
}
@ -210,8 +209,9 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
clusterMap.get(entry.getKey()).updateIPs(entryIPs);
clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().domChanged(namespaceId, getName());
StringBuilder stringBuilder = new StringBuilder();
@ -272,13 +272,7 @@ public class Service extends com.alibaba.nacos.api.naming.pojo.Service implement
return allIPs;
}
public List<Instance> srvIPs(String clientIP) {
return srvIPs(clientIP, Collections.EMPTY_LIST);
}
public List<Instance> srvIPs(String clientIP, List<String> clusters) {
List<Instance> ips;
public List<Instance> srvIPs(List<String> clusters) {
if (CollectionUtils.isEmpty(clusters)) {
clusters = new ArrayList<>();
clusters.addAll(clusterMap.keySet());

View File

@ -352,7 +352,6 @@ public class ServiceManager implements DataListener<Service> {
}
public void addOrReplaceService(Service service) throws Exception {
// TODO use Service to put:
consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
}
@ -431,9 +430,10 @@ public class ServiceManager implements DataListener<Service> {
Map<String, Instance> instanceMap = addIpAddresses(service, ephemeral, ips);
// String value = JSON.toJSONString(ipAddressMap.values());
Instances instances = new Instances();
instances.setInstanceMap(instanceMap);
consistencyService.put(key, instanceMap);
consistencyService.put(key, instances);
}
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
@ -446,9 +446,10 @@ public class ServiceManager implements DataListener<Service> {
Map<String, Instance> instanceMap = substractIpAddresses(dom, ephemeral, ips);
// String value = JSON.toJSONString(ipAddressMap.values());
Instances instances = new Instances();
instances.setInstanceMap(instanceMap);
consistencyService.put(key, instanceMap);
consistencyService.put(key, instances);
}
public Instance getInstance(String namespaceId, String serviceName, String cluster, String ip, int port) {
@ -478,10 +479,10 @@ public class ServiceManager implements DataListener<Service> {
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(dom.getNamespaceId(), dom.getName(), ephemeral));
Map<String, Instance> oldInstances = new HashMap<>();
Map<String, Instance> oldInstances = new HashMap<>(16);
if (datum != null) {
oldInstances = (Map<String, Instance>) datum.value;
oldInstances = ((Instances) datum.value).getInstanceMap();
}
Map<String, Instance> instances;

View File

@ -71,7 +71,7 @@ public class ClientBeatProcessor implements Runnable {
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs();
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {

View File

@ -85,12 +85,12 @@ public class HttpHealthCheckProcessor implements HealthCheckProcessor {
@Override
public void process(HealthCheckTask task) {
List<Instance> ips = task.getCluster().allIPs();
List<Instance> ips = task.getCluster().allIPs(false);
if (CollectionUtils.isEmpty(ips)) {
return;
}
Service service = (Service) task.getCluster().getDom();
Service service = task.getCluster().getDom();
if (!switchDomain.isHealthCheckEnabled() || !service.getHealthCheckMode().equals(HealthCheckMode.server.name())) {
return;

View File

@ -89,14 +89,14 @@ public class MysqlHealthCheckProcessor implements HealthCheckProcessor {
@Override
public void process(HealthCheckTask task) {
List<Instance> ips = task.getCluster().allIPs();
List<Instance> ips = task.getCluster().allIPs(false);
SRV_LOG.debug("mysql check, ips:" + ips);
if (CollectionUtils.isEmpty(ips)) {
return;
}
Service service = (Service) task.getCluster().getDom();
Service service = task.getCluster().getDom();
if (!healthCheckCommon.isHealthCheckEnabled(service)) {
return;

View File

@ -106,12 +106,12 @@ public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
@Override
public void process(HealthCheckTask task) {
List<Instance> ips = task.getCluster().allIPs();
List<Instance> ips = task.getCluster().allIPs(false);
if (CollectionUtils.isEmpty(ips)) {
return;
}
Service service = (Service) task.getCluster().getDom();
Service service = task.getCluster().getDom();
if (!healthCheckCommon.isHealthCheckEnabled(service)) {
return;

View File

@ -73,10 +73,10 @@ public class NamingProxy {
public static byte[] getData(List<String> keys, String server) throws Exception {
Map<String, String> params = new HashMap<>();
Map<String, String> params = new HashMap<>(8);
params.put("keys", StringUtils.join(keys, ","));
HttpClient.HttpResult result = HttpClient.httpGet("http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL, new ArrayList<String>(), params);
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL, new ArrayList<>(), params);
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content.getBytes();

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.view;
package com.alibaba.nacos.naming.pojo;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.naming.pojo.Cluster;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.view;
package com.alibaba.nacos.naming.pojo;
import com.alibaba.fastjson.JSON;

View File

@ -219,9 +219,12 @@ public class PushService {
}
public void domChanged(final String namespaceId, final String dom) {
// merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, dom))) {
return;
}
Future future = udpSender.schedule(new Runnable() {
@Override
public void run() {

View File

@ -15,7 +15,6 @@
*/
package com.alibaba.nacos.naming.web;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;

View File

@ -83,7 +83,7 @@ public class InstanceControllerTest extends BaseTest {
instance.setPort(9999);
List<Instance> ipList = new ArrayList<Instance>();
ipList.add(instance);
domain.updateIPs(ipList);
domain.updateIPs(ipList, false);
Mockito.when(domainsManager.getService(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1")).thenReturn(domain);
@ -132,7 +132,7 @@ public class InstanceControllerTest extends BaseTest {
instance.setWeight(2.0);
List<Instance> ipList = new ArrayList<Instance>();
ipList.add(instance);
domain.updateIPs(ipList);
domain.updateIPs(ipList, false);
Mockito.when(domainsManager.getService(UtilsAndCommons.getDefaultNamespaceId(), "nacos.test.1")).thenReturn(domain);

View File

@ -87,7 +87,7 @@ public class ClusterTest {
list.add(instance1);
list.add(instance2);
cluster.updateIPs(list);
cluster.updateIPs(list, false);
List<Instance> ips = cluster.allIPs();
Assert.assertNotNull(ips);

View File

@ -23,6 +23,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -83,7 +84,17 @@ public class DomainTest {
List<Instance> list = new ArrayList<Instance>();
list.add(instance);
domain.onChange("iplist", JSON.toJSONString(list));
Instances instances = new Instances();
Map<String, Instance> instanceMap = new HashMap<>();
for (Instance instance1 : list) {
instanceMap.put(instance1.getDatumKey(), instance1);
}
instances.setInstanceMap(instanceMap);
domain.onChange("iplist", instances);
List<Instance> ips = domain.allIPs();

View File

@ -39,7 +39,7 @@ public class RaftStoreTest {
RaftStore.write(datum);
RaftStore.load("1.2.3.4");
raftCore.loadDatum("1.2.3.4");
Datum result = raftCore.getDatum("1.2.3.4");