Use datum in new raft processor to compatible old data

This commit is contained in:
KomachiSion 2020-09-24 17:03:24 +08:00
parent f792300da2
commit eceab55cef
2 changed files with 17 additions and 71 deletions

View File

@ -44,8 +44,6 @@ import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftStore;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.utils.Constants;
@ -53,13 +51,12 @@ import com.google.protobuf.ByteString;
import org.apache.commons.lang3.reflect.TypeUtils;
import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -99,8 +96,6 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
private final KvStorage kvStorage;
private final RaftStore oldStore;
private final ClusterVersionJudgement versionJudgement;
private final Serializer serializer;
@ -125,17 +120,17 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
private volatile boolean hasError = false;
public PersistentServiceProcessor(final ProtocolManager protocolManager,
final ClusterVersionJudgement versionJudgement, final RaftStore oldStore) throws Exception {
final ClusterVersionJudgement versionJudgement) throws Exception {
this.protocol = protocolManager.getCpProtocol();
this.oldStore = oldStore;
this.versionJudgement = versionJudgement;
this.kvStorage = StorageFactory.createKvStorage(KvStorage.KvType.File, "naming-persistent",
Paths.get(UtilsAndCommons.DATA_BASE_DIR, "persistent").toString());
Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString());
this.serializer = SerializeFactory.getSerializer("JSON");
this.notifier = new PersistentNotifier(key -> {
try {
byte[] data = kvStorage.get(ByteUtils.toBytes(key));
return serializer.deserialize(data, getClassForDeserialize(key));
Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key));
return null != datum ? datum.value : null;
} catch (KvStorageException ex) {
throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg());
}
@ -156,7 +151,6 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
} else {
this.versionJudgement.registerObserver(isNewVersion -> {
if (isNewVersion) {
loadFromOldData();
NotifyCenter.registerSubscriber(notifier);
}
}, 10);
@ -214,7 +208,8 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
final List<byte[]> values = request.getValues();
for (int i = 0; i < keys.size(); i++) {
final String key = new String(keys.get(i));
final Record value = serializer.deserialize(values.get(i), getClassForDeserialize(key));
final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key));
final Record value = null != datum ? datum.value : null;
final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value)
.action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build();
NotifyCenter.publishEvent(event);
@ -231,62 +226,11 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock));
}
/**
* Pull old data into the new data store. When loading old data information, write locks must be added, and new
* requests can be processed only after the old data has been loaded
*/
@SuppressWarnings("unchecked")
public void loadFromOldData() {
final Lock lock = this.lock.writeLock();
lock.lock();
Loggers.RAFT.warn("start to load data to new raft protocol!!!");
try {
if (protocol.isLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP)) {
Map<String, Datum> datumMap = new HashMap<>(64);
oldStore.loadDatums(null, datumMap);
int totalSize = datumMap.size();
List<byte[]> keys = new ArrayList<>(totalSize);
List<byte[]> values = new ArrayList<>(totalSize);
int batchSize = 100;
List<CompletableFuture> futures = new ArrayList<>(16);
for (Map.Entry<String, Datum> entry : datumMap.entrySet()) {
totalSize--;
keys.add(ByteUtils.toBytes(entry.getKey()));
values.add(serializer.serialize(entry.getValue().value));
if (keys.size() == batchSize || totalSize == 0) {
BatchWriteRequest request = new BatchWriteRequest();
request.setKeys(keys);
request.setValues(values);
CompletableFuture future = protocol.submitAsync(
Log.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
.setOperation(Op.Write.name())
.setData(ByteString.copyFrom(serializer.serialize(request))).build())
.whenComplete(((response, throwable) -> {
if (throwable == null) {
Loggers.RAFT.info("submit old raft data result : {}", response);
} else {
Loggers.RAFT.error("submit old raft data occur exception : {}", throwable);
}
}));
futures.add(future);
keys.clear();
values.clear();
}
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
} catch (Throwable ex) {
hasError = true;
Loggers.RAFT.error("load old raft data occur exception : {}", ex);
} finally {
lock.unlock();
}
}
@Override
public void put(String key, Record value) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest();
req.append(ByteUtils.toBytes(key), serializer.serialize(value));
Datum datum = Datum.createDatum(key, value);
req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
final Log log = Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try {
@ -321,9 +265,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
BatchReadResponse response = serializer
.deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
final List<byte[]> rValues = response.getValues();
Record record =
rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getClassForDeserialize(key));
return Datum.createDatum(key, record);
return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key));
}
throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
} catch (Throwable e) {
@ -352,7 +294,11 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
return hasLeader && !hasError;
}
private Class<? extends Record> getClassForDeserialize(String key) {
private Type getDatumTypeFromKey(String key) {
return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key));
}
private Class<? extends Record> getClassOfRecordFromKey(String key) {
if (KeyBuilder.matchSwitchKey(key)) {
return com.alibaba.nacos.naming.misc.SwitchDomain.class;
} else if (KeyBuilder.matchServiceMetaKey(key)) {

View File

@ -71,7 +71,7 @@ public class RaftPeerSet extends MemberChangeListener implements Closeable {
private volatile boolean ready = false;
private Set<Member> oldMembers;
private Set<Member> oldMembers = new HashSet<>();
public RaftPeerSet(ServerMemberManager memberManager) {
this.memberManager = memberManager;
@ -317,7 +317,7 @@ public class RaftPeerSet extends MemberChangeListener implements Closeable {
@Override
public void onEvent(MembersChangeEvent event) {
Collection<Member> members = event.getMembers();
if (oldMembers == null) {
if (oldMembers.isEmpty()) {
oldMembers = new HashSet<>(members);
} else {
oldMembers.removeAll(members);