From f9412fd4d64f0105c672c10e745d2d7436c8d627 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Mon, 18 May 2020 22:57:40 +0800 Subject: [PATCH 1/5] fix: fix config dump --- .../nacos/common/utils/Observable.java | 24 +- .../nacos/common/utils/ThreadUtils.java | 8 + .../ConfigModuleInitializeReporter.java | 74 +++++++ .../server/service/dump/DumpService.java | 208 ++++++++++-------- .../nacos/consistency/ProtocolMetaData.java | 49 ++--- .../core/code/ModuleInitializeReporter.java | 56 +++++ .../StartingSpringApplicationRunListener.java | 37 +++- .../core/distributed/ProtocolManager.java | 10 - .../nacos/core/utils/TimerContext.java | 2 +- 9 files changed, 315 insertions(+), 153 deletions(-) create mode 100644 config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/Observable.java b/common/src/main/java/com/alibaba/nacos/common/utils/Observable.java index ecf84d974..583fb81cc 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/Observable.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/Observable.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.common.utils; +import java.util.Objects; import java.util.Set; /** @@ -25,6 +26,8 @@ public class Observable { private transient boolean changed = false; private transient Set obs = new ConcurrentHashSet<>(); + private volatile int observerCnt = 0; + private boolean alreadyAddObserver = false; /** * Adds an observer to the set of observers for this object, provided @@ -35,11 +38,14 @@ public class Observable { * @param o an observer to be added. * @throws NullPointerException if the parameter o is null. */ - public void addObserver(Observer o) { - if (o == null) { - throw new NullPointerException(); - } + public synchronized void addObserver(Observer o) { + Objects.requireNonNull(o, "Observer"); obs.add(o); + observerCnt ++; + if (!alreadyAddObserver) { + notifyAll(); + } + alreadyAddObserver = true; } /** @@ -47,8 +53,9 @@ public class Observable { * Passing {@code null} to this method will have no effect. * @param o the observer to be deleted. */ - public void deleteObserver(Observer o) { + public synchronized void deleteObserver(Observer o) { obs.remove(o); + observerCnt --; } /** @@ -96,6 +103,9 @@ public class Observable { return; } clearChanged(); + if (!alreadyAddObserver) { + ThreadUtils.objectWait(this); + } } for (Observer observer : obs) { @@ -150,8 +160,8 @@ public class Observable { * @return the number of observers of this object. */ public int countObservers() { - return obs.size(); + return observerCnt; } -} +} \ No newline at end of file diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java index bed462a18..4a82748dd 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java @@ -24,6 +24,14 @@ import java.util.concurrent.TimeUnit; */ public final class ThreadUtils { + public static void objectWait(Object object) { + try { + object.wait(); + } catch (InterruptedException ignore) { + Thread.interrupted(); + } + } + public static void sleep(long millis) { try { Thread.sleep(millis); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java b/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java new file mode 100644 index 000000000..62ec22cd7 --- /dev/null +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java @@ -0,0 +1,74 @@ +/* + * + * * Copyright 1999-2018 Alibaba Group Holding Ltd. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.nacos.config.server.service; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.config.server.Config; +import com.alibaba.nacos.core.code.ModuleInitializeReporter; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Configure the initialization completion debrief for the module + * + * @author liaochuntao + */ +@Component +public class ConfigModuleInitializeReporter implements ModuleInitializeReporter { + + private Map successRecord = new ConcurrentHashMap<>(4); + private Map exceptionRecord = new ConcurrentHashMap<>(4); + + public void setSuccess(String name, boolean success) { + successRecord.put(name, success); + } + + public void setEx(String name, Throwable ex) { + exceptionRecord.put(name, ex); + } + + @Override + public boolean alreadyInitialized() { + boolean[] initialize = new boolean[] { !successRecord.isEmpty() }; + successRecord.forEach((group, isOk) -> initialize[0] &= isOk); + return initialize[0] && exceptionRecord.isEmpty(); + } + + @Override + public boolean hasException() { + return !exceptionRecord.isEmpty(); + } + + @Override + public Throwable getError() { + StringBuilder errorBuilder = new StringBuilder(); + exceptionRecord.forEach((s, throwable) -> errorBuilder.append("[").append(s).append("]") + .append(":").append(throwable.toString()) + .append(StringUtils.LF)); + return new NacosException(NacosException.SERVER_ERROR, errorBuilder.toString()); + } + + @Override + public String group() { + return Config.class.getName(); + } +} diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java index d71a4c0ec..90b6d4b10 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java @@ -26,6 +26,7 @@ import com.alibaba.nacos.config.server.model.ConfigInfoAggr; import com.alibaba.nacos.config.server.model.ConfigInfoChanged; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; import com.alibaba.nacos.config.server.model.Page; +import com.alibaba.nacos.config.server.service.ConfigModuleInitializeReporter; import com.alibaba.nacos.config.server.service.ConfigService; import com.alibaba.nacos.config.server.service.DiskUtil; import com.alibaba.nacos.config.server.service.DynamicDataSource; @@ -45,10 +46,10 @@ import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupExceptio import com.alibaba.nacos.core.utils.ApplicationUtils; import com.alibaba.nacos.core.utils.GlobalExecutor; import com.alibaba.nacos.core.utils.InetUtils; +import com.alibaba.nacos.core.utils.TimerContext; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @@ -62,7 +63,6 @@ import java.util.Calendar; import java.util.List; import java.util.Objects; import java.util.Random; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -76,6 +76,7 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog; @Service public class DumpService { + private final ConfigModuleInitializeReporter reporter; private final PersistService persistService; private final ServerMemberManager memberManager; private final ProtocolManager protocolManager; @@ -84,12 +85,14 @@ public class DumpService { * Here you inject the dependent objects constructively, ensuring that some * of the dependent functionality is initialized ahead of time * - * @param persistService {@link PersistService} - * @param memberManager {@link ServerMemberManager} + * @param reporter {@link ConfigModuleInitializeReporter} + * @param persistService {@link PersistService} + * @param memberManager {@link ServerMemberManager} * @param protocolManager {@link ProtocolManager} */ - public DumpService(PersistService persistService, ServerMemberManager memberManager, - ProtocolManager protocolManager) { + public DumpService(ConfigModuleInitializeReporter reporter, PersistService persistService, + ServerMemberManager memberManager, ProtocolManager protocolManager) { + this.reporter = reporter; this.persistService = persistService; this.memberManager = memberManager; this.protocolManager = protocolManager; @@ -109,6 +112,9 @@ public class DumpService { @PostConstruct protected void init() throws Exception { + // I haven't initialized myself yet + this.reporter.setSuccess(DumpService.class.getName(), false); + DynamicDataSource.getInstance().getDataSource(); DumpProcessor processor = new DumpProcessor(this); @@ -156,116 +162,126 @@ public class DumpService { com.alibaba.nacos.consistency.cp.Constants.LEADER_META_DATA, observer); - // When all dump is complete, allow the following flow } else { - dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor); + dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, + dumpAllTagProcessor); } } private void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor, DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) { - LogUtil.defaultLog.warn("DumpService start"); + TimerContext.start("config dump job"); + try { + LogUtil.defaultLog.warn("DumpService start"); - Runnable dumpAll = () -> dumpAllTaskMgr - .addTask(DumpAllTask.TASK_ID, new DumpAllTask()); + Runnable dumpAll = () -> dumpAllTaskMgr + .addTask(DumpAllTask.TASK_ID, new DumpAllTask()); - Runnable dumpAllBeta = () -> dumpAllTaskMgr - .addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask()); + Runnable dumpAllBeta = () -> dumpAllTaskMgr + .addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask()); - Runnable clearConfigHistory = () -> { - log.warn("clearConfigHistory start"); - if (canExecute()) { - try { - Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), - 24 * getRetentionDays()); - int totalCount = persistService - .findConfigHistoryCountByTime(startTime); - if (totalCount > 0) { - int pageSize = 1000; - int removeTime = (totalCount + pageSize - 1) / pageSize; - log.warn( - "clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}", - new Object[] { startTime, totalCount, pageSize, - removeTime }); - while (removeTime > 0) { - // 分页删除,以免批量太大报错 - persistService.removeConfigHistory(startTime, pageSize); - removeTime--; + Runnable clearConfigHistory = () -> { + log.warn("clearConfigHistory start"); + if (canExecute()) { + try { + Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), + 24 * getRetentionDays()); + int totalCount = persistService + .findConfigHistoryCountByTime(startTime); + if (totalCount > 0) { + int pageSize = 1000; + int removeTime = (totalCount + pageSize - 1) / pageSize; + log.warn( + "clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}", + startTime, totalCount, pageSize, removeTime); + while (removeTime > 0) { + // 分页删除,以免批量太大报错 + persistService.removeConfigHistory(startTime, pageSize); + removeTime--; + } } } - } - catch (Throwable e) { - log.error("clearConfigHistory error", e); - } - } - }; - - try { - dumpConfigInfo(dumpAllProcessor); - - // 更新beta缓存 - LogUtil.defaultLog.info("start clear all config-info-beta."); - DiskUtil.clearAllBeta(); - if (persistService.isExistTable(BETA_TABLE_NAME)) { - dumpAllBetaProcessor - .process(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask()); - } - // 更新Tag缓存 - LogUtil.defaultLog.info("start clear all config-info-tag."); - DiskUtil.clearAllTag(); - if (persistService.isExistTable(TAG_TABLE_NAME)) { - dumpAllTagProcessor.process(DumpAllTagTask.TASK_ID, new DumpAllTagTask()); - } - - // add to dump aggr - List configList = persistService.findAllAggrGroup(); - if (configList != null && !configList.isEmpty()) { - total = configList.size(); - List> splitList = splitList(configList, - INIT_THREAD_COUNT); - for (List list : splitList) { - MergeAllDataWorker work = new MergeAllDataWorker(list); - work.start(); - } - log.info("server start, schedule merge end."); - } - } - catch (Exception e) { - LogUtil.fatalLog - .error("Nacos Server did not start because dumpservice bean construction failure :\n" - + e.getMessage(), e.getCause()); - throw new RuntimeException( - "Nacos Server did not start because dumpservice bean construction failure :\n" - + e.getMessage()); - } - if (!ApplicationUtils.getStandaloneMode()) { - Runnable heartbeat = () -> { - String heartBeatTime = TimeUtils.getCurrentTime().toString(); - // write disk - try { - DiskUtil.saveHeartBeatToDisk(heartBeatTime); - } - catch (IOException e) { - LogUtil.fatalLog.error("save heartbeat fail" + e.getMessage()); + catch (Throwable e) { + log.error("clearConfigHistory error", e); + } } }; - TimerTaskService.scheduleWithFixedDelay(heartbeat, 0, 10, TimeUnit.SECONDS); + try { + dumpConfigInfo(dumpAllProcessor); - long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10; - LogUtil.defaultLog.warn("initialDelay:{}", initialDelay); + // 更新beta缓存 + LogUtil.defaultLog.info("start clear all config-info-beta."); + DiskUtil.clearAllBeta(); + if (persistService.isExistTable(BETA_TABLE_NAME)) { + dumpAllBetaProcessor + .process(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask()); + } + // 更新Tag缓存 + LogUtil.defaultLog.info("start clear all config-info-tag."); + DiskUtil.clearAllTag(); + if (persistService.isExistTable(TAG_TABLE_NAME)) { + dumpAllTagProcessor + .process(DumpAllTagTask.TASK_ID, new DumpAllTagTask()); + } - TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, - DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); + // add to dump aggr + List configList = persistService.findAllAggrGroup(); + if (configList != null && !configList.isEmpty()) { + total = configList.size(); + List> splitList = splitList(configList, + INIT_THREAD_COUNT); + for (List list : splitList) { + MergeAllDataWorker work = new MergeAllDataWorker(list); + work.start(); + } + log.info("server start, schedule merge end."); + } + } + catch (Exception e) { + LogUtil.fatalLog + .error("Nacos Server did not start because dumpservice bean construction failure :\n" + + e.getMessage(), e.getCause()); + throw new RuntimeException( + "Nacos Server did not start because dumpservice bean construction failure :\n" + + e.getMessage()); + } + if (!ApplicationUtils.getStandaloneMode()) { + Runnable heartbeat = () -> { + String heartBeatTime = TimeUtils.getCurrentTime().toString(); + // write disk + try { + DiskUtil.saveHeartBeatToDisk(heartBeatTime); + } + catch (IOException e) { + LogUtil.fatalLog.error("save heartbeat fail" + e.getMessage()); + } + }; - TimerTaskService.scheduleWithFixedDelay(dumpAllBeta, initialDelay, - DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); + TimerTaskService + .scheduleWithFixedDelay(heartbeat, 0, 10, TimeUnit.SECONDS); + + long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10; + LogUtil.defaultLog.warn("initialDelay:{}", initialDelay); + + TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, + DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); + + TimerTaskService.scheduleWithFixedDelay(dumpAllBeta, initialDelay, + DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); + } + + TimerTaskService + .scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES); + reporter.setSuccess(DumpService.class.getName(), true); + } + catch (Throwable ex) { + reporter.setEx(DumpService.class.getName(), ex); + } finally { + TimerContext.end(LogUtil.defaultLog); } - - TimerTaskService - .scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES); } diff --git a/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java b/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java index aa928cfc9..18ff691f3 100644 --- a/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java +++ b/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java @@ -16,6 +16,8 @@ package com.alibaba.nacos.consistency; +import com.alibaba.nacos.common.executor.ExecutorFactory; +import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.utils.Observable; import com.alibaba.nacos.common.utils.Observer; import com.alibaba.nacos.common.utils.StringUtils; @@ -25,8 +27,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,7 +40,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; @SuppressWarnings("PMD.Rule:CollectionInitShouldAssignCapacityRule") public final class ProtocolMetaData { - private transient volatile boolean stopDefer = false; + private static final Executor EXECUTOR = ExecutorFactory.newFixExecutorService(ProtocolMetaData.class.getCanonicalName(), 4, new NameThreadFactory("nacos.consistency.protocol.metadata")); private Map metaDataMap = new ConcurrentHashMap<>(4); @@ -90,17 +91,9 @@ public final class ProtocolMetaData { .unSubscribe(key, observer); } - public void stopDeferPublish() { - stopDefer = true; - } - @SuppressWarnings("PMD.ThreadPoolCreationRule") public final class MetaData { - // Each biz does not affect each other - - private transient final ExecutorService executor = Executors.newSingleThreadExecutor(); - private final Map itemMap = new ConcurrentHashMap<>(8); private transient final String group; @@ -115,7 +108,7 @@ public final class ProtocolMetaData { void put(String key, Object value) { itemMap.computeIfAbsent(key, s -> { - ValueItem item = new ValueItem(this, group + "/" + key); + ValueItem item = new ValueItem(group + "/" + key); return item; }); ValueItem item = itemMap.get(key); @@ -130,7 +123,7 @@ public final class ProtocolMetaData { void subscribe(final String key, final Observer observer) { itemMap.computeIfAbsent(key, s -> { - ValueItem item = new ValueItem(this, group + "/" + key); + ValueItem item = new ValueItem(group + "/" + key); return item; }); final ValueItem item = itemMap.get(key); @@ -149,7 +142,6 @@ public final class ProtocolMetaData { public final class ValueItem extends Observable { - private transient final MetaData holder; private transient final String path; private transient final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private transient final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); @@ -157,8 +149,7 @@ public final class ProtocolMetaData { private volatile Object data; private transient BlockingQueue deferObject = new LinkedBlockingQueue<>(); - public ValueItem(MetaData holder, String path) { - this.holder = holder; + public ValueItem(String path) { this.path = path; } @@ -178,28 +169,16 @@ public final class ProtocolMetaData { deferObject.offer(data); setChanged(); - Runnable runnable = new Runnable() { - @Override - public void run() { - if (countObservers() == 0 && !stopDefer) { - holder.executor.submit(this); - return; - } - try { - notifyObservers(deferObject.take()); - } catch (InterruptedException ignore) { - Thread.interrupted(); - } + EXECUTOR.execute(() -> { + try { + notifyObservers(deferObject.take()); + } catch (InterruptedException ignore) { + Thread.interrupted(); } - }; - notifySubscriber(runnable); + }); } finally { writeLock.unlock(); } } - - private void notifySubscriber(Runnable runnable) { - holder.executor.submit(runnable); - } } -} +} \ No newline at end of file diff --git a/core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java b/core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java new file mode 100644 index 000000000..a9ebc1ec8 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java @@ -0,0 +1,56 @@ +/* + * + * * Copyright 1999-2018 Alibaba Group Holding Ltd. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.nacos.core.code; + +/** + * Record whether the associated module has been initialized + * + * @author liaochuntao + */ +public interface ModuleInitializeReporter { + + /** + * Whether the initialization of the module is completed + * + * @return {@link Boolean} + */ + boolean alreadyInitialized(); + + /** + * Determines if there is an exception to terminate the program during initialization + * + * @return {@link Boolean} + */ + boolean hasException(); + + /** + * get exception + * + * @return {@link Throwable} + */ + Throwable getError(); + + /** + * The name of the module + * + * @return module name + */ + String group(); + +} diff --git a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java index 2f6ea1b85..f92d9ec55 100644 --- a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java +++ b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java @@ -15,6 +15,7 @@ */ package com.alibaba.nacos.core.code; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.executor.ExecutorFactory; import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.executor.ThreadPoolManager; @@ -35,7 +36,10 @@ import org.springframework.core.env.ConfigurableEnvironment; import java.io.File; import java.io.IOException; import java.nio.file.Paths; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -111,8 +115,31 @@ public class StartingSpringApplicationRunListener @Override public void started(ConfigurableApplicationContext context) { starting = false; - ConfigurableEnvironment env = context.getEnvironment(); + Collection reporters = context.getBeansOfType( + ModuleInitializeReporter.class).values(); + Set initializingModules = reporters.stream().collect(HashSet::new, (m, v) -> m.add(v.group()), HashSet::addAll); + + for ( ; ; ) { + for (ModuleInitializeReporter reporter : reporters) { + if (reporter.hasException()) { + failed(context, new NacosException(NacosException.SERVER_ERROR, reporter.getError())); + return; + } + + boolean finishInitialize = reporter.alreadyInitialized(); + + if (finishInitialize) { + LOGGER.info("{} finished initialize", reporter.group()); + initializingModules.remove(reporter.group()); + } + } + + if (initializingModules.isEmpty()) { + break; + } + + } closeExecutor(); @@ -152,14 +179,16 @@ public class StartingSpringApplicationRunListener LOGGER.error("Startup errors : {}", exception); - LOGGER.error("Nacos failed to start, please see {} for more details.", - Paths.get(ApplicationUtils.getNacosHome(), "logs/nacos.log")); - ThreadPoolManager.shutdown(); WatchFileCenter.shutdown(); NotifyCenter.shutdown(); closeExecutor(); + + context.close(); + + LOGGER.error("Nacos failed to start, please see {} for more details.", + Paths.get(ApplicationUtils.getNacosHome(), "logs/nacos.log")); } /** diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java index 98adde118..aece6a53b 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolManager.java @@ -102,16 +102,6 @@ public class ProtocolManager @Override public void onApplicationEvent(ContextStartedEvent event) { - stopDeferPublish(); - } - - public void stopDeferPublish() { - if (Objects.nonNull(apProtocol)) { - apProtocol.protocolMetaData().stopDeferPublish(); - } - if (Objects.nonNull(cpProtocol)) { - cpProtocol.protocolMetaData().stopDeferPublish(); - } } private void initAPProtocol() { diff --git a/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java b/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java index f7580250b..6ab0a90df 100644 --- a/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java +++ b/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java @@ -39,7 +39,7 @@ public class TimerContext { public static void end(final Logger logger) { long endTime = System.currentTimeMillis(); Pair record = TIME_RECORD.get(); - LoggerUtils.printIfDebugEnabled(logger, "{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond())); + logger.info("{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond())); TIME_RECORD.remove(); } From 610024dd079790795f2b6229cdbe3b88261b4e02 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 19 May 2020 10:21:22 +0800 Subject: [PATCH 2/5] refactor: remove unuse dir --- .../code/StartingSpringApplicationRunListener.java | 10 ++++------ .../com/alibaba/nacos/core/utils/TimerContext.java | 1 - 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java index f92d9ec55..33a9457d0 100644 --- a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java +++ b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java @@ -120,10 +120,11 @@ public class StartingSpringApplicationRunListener ModuleInitializeReporter.class).values(); Set initializingModules = reporters.stream().collect(HashSet::new, (m, v) -> m.add(v.group()), HashSet::addAll); - for ( ; ; ) { + do { for (ModuleInitializeReporter reporter : reporters) { if (reporter.hasException()) { - failed(context, new NacosException(NacosException.SERVER_ERROR, reporter.getError())); + failed(context, new NacosException(NacosException.SERVER_ERROR, + reporter.getError())); return; } @@ -135,11 +136,8 @@ public class StartingSpringApplicationRunListener } } - if (initializingModules.isEmpty()) { - break; - } - } + while (!initializingModules.isEmpty()); closeExecutor(); diff --git a/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java b/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java index 6ab0a90df..7ed282e4a 100644 --- a/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java +++ b/core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java @@ -16,7 +16,6 @@ package com.alibaba.nacos.core.utils; -import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.Pair; import org.slf4j.Logger; From b8dddf0ec1b6f4c96aea3f1a4c94eda1ff06421c Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 19 May 2020 14:04:11 +0800 Subject: [PATCH 3/5] refactor: block initialization before dump completes --- .../ConfigModuleInitializeReporter.java | 74 ------------------- .../core/code/ModuleInitializeReporter.java | 56 -------------- 2 files changed, 130 deletions(-) delete mode 100644 config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java b/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java deleted file mode 100644 index 62ec22cd7..000000000 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigModuleInitializeReporter.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * * Copyright 1999-2018 Alibaba Group Holding Ltd. - * * - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package com.alibaba.nacos.config.server.service; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.config.server.Config; -import com.alibaba.nacos.core.code.ModuleInitializeReporter; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Component; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Configure the initialization completion debrief for the module - * - * @author liaochuntao - */ -@Component -public class ConfigModuleInitializeReporter implements ModuleInitializeReporter { - - private Map successRecord = new ConcurrentHashMap<>(4); - private Map exceptionRecord = new ConcurrentHashMap<>(4); - - public void setSuccess(String name, boolean success) { - successRecord.put(name, success); - } - - public void setEx(String name, Throwable ex) { - exceptionRecord.put(name, ex); - } - - @Override - public boolean alreadyInitialized() { - boolean[] initialize = new boolean[] { !successRecord.isEmpty() }; - successRecord.forEach((group, isOk) -> initialize[0] &= isOk); - return initialize[0] && exceptionRecord.isEmpty(); - } - - @Override - public boolean hasException() { - return !exceptionRecord.isEmpty(); - } - - @Override - public Throwable getError() { - StringBuilder errorBuilder = new StringBuilder(); - exceptionRecord.forEach((s, throwable) -> errorBuilder.append("[").append(s).append("]") - .append(":").append(throwable.toString()) - .append(StringUtils.LF)); - return new NacosException(NacosException.SERVER_ERROR, errorBuilder.toString()); - } - - @Override - public String group() { - return Config.class.getName(); - } -} diff --git a/core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java b/core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java deleted file mode 100644 index a9ebc1ec8..000000000 --- a/core/src/main/java/com/alibaba/nacos/core/code/ModuleInitializeReporter.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * * Copyright 1999-2018 Alibaba Group Holding Ltd. - * * - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package com.alibaba.nacos.core.code; - -/** - * Record whether the associated module has been initialized - * - * @author liaochuntao - */ -public interface ModuleInitializeReporter { - - /** - * Whether the initialization of the module is completed - * - * @return {@link Boolean} - */ - boolean alreadyInitialized(); - - /** - * Determines if there is an exception to terminate the program during initialization - * - * @return {@link Boolean} - */ - boolean hasException(); - - /** - * get exception - * - * @return {@link Throwable} - */ - Throwable getError(); - - /** - * The name of the module - * - * @return module name - */ - String group(); - -} From 10d8edb4f3ccfdc1506b2a7763bdd985c6c5ca31 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 19 May 2020 14:16:52 +0800 Subject: [PATCH 4/5] refactor: optimize code logic --- .../nacos/common/utils/ThreadUtils.java | 3 +- .../server/service/dump/DumpService.java | 47 ++++++++++++------- .../server/service/dump/DumpServiceTest.java | 2 +- .../StartingSpringApplicationRunListener.java | 26 ---------- 4 files changed, 34 insertions(+), 44 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java index 4a82748dd..52ddfba52 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java @@ -57,7 +57,8 @@ public final class ThreadUtils { } /** - * 通过内核数,算出合适的线程数;1.5-2倍cpu内核数 + * Through the number of cores, calculate the appropriate number of threads; + * 1.5-2 times the number of CPU cores * * @return thread count */ diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java index 90b6d4b10..5ae2e213c 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java @@ -19,6 +19,7 @@ import com.alibaba.nacos.common.utils.IoUtils; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.Observable; import com.alibaba.nacos.common.utils.Observer; +import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.manager.TaskManager; import com.alibaba.nacos.config.server.model.ConfigInfo; @@ -26,7 +27,6 @@ import com.alibaba.nacos.config.server.model.ConfigInfoAggr; import com.alibaba.nacos.config.server.model.ConfigInfoChanged; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; import com.alibaba.nacos.config.server.model.Page; -import com.alibaba.nacos.config.server.service.ConfigModuleInitializeReporter; import com.alibaba.nacos.config.server.service.ConfigService; import com.alibaba.nacos.config.server.service.DiskUtil; import com.alibaba.nacos.config.server.service.DynamicDataSource; @@ -63,8 +63,10 @@ import java.util.Calendar; import java.util.List; import java.util.Objects; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog; @@ -76,7 +78,6 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog; @Service public class DumpService { - private final ConfigModuleInitializeReporter reporter; private final PersistService persistService; private final ServerMemberManager memberManager; private final ProtocolManager protocolManager; @@ -85,14 +86,12 @@ public class DumpService { * Here you inject the dependent objects constructively, ensuring that some * of the dependent functionality is initialized ahead of time * - * @param reporter {@link ConfigModuleInitializeReporter} * @param persistService {@link PersistService} * @param memberManager {@link ServerMemberManager} * @param protocolManager {@link ProtocolManager} */ - public DumpService(ConfigModuleInitializeReporter reporter, PersistService persistService, - ServerMemberManager memberManager, ProtocolManager protocolManager) { - this.reporter = reporter; + public DumpService(PersistService persistService, ServerMemberManager memberManager, + ProtocolManager protocolManager) { this.persistService = persistService; this.memberManager = memberManager; this.protocolManager = protocolManager; @@ -111,10 +110,7 @@ public class DumpService { } @PostConstruct - protected void init() throws Exception { - // I haven't initialized myself yet - this.reporter.setSuccess(DumpService.class.getName(), false); - + protected void init() throws Throwable { DynamicDataSource.getInstance().getDataSource(); DumpProcessor processor = new DumpProcessor(this); @@ -138,6 +134,9 @@ public class DumpService { .info("With embedded distributed storage, you need to wait for " + "the underlying master to complete before you can perform the dump operation."); + AtomicReference errorReference = new AtomicReference<>(null); + CountDownLatch waitDumpFinish = new CountDownLatch(1); + // watch path => /nacos_config/leader/ has value ? Observer observer = new Observer() { @@ -148,8 +147,16 @@ public class DumpService { if (Objects.isNull(arg)) { return; } - dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, - dumpAllTagProcessor); + try { + dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, + dumpAllTagProcessor); + } + catch (Throwable ex) { + errorReference.set(ex); + } + finally { + waitDumpFinish.countDown(); + } protocol.protocolMetaData() .unSubscribe(Constants.CONFIG_MODEL_RAFT_GROUP, com.alibaba.nacos.consistency.cp.Constants.LEADER_META_DATA, @@ -162,6 +169,17 @@ public class DumpService { com.alibaba.nacos.consistency.cp.Constants.LEADER_META_DATA, observer); + // We must wait for the dump task to complete the callback operation before + // continuing with the initialization + ThreadUtils.latchAwait(waitDumpFinish); + + // If an exception occurs during the execution of the dump task, the exception + // needs to be thrown, triggering the node to start the failed process + final Throwable ex = errorReference.get(); + if (Objects.nonNull(ex)) { + throw ex; + } + } else { dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, @@ -275,11 +293,8 @@ public class DumpService { TimerTaskService .scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES); - reporter.setSuccess(DumpService.class.getName(), true); } - catch (Throwable ex) { - reporter.setEx(DumpService.class.getName(), ex); - } finally { + finally { TimerContext.end(LogUtil.defaultLog); } diff --git a/config/src/test/java/com/alibaba/nacos/config/server/service/dump/DumpServiceTest.java b/config/src/test/java/com/alibaba/nacos/config/server/service/dump/DumpServiceTest.java index af3d3877c..acc5e3dd5 100644 --- a/config/src/test/java/com/alibaba/nacos/config/server/service/dump/DumpServiceTest.java +++ b/config/src/test/java/com/alibaba/nacos/config/server/service/dump/DumpServiceTest.java @@ -16,7 +16,7 @@ public class DumpServiceTest { DumpService service; @Test - public void init() throws Exception { + public void init() throws Throwable { service.init(); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java index 33a9457d0..5d753f39b 100644 --- a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java +++ b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java @@ -15,7 +15,6 @@ */ package com.alibaba.nacos.core.code; -import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.executor.ExecutorFactory; import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.executor.ThreadPoolManager; @@ -36,10 +35,7 @@ import org.springframework.core.env.ConfigurableEnvironment; import java.io.File; import java.io.IOException; import java.nio.file.Paths; -import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -116,28 +112,6 @@ public class StartingSpringApplicationRunListener public void started(ConfigurableApplicationContext context) { starting = false; ConfigurableEnvironment env = context.getEnvironment(); - Collection reporters = context.getBeansOfType( - ModuleInitializeReporter.class).values(); - Set initializingModules = reporters.stream().collect(HashSet::new, (m, v) -> m.add(v.group()), HashSet::addAll); - - do { - for (ModuleInitializeReporter reporter : reporters) { - if (reporter.hasException()) { - failed(context, new NacosException(NacosException.SERVER_ERROR, - reporter.getError())); - return; - } - - boolean finishInitialize = reporter.alreadyInitialized(); - - if (finishInitialize) { - LOGGER.info("{} finished initialize", reporter.group()); - initializingModules.remove(reporter.group()); - } - } - - } - while (!initializingModules.isEmpty()); closeExecutor(); From c4a93d1c378fe0b31b0c2322325eefcaf842230d Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 20 May 2020 15:45:15 +0800 Subject: [PATCH 5/5] refactor: code refactor --- .travis.yml | 2 +- .../com/alibaba/nacos/client/ConfigTest.java | 35 ++++++++++++++++--- .../server/service/dump/DumpService.java | 6 +--- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/.travis.yml b/.travis.yml index c9da1a3f4..b8a83104a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,7 +29,7 @@ before_install: script: - mvn -B clean package apache-rat:check findbugs:findbugs -Dmaven.test.skip=true - mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U - - mvn clean install -Pit-test + - mvn clean package -Pit-test after_success: - mvn clean package -Pit-test - mvn sonar:sonar -Psonar-apache diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index bc7c500f5..0a0ca4b7a 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -18,19 +18,46 @@ package com.alibaba.nacos.client; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.common.utils.ThreadUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Properties; /** * @author liaochuntao */ +@Ignore public class ConfigTest { - private ConfigService config7; - private ConfigService config8; - private ConfigService config9; + private ConfigService configService; - public void test() { + @Before + public void before() throws Exception { + Properties properties = new Properties(); + properties.setProperty(PropertyKeyConst.NAMESPACE, "bebf0150-e1ea-47e2-81fe-6814caf2b952"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); + properties.setProperty(PropertyKeyConst.USERNAME, "chuntaojun"); + properties.setProperty(PropertyKeyConst.PASSWORD, "1017"); + configService = NacosFactory.createConfigService(properties); + } + @Test + public void test() throws Exception { + final String dataId = "lessspring"; + final String group = "lessspring"; + final String content = "lessspring-" + System.currentTimeMillis(); + boolean result = configService.publishConfig(dataId, group, content); + Assert.assertTrue(result); + + ThreadUtils.sleep(10_000); + String response = configService.getConfig(dataId, group, 5000); + System.out.println(response); } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java index 5ae2e213c..d30f16797 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java @@ -105,10 +105,6 @@ public class DumpService { return memberManager; } - public ProtocolManager getProtocolManager() { - return protocolManager; - } - @PostConstruct protected void init() throws Throwable { DynamicDataSource.getInstance().getDataSource(); @@ -295,7 +291,7 @@ public class DumpService { .scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES); } finally { - TimerContext.end(LogUtil.defaultLog); + TimerContext.end(LogUtil.dumpLog); } }