Differentiate temporary data and persistent data

This commit is contained in:
nkorange 2018-10-31 10:48:15 +08:00
parent 3e01285ab7
commit 4ff38aca3a
2 changed files with 46 additions and 27 deletions

View File

@ -319,18 +319,24 @@ public class RaftCore {
local.resetLeaderDue();
// do apply
if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
RaftStore.write(datum);
}
RaftCore.datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
RaftStore.updateTerm(local.term.get());
}
notifier.addTask(datum, Notifier.ApplyAction.CHANGE);
@ -367,18 +373,21 @@ public class RaftCore {
// do apply
String key = params.getString("key");
// deleteDatum(key);
deleteDatum(key);
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
if (key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
RaftStore.updateTerm(local.term.get());
}
// RaftStore.updateTerm(local.term.get());
}
public static class MasterElection implements Runnable {
@ -711,18 +720,23 @@ public class RaftCore {
continue;
}
RaftStore.write(datum);
if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
RaftStore.write(datum);
}
RaftCore.datums.put(datum.key, datum);
local.resetLeaderDue();
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
if (datum.key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
RaftStore.updateTerm(local.term.get());
RaftStore.updateTerm(local.term.get());
}
Loggers.RAFT.info("data updated" + ", key=" + datum.key
+ ", timestamp=" + datum.timestamp + ",from " + JSON.toJSONString(remote) + ", local term: " + local.term);
@ -906,9 +920,10 @@ public class RaftCore {
private static void deleteDatum(String key) {
Datum deleted = datums.remove(key);
if (deleted != null) {
RaftStore.delete(deleted);
if (key.startsWith(UtilsAndCommons.DOMAINS_DATA_ID)) {
RaftStore.delete(deleted);
}
notifier.addTask(deleted, Notifier.ApplyAction.DELETE);
Loggers.RAFT.info("datum deleted, key=" + key);
}
}

View File

@ -311,6 +311,10 @@ public class ApiCommands {
ipAddress.setMetadata(clientBeat.getMetadata());
ipAddress.setClusterName(clusterName);
if (!virtualClusterDomain.getClusterMap().containsKey(ipAddress.getClusterName())) {
doAddCluster4Dom(MockHttpRequest.buildRequest(stringMap));
}
if (!virtualClusterDomain.allIPs().contains(ipAddress)) {
stringMap.put("ipList", Arrays.asList(JSON.toJSONString(Arrays.asList(ipAddress))).toArray(new String[1]));
stringMap.put("json", Arrays.asList("true").toArray(new String[1]));