#502 Support delete service in AP mode

This commit is contained in:
nkorange 2019-03-22 20:42:00 +08:00
parent f0431913c0
commit d5dc3e5747
2 changed files with 29 additions and 28 deletions

View File

@ -21,9 +21,9 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.boot.RunningConfig; import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.consistency.ApplyAction; import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.core.Instances; import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.*; import com.alibaba.nacos.naming.misc.*;
@ -312,7 +312,7 @@ public class RaftCore {
} }
raftStore.updateTerm(local.term.get()); raftStore.updateTerm(local.term.get());
notifier.addTask(datum, ApplyAction.CHANGE); notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term); Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
} }
@ -353,6 +353,8 @@ public class RaftCore {
raftStore.updateTerm(local.term.get()); raftStore.updateTerm(local.term.get());
} }
Loggers.RAFT.info("data removed, key={}, term={}", datum.key, local.term);
} }
public class MasterElection implements Runnable { public class MasterElection implements Runnable {
@ -699,7 +701,7 @@ public class RaftCore {
} }
datums.put(datum.key, datum); datums.put(datum.key, datum);
notifier.addTask(datum, ApplyAction.CHANGE); notifier.addTask(datum.key, ApplyAction.CHANGE);
local.resetLeaderDue(); local.resetLeaderDue();
@ -844,7 +846,7 @@ public class RaftCore {
public void addDatum(Datum datum) { public void addDatum(Datum datum) {
datums.put(datum.key, datum); datums.put(datum.key, datum);
notifier.addTask(datum, ApplyAction.CHANGE); notifier.addTask(datum.key, ApplyAction.CHANGE);
} }
public void loadDatum(String key) { public void loadDatum(String key) {
@ -862,18 +864,17 @@ public class RaftCore {
private void deleteDatum(String key) { private void deleteDatum(String key) {
Datum deleted = null; Datum deleted;
try { try {
deleted = datums.remove(URLDecoder.decode(key, "UTF-8")); deleted = datums.remove(URLDecoder.decode(key, "UTF-8"));
if (deleted != null) {
raftStore.delete(deleted);
Loggers.RAFT.info("datum deleted, key: {}", key);
}
notifier.addTask(URLDecoder.decode(key, "UTF-8"), ApplyAction.DELETE);
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
Loggers.RAFT.warn("datum key decode failed: {}", key); Loggers.RAFT.warn("datum key decode failed: {}", key);
} }
// FIXME should we ignore the value of 'deleted'?
if (deleted != null) {
raftStore.delete(deleted);
notifier.addTask(deleted, ApplyAction.DELETE);
Loggers.RAFT.info("datum deleted, key: {}", key);
}
} }
public boolean isInitialized() { public boolean isInitialized() {
@ -886,15 +887,15 @@ public class RaftCore {
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024); private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
public void addTask(Datum datum, ApplyAction action) { public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datum.key) && action == ApplyAction.CHANGE) { if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return; return;
} }
if (action == ApplyAction.CHANGE) { if (action == ApplyAction.CHANGE) {
services.put(datum.key, StringUtils.EMPTY); services.put(datumKey, StringUtils.EMPTY);
} }
tasks.add(Pair.with(datum, action)); tasks.add(Pair.with(datumKey, action));
} }
public int getTaskSize() { public int getTaskSize() {
@ -914,58 +915,58 @@ public class RaftCore {
continue; continue;
} }
Datum datum = (Datum) pair.getValue0(); String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1(); ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datum.key); services.remove(datumKey);
int count = 0; int count = 0;
if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) { if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
if (KeyBuilder.matchServiceMetaKey(datum.key) && !KeyBuilder.matchSwitchKey(datum.key)) { if (KeyBuilder.matchServiceMetaKey(datumKey) && !KeyBuilder.matchSwitchKey(datumKey)) {
for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) { for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
try { try {
if (action == ApplyAction.CHANGE) { if (action == ApplyAction.CHANGE) {
listener.onChange(datum.key, getDatum(datum.key).value); listener.onChange(datumKey, getDatum(datumKey).value);
} }
if (action == ApplyAction.DELETE) { if (action == ApplyAction.DELETE) {
listener.onDelete(datum.key); listener.onDelete(datumKey);
} }
} catch (Throwable e) { } catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datum.key, e); Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
} }
} }
} }
} }
if (!listeners.containsKey(datum.key)) { if (!listeners.containsKey(datumKey)) {
continue; continue;
} }
for (RecordListener listener : listeners.get(datum.key)) { for (RecordListener listener : listeners.get(datumKey)) {
count++; count++;
try { try {
if (action == ApplyAction.CHANGE) { if (action == ApplyAction.CHANGE) {
listener.onChange(datum.key, getDatum(datum.key).value); listener.onChange(datumKey, getDatum(datumKey).value);
continue; continue;
} }
if (action == ApplyAction.DELETE) { if (action == ApplyAction.DELETE) {
listener.onDelete(datum.key); listener.onDelete(datumKey);
continue; continue;
} }
} catch (Throwable e) { } catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datum.key, e); Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
} }
} }
if (Loggers.RAFT.isDebugEnabled()) { if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datum.key, count); Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datumKey, count);
} }
} catch (Throwable e) { } catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] Error while handling notifying task", e); Loggers.RAFT.error("[NACOS-RAFT] Error while handling notifying task", e);

View File

@ -63,7 +63,7 @@ public class RaftStore {
datum = readDatum(datumFile, cache.getName()); datum = readDatum(datumFile, cache.getName());
if (datum != null) { if (datum != null) {
datums.put(datum.key, datum); datums.put(datum.key, datum);
notifier.addTask(datum, ApplyAction.CHANGE); notifier.addTask(datum.key, ApplyAction.CHANGE);
} }
} }
continue; continue;