diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java index 6788fd38f..1ea1c70ce 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/ClusterVersionJudgement.java @@ -22,6 +22,7 @@ import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberMetaDataConstants; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.naming.misc.GlobalExecutor; +import com.alibaba.nacos.sys.env.EnvUtil; import org.springframework.stereotype.Component; import java.util.Collection; @@ -64,6 +65,11 @@ public class ClusterVersionJudgement { } protected void runVersionListener() { + // Single machine mode, do upgrade operation directly. + if (EnvUtil.getStandaloneMode()) { + notifyAllListener(); + return; + } try { judge(); } finally { @@ -72,6 +78,7 @@ public class ClusterVersionJudgement { } protected void judge() { + Collection members = memberManager.allMembers(); final String oldVersion = "1.4.0"; boolean allMemberIsNewVersion = true; @@ -83,15 +90,19 @@ public class ClusterVersionJudgement { } // can only trigger once if (allMemberIsNewVersion && !this.allMemberIsNewVersion) { - this.allMemberIsNewVersion = true; - Collections.sort(observers); - for (ConsumerWithPriority consumer : observers) { - consumer.consumer.accept(true); - } - observers.clear(); + notifyAllListener(); } } + private void notifyAllListener() { + this.allMemberIsNewVersion = true; + Collections.sort(observers); + for (ConsumerWithPriority consumer : observers) { + consumer.consumer.accept(true); + } + observers.clear(); + } + public boolean allMemberIsNewVersion() { return allMemberIsNewVersion; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java index a22d22d26..5557713d0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/PersistentConsistencyServiceDelegateImpl.java @@ -19,7 +19,7 @@ package com.alibaba.nacos.naming.consistency.persistent; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.naming.consistency.Datum; import com.alibaba.nacos.naming.consistency.RecordListener; -import com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor; +import com.alibaba.nacos.naming.consistency.persistent.impl.BasePersistentServiceProcessor; import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl; import com.alibaba.nacos.naming.pojo.Record; import org.springframework.stereotype.Component; @@ -36,13 +36,13 @@ public class PersistentConsistencyServiceDelegateImpl implements PersistentConsi private final RaftConsistencyServiceImpl oldPersistentConsistencyService; - private final PersistentServiceProcessor newPersistentConsistencyService; + private final BasePersistentServiceProcessor newPersistentConsistencyService; private volatile boolean switchNewPersistentService = false; public PersistentConsistencyServiceDelegateImpl(ClusterVersionJudgement versionJudgement, RaftConsistencyServiceImpl oldPersistentConsistencyService, - PersistentServiceProcessor newPersistentConsistencyService) { + BasePersistentServiceProcessor newPersistentConsistencyService) { this.versionJudgement = versionJudgement; this.oldPersistentConsistencyService = oldPersistentConsistencyService; this.newPersistentConsistencyService = newPersistentConsistencyService; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/BasePersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/BasePersistentServiceProcessor.java new file mode 100644 index 000000000..127f867dc --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/BasePersistentServiceProcessor.java @@ -0,0 +1,265 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.consistency.persistent.impl; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.utils.ByteUtils; +import com.alibaba.nacos.consistency.DataOperation; +import com.alibaba.nacos.consistency.SerializeFactory; +import com.alibaba.nacos.consistency.Serializer; +import com.alibaba.nacos.consistency.cp.RequestProcessor4CP; +import com.alibaba.nacos.consistency.entity.ReadRequest; +import com.alibaba.nacos.consistency.entity.Response; +import com.alibaba.nacos.consistency.entity.WriteRequest; +import com.alibaba.nacos.consistency.snapshot.SnapshotOperation; +import com.alibaba.nacos.core.exception.KvStorageException; +import com.alibaba.nacos.core.storage.kv.KvStorage; +import com.alibaba.nacos.naming.consistency.Datum; +import com.alibaba.nacos.naming.consistency.KeyBuilder; +import com.alibaba.nacos.naming.consistency.RecordListener; +import com.alibaba.nacos.naming.consistency.ValueChangeEvent; +import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; +import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; +import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import com.alibaba.nacos.naming.pojo.Record; +import com.alibaba.nacos.naming.utils.Constants; +import com.google.protobuf.ByteString; +import org.apache.commons.lang3.reflect.TypeUtils; + +import java.lang.reflect.Type; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * New service data persistence handler. + * + * @author liaochuntao + */ +public abstract class BasePersistentServiceProcessor extends RequestProcessor4CP + implements PersistentConsistencyService { + + enum Op { + /** + * write ops. + */ + Write("Write"), + + /** + * read ops. + */ + Read("Read"), + + /** + * delete ops. + */ + Delete("Delete"); + + protected final String desc; + + Op(String desc) { + this.desc = desc; + } + } + + protected final KvStorage kvStorage; + + protected final Serializer serializer; + + /** + * Whether an unrecoverable error occurred. + */ + protected volatile boolean hasError = false; + + /** + * If use old raft, should not notify listener even new listener add. + */ + protected volatile boolean startNotify = false; + + /** + * During snapshot processing, the processing of other requests needs to be paused. + */ + protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + protected final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + + protected final ClusterVersionJudgement versionJudgement; + + protected final PersistentNotifier notifier; + + public BasePersistentServiceProcessor(final ClusterVersionJudgement judgement) throws Exception { + this.versionJudgement = judgement; + this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString()); + this.serializer = SerializeFactory.getSerializer("JSON"); + this.notifier = new PersistentNotifier(key -> { + try { + byte[] data = kvStorage.get(ByteUtils.toBytes(key)); + Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key)); + return null != datum ? datum.value : null; + } catch (KvStorageException ex) { + throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg()); + } + }); + afterConstruct(); + } + + @SuppressWarnings("unchecked") + protected void afterConstruct() { + NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384); + listenOldRaftClose(); + } + + private void listenOldRaftClose() { + this.versionJudgement.registerObserver(isNewVersion -> { + if (isNewVersion) { + NotifyCenter.registerSubscriber(notifier); + startNotify = true; + } + }, 10); + } + + @Override + public Response onRequest(ReadRequest request) { + final List keys = serializer + .deserialize(request.getData().toByteArray(), TypeUtils.parameterize(List.class, byte[].class)); + final Lock lock = readLock; + lock.lock(); + try { + final Map result = kvStorage.batchGet(keys); + final BatchReadResponse response = new BatchReadResponse(); + result.forEach(response::append); + return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response))) + .build(); + } catch (KvStorageException e) { + return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build(); + } finally { + lock.unlock(); + } + } + + @Override + public Response onApply(WriteRequest request) { + final byte[] data = request.getData().toByteArray(); + final BatchWriteRequest bwRequest = serializer.deserialize(data, BatchWriteRequest.class); + final Op op = Op.valueOf(request.getOperation()); + final Lock lock = readLock; + lock.lock(); + try { + switch (op) { + case Write: + kvStorage.batchPut(bwRequest.getKeys(), bwRequest.getValues()); + break; + case Delete: + kvStorage.batchDelete(bwRequest.getKeys()); + break; + default: + return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + op).build(); + } + publishValueChangeEvent(op, bwRequest); + return Response.newBuilder().setSuccess(true).build(); + } catch (KvStorageException e) { + return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build(); + } finally { + lock.unlock(); + } + } + + private void publishValueChangeEvent(final Op op, final BatchWriteRequest request) { + final List keys = request.getKeys(); + final List values = request.getValues(); + for (int i = 0; i < keys.size(); i++) { + final String key = new String(keys.get(i)); + final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key)); + final Record value = null != datum ? datum.value : null; + final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) + .action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build(); + NotifyCenter.publishEvent(event); + } + } + + @Override + public String group() { + return Constants.NAMING_PERSISTENT_SERVICE_GROUP; + } + + @Override + public List loadSnapshotOperate() { + return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock)); + } + + @Override + public void onError(Throwable error) { + super.onError(error); + hasError = true; + } + + protected Type getDatumTypeFromKey(String key) { + return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key)); + } + + protected Class getClassOfRecordFromKey(String key) { + if (KeyBuilder.matchSwitchKey(key)) { + return com.alibaba.nacos.naming.misc.SwitchDomain.class; + } else if (KeyBuilder.matchServiceMetaKey(key)) { + return com.alibaba.nacos.naming.core.Service.class; + } else if (KeyBuilder.matchInstanceListKey(key)) { + return com.alibaba.nacos.naming.core.Instances.class; + } + return Record.class; + } + + protected void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException { + if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { + notifierAllServiceMeta(listener); + } else { + Datum datum = get(key); + if (null != datum) { + notifierDatum(key, datum, listener); + } + } + } + + /** + * This notify should only notify once during startup. See {@link com.alibaba.nacos.naming.core.ServiceManager#init()} + */ + private void notifierAllServiceMeta(RecordListener listener) throws NacosException { + for (byte[] each : kvStorage.allKeys()) { + String key = new String(each); + if (listener.interests(key)) { + Datum datum = get(key); + if (null != datum) { + notifierDatum(key, datum, listener); + } + } + } + } + + private void notifierDatum(String key, Datum datum, RecordListener listener) { + try { + listener.onChange(key, datum.value); + } catch (Exception e) { + Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e); + } + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index 7ea4d0201..72df2bdeb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -1,151 +1,74 @@ /* - * Copyright 1999-2018 Alibaba Group Holding Ltd. + * Copyright 1999-2018 Alibaba Group Holding Ltd. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.alibaba.nacos.naming.consistency.persistent.impl; import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.utils.ByteUtils; import com.alibaba.nacos.common.utils.StringUtils; -import com.alibaba.nacos.consistency.DataOperation; -import com.alibaba.nacos.consistency.SerializeFactory; -import com.alibaba.nacos.consistency.Serializer; import com.alibaba.nacos.consistency.cp.CPProtocol; -import com.alibaba.nacos.consistency.cp.RequestProcessor4CP; import com.alibaba.nacos.consistency.cp.MetadataKey; import com.alibaba.nacos.consistency.entity.ReadRequest; import com.alibaba.nacos.consistency.entity.Response; import com.alibaba.nacos.consistency.entity.WriteRequest; -import com.alibaba.nacos.consistency.snapshot.SnapshotOperation; import com.alibaba.nacos.core.distributed.ProtocolManager; import com.alibaba.nacos.core.exception.ErrorCode; -import com.alibaba.nacos.core.exception.KvStorageException; -import com.alibaba.nacos.core.storage.kv.KvStorage; import com.alibaba.nacos.naming.consistency.Datum; -import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.naming.consistency.RecordListener; -import com.alibaba.nacos.naming.consistency.ValueChangeEvent; import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; -import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService; -import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier; import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.pojo.Record; import com.alibaba.nacos.naming.utils.Constants; import com.alibaba.nacos.sys.env.EnvUtil; import com.google.protobuf.ByteString; -import org.apache.commons.lang3.reflect.TypeUtils; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; -import java.lang.reflect.Type; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * New service data persistence handler. + * In cluster mode, start the Raft protocol. * * @author liaochuntao */ @SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") +@ConditionalOnProperty(value = "nacos.standalone", havingValue = "false") @Service -public class PersistentServiceProcessor extends RequestProcessor4CP implements PersistentConsistencyService { - - enum Op { - /** - * write ops. - */ - Write("Write"), - - /** - * read ops. - */ - Read("Read"), - - /** - * delete ops. - */ - Delete("Delete"); - - private final String desc; - - Op(String desc) { - this.desc = desc; - } - } +public class PersistentServiceProcessor extends BasePersistentServiceProcessor { private final CPProtocol protocol; - private final KvStorage kvStorage; - - private final ClusterVersionJudgement versionJudgement; - - private final Serializer serializer; - - /** - * During snapshot processing, the processing of other requests needs to be paused. - */ - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - - private final PersistentNotifier notifier; - /** * Is there a leader node currently. */ private volatile boolean hasLeader = false; - /** - * Whether an unrecoverable error occurred. - */ - private volatile boolean hasError = false; - - /** - * If use old raft, should not notify listener even new listener add. - */ - private volatile boolean startNotify = false; - - public PersistentServiceProcessor(final ProtocolManager protocolManager, - final ClusterVersionJudgement versionJudgement) throws Exception { + public PersistentServiceProcessor(ProtocolManager protocolManager, ClusterVersionJudgement versionJudgement) + throws Exception { + super(versionJudgement); this.protocol = protocolManager.getCpProtocol(); - this.versionJudgement = versionJudgement; - this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString()); - this.serializer = SerializeFactory.getSerializer("JSON"); - this.notifier = new PersistentNotifier(key -> { - try { - byte[] data = kvStorage.get(ByteUtils.toBytes(key)); - Datum datum = serializer.deserialize(data, getDatumTypeFromKey(key)); - return null != datum ? datum.value : null; - } catch (KvStorageException ex) { - throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg()); - } - }); - init(); } - @SuppressWarnings("unchecked") - private void init() { - NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384); + @Override + protected void afterConstruct() { + super.afterConstruct(); this.protocol.addLogProcessors(Collections.singletonList(this)); this.protocol.protocolMetaData() .subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA, @@ -155,13 +78,6 @@ public class PersistentServiceProcessor extends RequestProcessor4CP implements P NotifyCenter.registerSubscriber(notifier); waitLeader(); startNotify = true; - } else { - this.versionJudgement.registerObserver(isNewVersion -> { - if (isNewVersion) { - NotifyCenter.registerSubscriber(notifier); - startNotify = true; - } - }, 10); } } @@ -175,75 +91,6 @@ public class PersistentServiceProcessor extends RequestProcessor4CP implements P } } - @Override - public Response onRequest(ReadRequest request) { - final List keys = serializer - .deserialize(request.getData().toByteArray(), TypeUtils.parameterize(List.class, byte[].class)); - final Lock lock = readLock; - lock.lock(); - try { - final Map result = kvStorage.batchGet(keys); - final BatchReadResponse response = new BatchReadResponse(); - result.forEach(response::append); - return Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom(serializer.serialize(response))) - .build(); - } catch (KvStorageException e) { - return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build(); - } finally { - lock.unlock(); - } - } - - @Override - public Response onApply(WriteRequest request) { - final byte[] data = request.getData().toByteArray(); - final BatchWriteRequest bwRequest = serializer.deserialize(data, BatchWriteRequest.class); - final Op op = Op.valueOf(request.getOperation()); - final Lock lock = readLock; - lock.lock(); - try { - switch (op) { - case Write: - kvStorage.batchPut(bwRequest.getKeys(), bwRequest.getValues()); - break; - case Delete: - kvStorage.batchDelete(bwRequest.getKeys()); - break; - default: - return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + op).build(); - } - publishValueChangeEvent(op, bwRequest); - return Response.newBuilder().setSuccess(true).build(); - } catch (KvStorageException e) { - return Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build(); - } finally { - lock.unlock(); - } - } - - private void publishValueChangeEvent(final Op op, final BatchWriteRequest request) { - final List keys = request.getKeys(); - final List values = request.getValues(); - for (int i = 0; i < keys.size(); i++) { - final String key = new String(keys.get(i)); - final Datum datum = serializer.deserialize(values.get(i), getDatumTypeFromKey(key)); - final Record value = null != datum ? datum.value : null; - final ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value) - .action(Op.Delete.equals(op) ? DataOperation.DELETE : DataOperation.CHANGE).build(); - NotifyCenter.publishEvent(event); - } - } - - @Override - public String group() { - return Constants.NAMING_PERSISTENT_SERVICE_GROUP; - } - - @Override - public List loadSnapshotOperate() { - return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, lock)); - } - @Override public void put(String key, Record value) throws NacosException { final BatchWriteRequest req = new BatchWriteRequest(); @@ -304,63 +151,8 @@ public class PersistentServiceProcessor extends RequestProcessor4CP implements P notifier.deregisterListener(key, listener); } - @Override - public void onError(Throwable error) { - super.onError(error); - hasError = true; - } - @Override public boolean isAvailable() { return hasLeader && !hasError; } - - private Type getDatumTypeFromKey(String key) { - return TypeUtils.parameterize(Datum.class, getClassOfRecordFromKey(key)); - } - - private Class getClassOfRecordFromKey(String key) { - if (KeyBuilder.matchSwitchKey(key)) { - return com.alibaba.nacos.naming.misc.SwitchDomain.class; - } else if (KeyBuilder.matchServiceMetaKey(key)) { - return com.alibaba.nacos.naming.core.Service.class; - } else if (KeyBuilder.matchInstanceListKey(key)) { - return com.alibaba.nacos.naming.core.Instances.class; - } - return Record.class; - } - - private void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException { - if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { - notifierAllServiceMeta(listener); - } else { - Datum datum = get(key); - if (null != datum) { - notifierDatum(key, datum, listener); - } - } - } - - /** - * This notify should only notify once during startup. See {@link com.alibaba.nacos.naming.core.ServiceManager#init()} - */ - private void notifierAllServiceMeta(RecordListener listener) throws NacosException { - for (byte[] each : kvStorage.allKeys()) { - String key = new String(each); - if (listener.interests(key)) { - Datum datum = get(key); - if (null != datum) { - notifierDatum(key, datum, listener); - } - } - } - } - - private void notifierDatum(String key, Datum datum, RecordListener listener) { - try { - listener.onChange(key, datum.value); - } catch (Exception e) { - Loggers.RAFT.error("NACOS-RAFT failed to notify listener", e); - } - } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/StandalonePersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/StandalonePersistentServiceProcessor.java new file mode 100644 index 000000000..1ce3cc46e --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/StandalonePersistentServiceProcessor.java @@ -0,0 +1,114 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.consistency.persistent.impl; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.utils.ByteUtils; +import com.alibaba.nacos.consistency.entity.ReadRequest; +import com.alibaba.nacos.consistency.entity.Response; +import com.alibaba.nacos.consistency.entity.WriteRequest; +import com.alibaba.nacos.core.exception.ErrorCode; +import com.alibaba.nacos.naming.consistency.Datum; +import com.alibaba.nacos.naming.consistency.RecordListener; +import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; +import com.alibaba.nacos.naming.pojo.Record; +import com.alibaba.nacos.naming.utils.Constants; +import com.google.protobuf.ByteString; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.List; + +/** + * Persistent service manipulation layer in stand-alone mode. + * + * @author liaochuntao + */ +@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") +@ConditionalOnProperty(value = "nacos.standalone", havingValue = "true") +@Service +public class StandalonePersistentServiceProcessor extends BasePersistentServiceProcessor { + + public StandalonePersistentServiceProcessor(final ClusterVersionJudgement judgement) throws Exception { + super(judgement); + } + + @Override + public void put(String key, Record value) throws NacosException { + final BatchWriteRequest req = new BatchWriteRequest(); + Datum datum = Datum.createDatum(key, value); + req.append(ByteUtils.toBytes(key), serializer.serialize(datum)); + final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) + .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build(); + try { + onApply(request); + } catch (Exception e) { + throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); + } + } + + @Override + public void remove(String key) throws NacosException { + final BatchWriteRequest req = new BatchWriteRequest(); + req.append(ByteUtils.toBytes(key), ByteUtils.EMPTY); + final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) + .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build(); + try { + onApply(request); + } catch (Exception e) { + throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); + } + } + + @Override + public Datum get(String key) throws NacosException { + final List keys = Collections.singletonList(ByteUtils.toBytes(key)); + final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP) + .setData(ByteString.copyFrom(serializer.serialize(keys))).build(); + try { + final Response resp = onRequest(req); + if (resp.getSuccess()) { + BatchReadResponse response = serializer + .deserialize(resp.getData().toByteArray(), BatchReadResponse.class); + final List rValues = response.getValues(); + return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key)); + } + throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg()); + } catch (Throwable e) { + throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage()); + } + } + + @Override + public void listen(String key, RecordListener listener) throws NacosException { + notifier.registerListener(key, listener); + if (startNotify) { + notifierDatumIfAbsent(key, listener); + } + } + + @Override + public void unListen(String key, RecordListener listener) throws NacosException { + notifier.deregisterListener(key, listener); + } + + @Override + public boolean isAvailable() { + return !hasError; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java index b777f13eb..aeafeeb21 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java @@ -127,6 +127,8 @@ public class RaftCore implements Closeable { private final EventPublisher publisher; + private final RaftListener raftListener; + private boolean initialized = false; private volatile boolean stopWork = false; @@ -136,7 +138,7 @@ public class RaftCore implements Closeable { private ScheduledFuture heartbeatTask = null; public RaftCore(RaftPeerSet peers, SwitchDomain switchDomain, GlobalConfig globalConfig, RaftProxy raftProxy, - RaftStore raftStore, ClusterVersionJudgement versionJudgement) { + RaftStore raftStore, ClusterVersionJudgement versionJudgement, RaftListener raftListener) { this.peers = peers; this.switchDomain = switchDomain; this.globalConfig = globalConfig; @@ -145,6 +147,7 @@ public class RaftCore implements Closeable { this.versionJudgement = versionJudgement; this.notifier = new PersistentNotifier(key -> null == getDatum(key) ? null : getDatum(key).value); this.publisher = NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384); + this.raftListener = raftListener; } /** @@ -175,6 +178,7 @@ public class RaftCore implements Closeable { if (stopWork) { try { shutdown(); + raftListener.removeOldRaftMetadata(); } catch (NacosException e) { throw new NacosRuntimeException(NacosException.SERVER_ERROR, e); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftListener.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftListener.java index fa3a0d37d..11d545147 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftListener.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftListener.java @@ -21,12 +21,15 @@ import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement; import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.utils.Constants; import org.springframework.context.ApplicationEvent; import org.springframework.context.event.SmartApplicationListener; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Inject the raft information from the naming module into the outlier information of the node. @@ -38,14 +41,21 @@ import java.util.Map; @Component public class RaftListener implements SmartApplicationListener { - private static final String GROUP = "naming"; - private final ServerMemberManager memberManager; private final ClusterVersionJudgement versionJudgement; private volatile boolean stopUpdate = false; + /** + * Avoid multithreading mode. Old Raft information data cannot be properly removed. + */ + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + + private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + public RaftListener(ServerMemberManager memberManager, ClusterVersionJudgement versionJudgement) { this.memberManager = memberManager; this.versionJudgement = versionJudgement; @@ -54,12 +64,15 @@ public class RaftListener implements SmartApplicationListener { private void init() { this.versionJudgement.registerObserver(isAllNewVersion -> { - stopUpdate = isAllNewVersion; - if (stopUpdate) { - Loggers.RAFT.warn("start to move old raft protocol metadata"); - Member self = memberManager.getSelf(); - self.delExtendVal(GROUP); - memberManager.update(self); + final Lock lock = this.writeLock; + lock.lock(); + try { + stopUpdate = isAllNewVersion; + if (stopUpdate) { + removeOldRaftMetadata(); + } + } finally { + lock.unlock(); } }, -2); } @@ -71,14 +84,30 @@ public class RaftListener implements SmartApplicationListener { @Override public void onApplicationEvent(ApplicationEvent event) { - if (event instanceof BaseRaftEvent && !stopUpdate) { - BaseRaftEvent raftEvent = (BaseRaftEvent) event; - RaftPeer local = raftEvent.getLocal(); - String json = JacksonUtils.toJson(local); - Map map = JacksonUtils.toObj(json, HashMap.class); - Member self = memberManager.getSelf(); - self.setExtendVal(GROUP, map); - memberManager.update(self); + final Lock lock = readLock; + lock.lock(); + try { + if (event instanceof BaseRaftEvent && !stopUpdate) { + BaseRaftEvent raftEvent = (BaseRaftEvent) event; + RaftPeer local = raftEvent.getLocal(); + String json = JacksonUtils.toJson(local); + Map map = JacksonUtils.toObj(json, HashMap.class); + Member self = memberManager.getSelf(); + self.setExtendVal(Constants.OLD_NAMING_RAFT_GROUP, map); + memberManager.update(self); + } + if (stopUpdate) { + removeOldRaftMetadata(); + } + } finally { + lock.unlock(); } } + + void removeOldRaftMetadata() { + Loggers.RAFT.warn("start to move old raft protocol metadata"); + Member self = memberManager.getSelf(); + self.delExtendVal(Constants.OLD_NAMING_RAFT_GROUP); + memberManager.update(self); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java index 2804d033f..cb0311bbf 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java @@ -139,8 +139,8 @@ public class GlobalExecutor { NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.MILLISECONDS); } - public static void submitClusterVersionJudge(Runnable runnable, long delay) { - NAMING_TIMER_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS); + public static ScheduledFuture submitClusterVersionJudge(Runnable runnable, long delay) { + return NAMING_TIMER_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS); } public static void submitDistroNotifyTask(Runnable runnable) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/utils/Constants.java b/naming/src/main/java/com/alibaba/nacos/naming/utils/Constants.java index 45cf3f795..830ff6716 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/utils/Constants.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/utils/Constants.java @@ -26,6 +26,8 @@ public final class Constants { private Constants() {} + public static final String OLD_NAMING_RAFT_GROUP = "naming"; + public static final String NAMING_PERSISTENT_SERVICE_GROUP = "naming_persistent_service"; public static final String NACOS_NAMING_USE_NEW_RAFT_FIRST = "nacos.naming.use-new-raft.first";