#502 Partition consistency start coding

This commit is contained in:
nkorange 2019-01-20 20:55:43 +08:00
parent f2b43fb240
commit 330fc4e6dc
14 changed files with 513 additions and 36 deletions

View File

@ -4,7 +4,7 @@ import com.alibaba.nacos.common.util.SystemUtils;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.cluster.members.MemberChangeListener;
import com.alibaba.nacos.naming.consistency.persistent.raft.GlobalExecutor;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;

View File

@ -0,0 +1,15 @@
package com.alibaba.nacos.naming.consistency;
/**
* @author nkorange
*/
public enum ApplyAction {
/**
* Data changed
*/
CHANGE,
/**
* Data deleted
*/
DELETE
}

View File

@ -0,0 +1,41 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author nkorange
*/
@Component
public class DataStore {
private Map<String, List<Instance>> dataMap = new ConcurrentHashMap<>();
public void put(String key, List<Instance> value) {
dataMap.put(key, value);
}
public void remove(String key) {
dataMap.remove(key);
}
public List<Instance> get(String key) {
return dataMap.get(key);
}
public Map<String, List<Instance>> batchGet(List<String> keys) {
Map<String, List<Instance>> map = new HashMap<>();
for (String key : keys) {
if (!dataMap.containsKey(key)) {
continue;
}
map.put(key, dataMap.get(key));
}
return map;
}
}

View File

@ -0,0 +1,98 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.cluster.members.MemberChangeListener;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* @author nkorange
* @since 1.0.0
*/
@Component
public class DataSyncer implements MemberChangeListener {
@Autowired
private DataStore dataStore;
@Autowired
private Serializer serializer;
@Autowired
private ServerListManager serverListManager;
private List<Member> servers;
public DataSyncer() {
serverListManager.listen(this);
}
public void submit(SyncTask task) {
GlobalExecutor.submitDataSync(new Runnable() {
@Override
public void run() {
try {
if (servers == null || servers.isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
List<String> keys = task.getKeys();
Map<String, List<Instance>> instancMap = dataStore.batchGet(keys);
byte[] data = serializer.serialize(instancMap);
if (StringUtils.isBlank(task.getTargetServer())) {
for (Member server : servers) {
long timestamp = System.currentTimeMillis();
boolean success = NamingProxy.syncData(data, server.getKey());
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(server.getKey());
submit(syncTask);
}
}
} else {
long timestamp = System.currentTimeMillis();
boolean success = NamingProxy.syncData(data, task.getTargetServer());
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
submit(syncTask);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.error("sync data failed.", e);
}
}
});
}
@Override
public void onChangeMemberList(List<Member> latestMembers) {
}
@Override
public void onChangeReachableMemberList(List<Member> latestReachableMembers) {
servers = latestReachableMembers;
}
}

View File

@ -0,0 +1,41 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author nkorange
*/
@Component
public class FastJsonSerializer implements Serializer {
@Override
public <T> byte[] serialize(Map<String, T> dataMap) {
JSONObject json = new JSONObject();
for (Map.Entry<String, T> entry : dataMap.entrySet()) {
json.put(entry.getKey(), entry.getValue());
}
return json.toJSONString().getBytes();
}
@Override
public <T> Map<String, T> deserialize(byte[] data, Class<T> clazz) {
try {
String dataString = new String(data, "UTF-8");
JSONObject json = JSON.parseObject(dataString);
Map<String, T> dataMap = new HashMap<>();
for (String key : json.keySet()) {
dataMap.put(key, JSON.parseObject(json.getString(key), clazz));
}
return dataMap;
} catch (Exception e) {
Loggers.SRV_LOG.error("deserialize data failed.", e);
}
return null;
}
}

View File

@ -0,0 +1,33 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import org.springframework.beans.factory.annotation.Value;
/**
* Stores some configurations for Partition protocol
*
* @author nkorange
* @since 1.0.0
*/
public class PartitionConfig {
@Value("taskDispatchThreadCount")
private int taskDispatchThreadCount = 10;
@Value("taskDispatchPeriod")
private int taskDispatchPeriod = 2000;
@Value("batchSyncKeyCount")
private int batchSyncKeyCount = 1000;
public int getTaskDispatchThreadCount() {
return taskDispatchThreadCount;
}
public int getTaskDispatchPeriod() {
return taskDispatchPeriod;
}
public int getBatchSyncKeyCount() {
return batchSyncKeyCount;
}
}

View File

@ -0,0 +1,13 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import java.util.Map;
/**
* @author nkorange
*/
public interface Serializer {
<T> byte[] serialize(Map<String, T> data);
<T> Map<String, T> deserialize(byte[] data, Class<T> clazz);
}

View File

@ -0,0 +1,50 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import java.util.List;
/**
* @author nkorange
* @since 1.0.0
*/
public class SyncTask {
private List<String> keys;
private int retryCount;
private long lastExecuteTime;
private String targetServer;
public List<String> getKeys() {
return keys;
}
public void setKeys(List<String> keys) {
this.keys = keys;
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
public long getLastExecuteTime() {
return lastExecuteTime;
}
public void setLastExecuteTime(long lastExecuteTime) {
this.lastExecuteTime = lastExecuteTime;
}
public String getTargetServer() {
return targetServer;
}
public void setTargetServer(String targetServer) {
this.targetServer = targetServer;
}
}

View File

@ -0,0 +1,100 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author nkorange
* @since 1.0.0
*/
@Component
public class TaskDispatcher {
private List<BlockingQueue<String>> taskList = new ArrayList<>();
private Map<String, String> taskMap = new ConcurrentHashMap<>();
@Autowired
private PartitionConfig partitionConfig;
@Autowired
private DataSyncer dataSyncer;
@PostConstruct
public void init() {
for (int i = 0; i < partitionConfig.getTaskDispatchThreadCount(); i++) {
taskList.add(new LinkedBlockingQueue<>(128 * 1024));
GlobalExecutor.submit(new TaskScheduler(i));
}
}
public int mapTask(String key) {
// TODO map a task key to a particular task queue:
return 0;
}
public void addTask(String key) {
if (taskMap.containsKey(key)) {
return;
}
taskMap.put(key, StringUtils.EMPTY);
taskList.get(mapTask(key)).add(key);
}
public class TaskScheduler implements Runnable {
private int index;
private int dataSize = 0;
private long lastExecuteTime = 0L;
public TaskScheduler(int index) {
this.index = index;
}
public int getIndex() {
return index;
}
@Override
public void run() {
while (true) {
List<String> keys = new ArrayList<>();
try {
String key = taskList.get(index).take();
if (dataSize == 0) {
keys = new ArrayList<>();
}
if (dataSize < partitionConfig.getBatchSyncKeyCount()) {
keys.add(key);
dataSize ++;
}
// TODO estimate lastExecuteTime
if (dataSize == partitionConfig.getBatchSyncKeyCount()) {
}
} catch (Exception e) {
}
}
}
}
}

View File

@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
@ -959,15 +960,4 @@ public class RaftCore {
}
}
}
public enum ApplyAction {
/**
* Data changed
*/
CHANGE,
/**
* Data deleted
*/
DELETE
}
}

View File

@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.raft;
package com.alibaba.nacos.naming.misc;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -37,16 +37,51 @@ public class GlobalExecutor {
private static ScheduledExecutorService executorService =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.raft.timer");
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.raft.timer");
return t;
}
});
return t;
}
});
private static ScheduledExecutorService taskDispatchExecutor =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.partition.task.dispatcher");
return t;
}
});
private static ScheduledExecutorService dataSyncExecutor =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.partition.data.syncer");
return t;
}
});
public static void submitPartitionTaskExecute(Runnable runnable) {
taskDispatchExecutor.submit(runnable);
}
public static void submitDataSync(Runnable runnable) {
dataSyncExecutor.submit(runnable);
}
public static void registerMasterElection(Runnable runnable) {
executorService.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);

View File

@ -27,6 +27,8 @@ import org.apache.http.*;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@ -278,6 +280,33 @@ public class HttpClient {
}
}
public static HttpResult httpPutLarge(String url, Map<String, String> headers, byte[] content) {
try {
HttpClientBuilder builder = HttpClients.custom();
builder.setUserAgent(UtilsAndCommons.SERVER_VERSION);
builder.setConnectionTimeToLive(500, TimeUnit.MILLISECONDS);
CloseableHttpClient httpClient = builder.build();
HttpPut httpPut = new HttpPut(url);
for (Map.Entry<String, String> entry : headers.entrySet()) {
httpPut.setHeader(entry.getKey(), entry.getValue());
}
httpPut.setEntity(new ByteArrayEntity(content));
HttpResponse response = httpClient.execute(httpPut);
HttpEntity entity = response.getEntity();
HeaderElement[] headerElements = entity.getContentType().getElements();
String charset = headerElements[0].getParameterByName("charset").getValue();
return new HttpResult(response.getStatusLine().getStatusCode(),
IOUtils.toString(entity.getContent(), charset), Collections.<String, String>emptyMap());
} catch (Exception e) {
return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
}
}
public static HttpResult httpPostLarge(String url, Map<String, String> headers, String content) {
try {
HttpClientBuilder builder = HttpClients.custom();

View File

@ -15,33 +15,64 @@
*/
package com.alibaba.nacos.naming.misc;
import com.alibaba.nacos.common.util.SystemUtils;
import com.alibaba.nacos.naming.boot.RunningConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.common.util.SystemUtils.*;
/**
* @author nacos
*/
public class NamingProxy {
private static final String DATA_SYNC_URL = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/partition/sync";
public static boolean syncData(byte[] data, String curServer) throws Exception {
try {
Map<String, String> headers = new HashMap<>();
headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
headers.put("Accept-Encoding", "gzip,deflate,sdch");
headers.put("Connection", "Keep-Alive");
headers.put("Content-Encoding", "gzip");
if (!curServer.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
curServer = curServer + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
}
HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_SYNC_URL, headers, data);
if (HttpURLConnection.HTTP_OK == result.code) {
return true;
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
return true;
}
throw new IOException("failed to req API:" + "http://" + curServer
+ RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_SYNC_URL + ". code:"
+ result.code + " msg: " + result.content);
} catch (Exception e) {
Loggers.SRV_LOG.warn("NamingProxy", e);
}
return false;
}
public static String reqAPI(String api, Map<String, String> params, String curServer, boolean isPost) throws Exception {
try {
List<String> headers = Arrays.asList("Client-Version", UtilsAndCommons.SERVER_VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"Content-Encoding", "gzip");
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"Content-Encoding", "gzip");
HttpClient.HttpResult result;
@ -52,10 +83,10 @@ public class NamingProxy {
if (isPost) {
result = HttpClient.httpPost("http://" + curServer + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api, headers, params);
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api, headers, params);
} else {
result = HttpClient.httpGet("http://" + curServer + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api, headers, params);
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api, headers, params);
}
if (HttpURLConnection.HTTP_OK == result.code) {
@ -67,9 +98,9 @@ public class NamingProxy {
}
throw new IOException("failed to req API:" + "http://" + curServer
+ RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api + ". code:"
+ result.code + " msg: " + result.content);
+ RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/" + api + ". code:"
+ result.code + " msg: " + result.content);
} catch (Exception e) {
Loggers.SRV_LOG.warn("NamingProxy", e);
}