refactor: refactor issue #4291 (#4292)

This commit is contained in:
liaochuntao 2020-11-23 09:49:08 +08:00 committed by GitHub
parent 8500279c79
commit a43bf8fcfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 473 additions and 256 deletions

View File

@ -22,6 +22,7 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.misc.GlobalExecutor; import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection; import java.util.Collection;
@ -64,6 +65,11 @@ public class ClusterVersionJudgement {
} }
protected void runVersionListener() { protected void runVersionListener() {
// Single machine mode, do upgrade operation directly.
if (EnvUtil.getStandaloneMode()) {
notifyAllListener();
return;
}
try { try {
judge(); judge();
} finally { } finally {
@ -72,6 +78,7 @@ public class ClusterVersionJudgement {
} }
protected void judge() { protected void judge() {
Collection<Member> members = memberManager.allMembers(); Collection<Member> members = memberManager.allMembers();
final String oldVersion = "1.4.0"; final String oldVersion = "1.4.0";
boolean allMemberIsNewVersion = true; boolean allMemberIsNewVersion = true;
@ -83,15 +90,19 @@ public class ClusterVersionJudgement {
} }
// can only trigger once // can only trigger once
if (allMemberIsNewVersion && !this.allMemberIsNewVersion) { if (allMemberIsNewVersion && !this.allMemberIsNewVersion) {
this.allMemberIsNewVersion = true; notifyAllListener();
Collections.sort(observers);
for (ConsumerWithPriority consumer : observers) {
consumer.consumer.accept(true);
}
observers.clear();
} }
} }
private void notifyAllListener() {
this.allMemberIsNewVersion = true;
Collections.sort(observers);
for (ConsumerWithPriority consumer : observers) {
consumer.consumer.accept(true);
}
observers.clear();
}
public boolean allMemberIsNewVersion() { public boolean allMemberIsNewVersion() {
return allMemberIsNewVersion; return allMemberIsNewVersion;
} }

View File

@ -19,7 +19,7 @@ package com.alibaba.nacos.naming.consistency.persistent;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener; import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor; import com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl; import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl;
import com.alibaba.nacos.naming.pojo.Record; import com.alibaba.nacos.naming.pojo.Record;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -36,13 +36,13 @@ public class PersistentConsistencyServiceDelegateImpl implements PersistentConsi
private final RaftConsistencyServiceImpl oldPersistentConsistencyService; private final RaftConsistencyServiceImpl oldPersistentConsistencyService;
private final PersistentServiceProcessor newPersistentConsistencyService; private final BasePersistentServiceProcessor newPersistentConsistencyService;
private volatile boolean switchNewPersistentService = false; private volatile boolean switchNewPersistentService = false;
public PersistentConsistencyServiceDelegateImpl(ClusterVersionJudgement versionJudgement, public PersistentConsistencyServiceDelegateImpl(ClusterVersionJudgement versionJudgement,
RaftConsistencyServiceImpl oldPersistentConsistencyService, RaftConsistencyServiceImpl oldPersistentConsistencyService,
PersistentServiceProcessor newPersistentConsistencyService) { BasePersistentServiceProcessor newPersistentConsistencyService) {
this.versionJudgement = versionJudgement; this.versionJudgement = versionJudgement;
this.oldPersistentConsistencyService = oldPersistentConsistencyService; this.oldPersistentConsistencyService = oldPersistentConsistencyService;
this.newPersistentConsistencyService = newPersistentConsistencyService; this.newPersistentConsistencyService = newPersistentConsistencyService;

View File

@ -0,0 +1,265 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.RequestProcessor4CP;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.utils.Constants;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.reflect.TypeUtils;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* New service data persistence handler.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public abstract class BasePersistentServiceProcessor extends RequestProcessor4CP
implements PersistentConsistencyService {
enum Op {
/**
* write ops.
*/
Write("Write"),
/**
* read ops.
*/
Read("Read"),
/**
* delete ops.
*/
Delete("Delete");
protected final String desc;
Op(String desc) {
this.desc = desc;
}
}
protected final KvStorage kvStorage;
protected final Serializer serializer;
/**
* Whether an unrecoverable error occurred.
*/
protected volatile boolean hasError = false;
/**
* If use old raft, should not notify listener even new listener add.
*/
protected volatile boolean startNotify = false;
/**
* During snapshot processing, the processing of other requests needs to be paused.
*/
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
protected final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
protected final ClusterVersionJudgement versionJudgement;
protected final PersistentNotifier notifier;
public BasePersistentServiceProcessor(final ClusterVersionJudgement judgement) throws Exception {
this.versionJudgement = judgement;
this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString());
this.serializer = SerializeFactory.getSerializer("JSON");
this.notifier = new PersistentNotifier(key -> {
try {
byte[] data = kvStorage.get(ByteUtils.toBytes(key));
Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key));
return null != datum ? datum.value : null;
} catch (KvStorageException ex) {
throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg());
}
});
afterConstruct();
}
@SuppressWarnings("unchecked")
protected void afterConstruct() {
NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384);
listenOldRaftClose();
}
private void listenOldRaftClose() {
this.versionJudgement.registerObserver(isNewVersion -> {
if (isNewVersion) {
NotifyCenter.registerSubscriber(notifier);
startNotify = true;
}
}, 10);
}
@Override
public Response onRequest(ReadRequest request) {
final List<byte[]> keys = serializer
.deserialize(request.getData().toByteArray(), TypeUtils.parameterize(List.class, byte[].class));
final Lock lock = readLock;
lock.lock();
try {
final Map<byte[], byte[]> result = kvStorage.batchGet(keys);
final BatchReadResponse response = new BatchReadResponse();
result.forEach(response::append);
return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response)))
.build();
} catch (KvStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} finally {
lock.unlock();
}
}
@Override
public Response onApply(WriteRequest request) {
final byte[] data = request.getData().toByteArray();
final BatchWriteRequest bwRequest = serializer.deserialize(data, BatchWriteRequest.class);
final Op op = Op.valueOf(request.getOperation());
final Lock lock = readLock;
lock.lock();
try {
switch (op) {
case Write:
kvStorage.batchPut(bwRequest.getKeys(), bwRequest.getValues());
break;
case Delete:
kvStorage.batchDelete(bwRequest.getKeys());
break;
default:
return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + op).build();
}
publishValueChangeEvent(op, bwRequest);
return Response.newBuilder().setSuccess(true).build();
} catch (KvStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} finally {
lock.unlock();
}
}
private void publishValueChangeEvent(final Op op, final BatchWriteRequest request) {
final List<byte[]> keys = request.getKeys();
final List<byte[]> values = request.getValues();
for (int i = 0; i < keys.size(); i++) {
final String key = new String(keys.get(i));
final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key));
final Record value = null != datum ? datum.value : null;
final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value)
.action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build();
NotifyCenter.publishEvent(event);
}
}
@Override
public String group() {
return Constants.NAMING_PERSISTENT_SERVICE_GROUP;
}
@Override
public List<SnapshotOperation> loadSnapshotOperate() {
return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock));
}
@Override
public void onError(Throwable error) {
super.onError(error);
hasError = true;
}
protected Type getDatumTypeFromKey(String key) {
return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key));
}
protected Class<? extends Record> getClassOfRecordFromKey(String key) {
if (KeyBuilder.matchSwitchKey(key)) {
return com.alibaba.nacos.naming.misc.SwitchDomain.class;
} else if (KeyBuilder.matchServiceMetaKey(key)) {
return com.alibaba.nacos.naming.core.Service.class;
} else if (KeyBuilder.matchInstanceListKey(key)) {
return com.alibaba.nacos.naming.core.Instances.class;
}
return Record.class;
}
protected void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException {
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
notifierAllServiceMeta(listener);
} else {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
/**
* This notify should only notify once during startup. See {@link com.alibaba.nacos.naming.core.ServiceManager#init()}
*/
private void notifierAllServiceMeta(RecordListener listener) throws NacosException {
for (byte[] each : kvStorage.allKeys()) {
String key = new String(each);
if (listener.interests(key)) {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
}
private void notifierDatum(String key, Datum datum, RecordListener listener) {
try {
listener.onChange(key, datum.value);
} catch (Exception e) {
Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e);
}
}
}

View File

@ -1,151 +1,74 @@
/* /*
* Copyright 1999-2018 Alibaba Group Holding Ltd. * Copyright 1999-2018 Alibaba Group Holding Ltd.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.naming.consistency.persistent.impl; package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.ByteUtils; import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.CPProtocol; import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.RequestProcessor4CP;
import com.alibaba.nacos.consistency.cp.MetadataKey; import com.alibaba.nacos.consistency.cp.MetadataKey;
import com.alibaba.nacos.consistency.entity.ReadRequest; import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response; import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest; import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.distributed.ProtocolManager; import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.exception.ErrorCode; import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener; import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record; import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.utils.Constants; import com.alibaba.nacos.naming.utils.Constants;
import com.alibaba.nacos.sys.env.EnvUtil; import com.alibaba.nacos.sys.env.EnvUtil;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.apache.commons.lang3.reflect.TypeUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* New service data persistence handler. * In cluster mode, start the Raft protocol.
* *
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/ */
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") @SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
@ConditionalOnProperty(value = "nacos.standalone", havingValue = "false")
@Service @Service
public class PersistentServiceProcessor extends RequestProcessor4CP implements PersistentConsistencyService { public class PersistentServiceProcessor extends BasePersistentServiceProcessor {
enum Op {
/**
* write ops.
*/
Write("Write"),
/**
* read ops.
*/
Read("Read"),
/**
* delete ops.
*/
Delete("Delete");
private final String desc;
Op(String desc) {
this.desc = desc;
}
}
private final CPProtocol protocol; private final CPProtocol protocol;
private final KvStorage kvStorage;
private final ClusterVersionJudgement versionJudgement;
private final Serializer serializer;
/**
* During snapshot processing, the processing of other requests needs to be paused.
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final PersistentNotifier notifier;
/** /**
* Is there a leader node currently. * Is there a leader node currently.
*/ */
private volatile boolean hasLeader = false; private volatile boolean hasLeader = false;
/** public PersistentServiceProcessor(ProtocolManager protocolManager, ClusterVersionJudgement versionJudgement)
* Whether an unrecoverable error occurred. throws Exception {
*/ super(versionJudgement);
private volatile boolean hasError = false;
/**
* If use old raft, should not notify listener even new listener add.
*/
private volatile boolean startNotify = false;
public PersistentServiceProcessor(final ProtocolManager protocolManager,
final ClusterVersionJudgement versionJudgement) throws Exception {
this.protocol = protocolManager.getCpProtocol(); this.protocol = protocolManager.getCpProtocol();
this.versionJudgement = versionJudgement;
this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString());
this.serializer = SerializeFactory.getSerializer("JSON");
this.notifier = new PersistentNotifier(key -> {
try {
byte[] data = kvStorage.get(ByteUtils.toBytes(key));
Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key));
return null != datum ? datum.value : null;
} catch (KvStorageException ex) {
throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg());
}
});
init();
} }
@SuppressWarnings("unchecked") @Override
private void init() { protected void afterConstruct() {
NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384); super.afterConstruct();
this.protocol.addLogProcessors(Collections.singletonList(this)); this.protocol.addLogProcessors(Collections.singletonList(this));
this.protocol.protocolMetaData() this.protocol.protocolMetaData()
.subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA, .subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA,
@ -155,13 +78,6 @@ public class PersistentServiceProcessor extends RequestProcessor4CP implements P
NotifyCenter.registerSubscriber(notifier); NotifyCenter.registerSubscriber(notifier);
waitLeader(); waitLeader();
startNotify = true; startNotify = true;
} else {
this.versionJudgement.registerObserver(isNewVersion -> {
if (isNewVersion) {
NotifyCenter.registerSubscriber(notifier);
startNotify = true;
}
}, 10);
} }
} }
@ -175,75 +91,6 @@ public class PersistentServiceProcessor extends RequestProcessor4CP implements P
} }
} }
@Override
public Response onRequest(ReadRequest request) {
final List<byte[]> keys = serializer
.deserialize(request.getData().toByteArray(), TypeUtils.parameterize(List.class, byte[].class));
final Lock lock = readLock;
lock.lock();
try {
final Map<byte[], byte[]> result = kvStorage.batchGet(keys);
final BatchReadResponse response = new BatchReadResponse();
result.forEach(response::append);
return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response)))
.build();
} catch (KvStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} finally {
lock.unlock();
}
}
@Override
public Response onApply(WriteRequest request) {
final byte[] data = request.getData().toByteArray();
final BatchWriteRequest bwRequest = serializer.deserialize(data, BatchWriteRequest.class);
final Op op = Op.valueOf(request.getOperation());
final Lock lock = readLock;
lock.lock();
try {
switch (op) {
case Write:
kvStorage.batchPut(bwRequest.getKeys(), bwRequest.getValues());
break;
case Delete:
kvStorage.batchDelete(bwRequest.getKeys());
break;
default:
return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + op).build();
}
publishValueChangeEvent(op, bwRequest);
return Response.newBuilder().setSuccess(true).build();
} catch (KvStorageException e) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
} finally {
lock.unlock();
}
}
private void publishValueChangeEvent(final Op op, final BatchWriteRequest request) {
final List<byte[]> keys = request.getKeys();
final List<byte[]> values = request.getValues();
for (int i = 0; i < keys.size(); i++) {
final String key = new String(keys.get(i));
final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key));
final Record value = null != datum ? datum.value : null;
final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value)
.action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build();
NotifyCenter.publishEvent(event);
}
}
@Override
public String group() {
return Constants.NAMING_PERSISTENT_SERVICE_GROUP;
}
@Override
public List<SnapshotOperation> loadSnapshotOperate() {
return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock));
}
@Override @Override
public void put(String key, Record value) throws NacosException { public void put(String key, Record value) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest(); final BatchWriteRequest req = new BatchWriteRequest();
@ -304,63 +151,8 @@ public class PersistentServiceProcessor extends RequestProcessor4CP implements P
notifier.deregisterListener(key, listener); notifier.deregisterListener(key, listener);
} }
@Override
public void onError(Throwable error) {
super.onError(error);
hasError = true;
}
@Override @Override
public boolean isAvailable() { public boolean isAvailable() {
return hasLeader && !hasError; return hasLeader && !hasError;
} }
private Type getDatumTypeFromKey(String key) {
return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key));
}
private Class<? extends Record> getClassOfRecordFromKey(String key) {
if (KeyBuilder.matchSwitchKey(key)) {
return com.alibaba.nacos.naming.misc.SwitchDomain.class;
} else if (KeyBuilder.matchServiceMetaKey(key)) {
return com.alibaba.nacos.naming.core.Service.class;
} else if (KeyBuilder.matchInstanceListKey(key)) {
return com.alibaba.nacos.naming.core.Instances.class;
}
return Record.class;
}
private void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException {
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
notifierAllServiceMeta(listener);
} else {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
/**
* This notify should only notify once during startup. See {@link com.alibaba.nacos.naming.core.ServiceManager#init()}
*/
private void notifierAllServiceMeta(RecordListener listener) throws NacosException {
for (byte[] each : kvStorage.allKeys()) {
String key = new String(each);
if (listener.interests(key)) {
Datum datum = get(key);
if (null != datum) {
notifierDatum(key, datum, listener);
}
}
}
}
private void notifierDatum(String key, Datum datum, RecordListener listener) {
try {
listener.onChange(key, datum.value);
} catch (Exception e) {
Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e);
}
}
} }

View File

@ -0,0 +1,114 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent.impl;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.naming.utils.Constants;
import com.google.protobuf.ByteString;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
/**
* Persistent service manipulation layer in stand-alone mode.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
@ConditionalOnProperty(value = "nacos.standalone", havingValue = "true")
@Service
public class StandalonePersistentServiceProcessor extends BasePersistentServiceProcessor {
public StandalonePersistentServiceProcessor(final ClusterVersionJudgement judgement) throws Exception {
super(judgement);
}
@Override
public void put(String key, Record value) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest();
Datum datum = Datum.createDatum(key, value);
req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try {
onApply(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
}
@Override
public void remove(String key) throws NacosException {
final BatchWriteRequest req = new BatchWriteRequest();
req.append(ByteUtils.toBytes(key), ByteUtils.EMPTY);
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build();
try {
onApply(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
}
@Override
public Datum get(String key) throws NacosException {
final List<byte[]> keys = Collections.singletonList(ByteUtils.toBytes(key));
final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
.setData(ByteString.copyFrom(serializer.serialize(keys))).build();
try {
final Response resp = onRequest(req);
if (resp.getSuccess()) {
BatchReadResponse response = serializer
.deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
final List<byte[]> rValues = response.getValues();
return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key));
}
throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
} catch (Throwable e) {
throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage());
}
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
notifier.registerListener(key, listener);
if (startNotify) {
notifierDatumIfAbsent(key, listener);
}
}
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
notifier.deregisterListener(key, listener);
}
@Override
public boolean isAvailable() {
return !hasError;
}
}

View File

@ -127,6 +127,8 @@ public class RaftCore implements Closeable {
private final EventPublisher publisher; private final EventPublisher publisher;
private final RaftListener raftListener;
private boolean initialized = false; private boolean initialized = false;
private volatile boolean stopWork = false; private volatile boolean stopWork = false;
@ -136,7 +138,7 @@ public class RaftCore implements Closeable {
private ScheduledFuture heartbeatTask = null; private ScheduledFuture heartbeatTask = null;
public RaftCore(RaftPeerSet peers, SwitchDomain switchDomain, GlobalConfig globalConfig, RaftProxy raftProxy, public RaftCore(RaftPeerSet peers, SwitchDomain switchDomain, GlobalConfig globalConfig, RaftProxy raftProxy,
RaftStore raftStore, ClusterVersionJudgement versionJudgement) { RaftStore raftStore, ClusterVersionJudgement versionJudgement, RaftListener raftListener) {
this.peers = peers; this.peers = peers;
this.switchDomain = switchDomain; this.switchDomain = switchDomain;
this.globalConfig = globalConfig; this.globalConfig = globalConfig;
@ -145,6 +147,7 @@ public class RaftCore implements Closeable {
this.versionJudgement = versionJudgement; this.versionJudgement = versionJudgement;
this.notifier = new PersistentNotifier(key -> null == getDatum(key) ? null : getDatum(key).value); this.notifier = new PersistentNotifier(key -> null == getDatum(key) ? null : getDatum(key).value);
this.publisher = NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384); this.publisher = NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384);
this.raftListener = raftListener;
} }
/** /**
@ -175,6 +178,7 @@ public class RaftCore implements Closeable {
if (stopWork) { if (stopWork) {
try { try {
shutdown(); shutdown();
raftListener.removeOldRaftMetadata();
} catch (NacosException e) { } catch (NacosException e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e); throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
} }

View File

@ -21,12 +21,15 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.utils.Constants;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener; import org.springframework.context.event.SmartApplicationListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* Inject the raft information from the naming module into the outlier information of the node. * Inject the raft information from the naming module into the outlier information of the node.
@ -38,14 +41,21 @@ import java.util.Map;
@Component @Component
public class RaftListener implements SmartApplicationListener { public class RaftListener implements SmartApplicationListener {
private static final String GROUP = "naming";
private final ServerMemberManager memberManager; private final ServerMemberManager memberManager;
private final ClusterVersionJudgement versionJudgement; private final ClusterVersionJudgement versionJudgement;
private volatile boolean stopUpdate = false; private volatile boolean stopUpdate = false;
/**
* Avoid multithreading mode. Old Raft information data cannot be properly removed.
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public RaftListener(ServerMemberManager memberManager, ClusterVersionJudgement versionJudgement) { public RaftListener(ServerMemberManager memberManager, ClusterVersionJudgement versionJudgement) {
this.memberManager = memberManager; this.memberManager = memberManager;
this.versionJudgement = versionJudgement; this.versionJudgement = versionJudgement;
@ -54,12 +64,15 @@ public class RaftListener implements SmartApplicationListener {
private void init() { private void init() {
this.versionJudgement.registerObserver(isAllNewVersion -> { this.versionJudgement.registerObserver(isAllNewVersion -> {
stopUpdate = isAllNewVersion; final Lock lock = this.writeLock;
if (stopUpdate) { lock.lock();
Loggers.RAFT.warn("start to move old raft protocol metadata"); try {
Member self = memberManager.getSelf(); stopUpdate = isAllNewVersion;
self.delExtendVal(GROUP); if (stopUpdate) {
memberManager.update(self); removeOldRaftMetadata();
}
} finally {
lock.unlock();
} }
}, -2); }, -2);
} }
@ -71,14 +84,30 @@ public class RaftListener implements SmartApplicationListener {
@Override @Override
public void onApplicationEvent(ApplicationEvent event) { public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof BaseRaftEvent && !stopUpdate) { final Lock lock = readLock;
BaseRaftEvent raftEvent = (BaseRaftEvent) event; lock.lock();
RaftPeer local = raftEvent.getLocal(); try {
String json = JacksonUtils.toJson(local); if (event instanceof BaseRaftEvent && !stopUpdate) {
Map map = JacksonUtils.toObj(json, HashMap.class); BaseRaftEvent raftEvent = (BaseRaftEvent) event;
Member self = memberManager.getSelf(); RaftPeer local = raftEvent.getLocal();
self.setExtendVal(GROUP, map); String json = JacksonUtils.toJson(local);
memberManager.update(self); Map map = JacksonUtils.toObj(json, HashMap.class);
Member self = memberManager.getSelf();
self.setExtendVal(Constants.OLD_NAMING_RAFT_GROUP, map);
memberManager.update(self);
}
if (stopUpdate) {
removeOldRaftMetadata();
}
} finally {
lock.unlock();
} }
} }
void removeOldRaftMetadata() {
Loggers.RAFT.warn("start to move old raft protocol metadata");
Member self = memberManager.getSelf();
self.delExtendVal(Constants.OLD_NAMING_RAFT_GROUP);
memberManager.update(self);
}
} }

View File

@ -139,8 +139,8 @@ public class GlobalExecutor {
NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.MILLISECONDS); NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.MILLISECONDS);
} }
public static void submitClusterVersionJudge(Runnable runnable, long delay) { public static ScheduledFuture submitClusterVersionJudge(Runnable runnable, long delay) {
NAMING_TIMER_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS); return NAMING_TIMER_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
} }
public static void submitDistroNotifyTask(Runnable runnable) { public static void submitDistroNotifyTask(Runnable runnable) {

View File

@ -26,6 +26,8 @@ public final class Constants {
private Constants() {} private Constants() {}
public static final String OLD_NAMING_RAFT_GROUP = "naming";
public static final String NAMING_PERSISTENT_SERVICE_GROUP = "naming_persistent_service"; public static final String NAMING_PERSISTENT_SERVICE_GROUP = "naming_persistent_service";
public static final String NACOS_NAMING_USE_NEW_RAFT_FIRST = "nacos.naming.use-new-raft.first"; public static final String NACOS_NAMING_USE_NEW_RAFT_FIRST = "nacos.naming.use-new-raft.first";