Separate locked publish from unlocked publish

This commit is contained in:
nkorange 2018-11-07 11:28:46 +08:00
parent c348398d45
commit d6f4834688
4 changed files with 77 additions and 6 deletions

View File

@ -285,7 +285,7 @@ public class DomainsManager {
virtualClusterDomain = (VirtualClusterDomain) newDom;
newDom = virtualClusterDomain;
}
RaftCore.signalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom));
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(newDom), JSON.toJSONString(newDom));
}
public void easyReplaceIP4Dom(String domName, String clusterName, List<IpAddress> ips) throws Exception {
@ -367,7 +367,7 @@ public class DomainsManager {
}
if (timestamp == -1) {
RaftCore.signalPublish(UtilsAndCommons.getIPListStoreKey(dom),
RaftCore.doSignalPublish(UtilsAndCommons.getIPListStoreKey(dom),
JSON.toJSONString(ipAddressMap.values()));
} else {
String key = UtilsAndCommons.getIPListStoreKey(dom);
@ -452,7 +452,7 @@ public class DomainsManager {
ipAddrs.removeAll(ips);
RaftCore.signalPublish(UtilsAndCommons.getIPListStoreKey(dom), JSON.toJSONString(ipAddrs));
RaftCore.doSignalPublish(UtilsAndCommons.getIPListStoreKey(dom), JSON.toJSONString(ipAddrs));
} finally {
lock.unlock();
}

View File

@ -135,7 +135,7 @@ public class Switch {
public static void save() {
try {
RaftCore.signalPublish(UtilsAndCommons.getDomStoreKey(dom), JSON.toJSONString(dom));
RaftCore.doSignalPublish(UtilsAndCommons.getDomStoreKey(dom), JSON.toJSONString(dom));
} catch (Exception e) {
Loggers.SRV_LOG.error("VIPSRV-SWITCH", "failed to save switch", e);
}

View File

@ -179,6 +179,50 @@ public class RaftCore {
}
public static void signalPublish(String key, String value) throws Exception {
long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
datum.timestamp = System.currentTimeMillis();
JSONObject json = new JSONObject();
json.put("datum", datum);
json.put("source", peers.local());
onPublish(datum, peers.local());
final String content = JSON.toJSONString(json);
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
continue;
}
final String url = buildURL(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("RAFT", "failed to publish data to peer, datumId=" + datum.key + ", peer=" + server + ", http code=" + response.getStatusCode());
return 1;
}
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key);
}
public static void doSignalPublish(String key, String value) throws Exception {
if (!RaftCore.isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
@ -195,6 +239,15 @@ public class RaftCore {
throw new IllegalStateException("I'm not leader, can not handle update/delete operation");
}
if (key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
signalPublishLocked(key, value);
} else {
signalPublish(key, value);
}
}
public static void signalPublishLocked(String key, String value) throws Exception {
try {
RaftCore.OPERATE_LOCK.lock();
long start = System.currentTimeMillis();
@ -237,6 +290,12 @@ public class RaftCore {
}
if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.info("data publish failed, caused failed to notify majority, key=" + key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost " + (end - start) + " ms" + " : " + key);
} finally {
@ -930,10 +989,19 @@ public class RaftCore {
public static class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
public void addTask(Datum datum, ApplyAction action) {
if (services.containsKey(datum.key) && action == ApplyAction.CHANGE) {
return;
}
tasks.add(Pair.with(datum, action));
if (action == ApplyAction.CHANGE) {
services.put(datum.key, StringUtils.EMPTY);
}
}
@Override
@ -951,6 +1019,9 @@ public class RaftCore {
Datum datum = (Datum) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datum.key);
int count = 0;
for (RaftListener listener : listeners) {
@ -966,7 +1037,7 @@ public class RaftCore {
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datum.key, datum.value);
listener.onChange(datum.key, getDatum(datum.key).value);
continue;
}

View File

@ -113,7 +113,7 @@ public class RaftCommands {
String value = Arrays.asList(entity).toArray(new String[1])[0];
JSONObject json = JSON.parseObject(value);
RaftCore.signalPublish(json.getString("key"), json.getString("value"));
RaftCore.doSignalPublish(json.getString("key"), json.getString("value"));
return "ok";
}