diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/NamingMaintainService.java b/api/src/main/java/com/alibaba/nacos/api/naming/NamingMaintainService.java index 8dfc24a91..abcb5969b 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/NamingMaintainService.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/NamingMaintainService.java @@ -167,4 +167,10 @@ public interface NamingMaintainService { */ void updateService(Service service, AbstractSelector selector) throws NacosException; + /** + * Shutdown the resource service. + * + * @throws NacosException exception. + */ + void shutDown() throws NacosException; } diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties deleted file mode 100644 index 0b3684628..000000000 --- a/api/src/main/resources/application.properties +++ /dev/null @@ -1,16 +0,0 @@ -# -# Copyright 1999-2018 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -version=${project.version} diff --git a/client/src/main/java/com/alibaba/nacos/client/logging/AbstractNacosLogging.java b/client/src/main/java/com/alibaba/nacos/client/logging/AbstractNacosLogging.java index 4d5a78db6..fa7427748 100644 --- a/client/src/main/java/com/alibaba/nacos/client/logging/AbstractNacosLogging.java +++ b/client/src/main/java/com/alibaba/nacos/client/logging/AbstractNacosLogging.java @@ -19,6 +19,8 @@ package com.alibaba.nacos.client.logging; import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.StringUtils; +import java.io.File; + /** * Abstract nacos logging. * @@ -31,13 +33,13 @@ public abstract class AbstractNacosLogging { private static final String NACOS_LOGGING_DEFAULT_CONFIG_ENABLED_PROPERTY = "nacos.logging.default.config.enabled"; - private static final String NACOS_LOGGING_PATH_PROPERTY = "nacos.logging.path"; + private static final String NACOS_LOGGING_PATH_PROPERTY = "JM.LOG.PATH"; static { String loggingPath = System.getProperty(NACOS_LOGGING_PATH_PROPERTY); if (StringUtils.isBlank(loggingPath)) { String userHome = System.getProperty("user.home"); - System.setProperty(NACOS_LOGGING_PATH_PROPERTY, userHome + "/logs/nacos"); + System.setProperty(NACOS_LOGGING_PATH_PROPERTY, userHome + File.separator + "logs"); } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingMaintainService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingMaintainService.java index a6cac7314..2a553c22f 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingMaintainService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingMaintainService.java @@ -161,4 +161,8 @@ public class NacosNamingMaintainService implements NamingMaintainService { serverProxy.updateService(service, selector); } + @Override + public void shutDown() throws NacosException { + serverProxy.shutdown(); + } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ConcurrentDiskUtil.java b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ConcurrentDiskUtil.java index 77eb2bad4..d227a6edd 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ConcurrentDiskUtil.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ConcurrentDiskUtil.java @@ -143,11 +143,12 @@ public class ConcurrentDiskUtil { } } while (null == lock); - ByteBuffer sendBuffer = ByteBuffer.wrap(content.getBytes(charsetName)); + byte[] contentBytes = content.getBytes(charsetName); + ByteBuffer sendBuffer = ByteBuffer.wrap(contentBytes); while (sendBuffer.hasRemaining()) { channel.write(sendBuffer); } - channel.truncate(content.length()); + channel.truncate(contentBytes.length); } catch (FileNotFoundException e) { throw new IOException("file not exist"); } finally { diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java b/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java index 06a3666ec..59bde5df0 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java @@ -108,6 +108,9 @@ public class PushReceiver implements Runnable, Closeable { udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception e) { + if (closed) { + return; + } NAMING_LOGGER.error("[NA] error while receiving push data", e); } } diff --git a/client/src/main/java/com/alibaba/nacos/client/security/SecurityProxy.java b/client/src/main/java/com/alibaba/nacos/client/security/SecurityProxy.java index e1bac67be..f0b956b32 100644 --- a/client/src/main/java/com/alibaba/nacos/client/security/SecurityProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/security/SecurityProxy.java @@ -28,8 +28,6 @@ import com.fasterxml.jackson.databind.JsonNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -127,13 +125,13 @@ public class SecurityProxy { * @param server server address * @return true if login successfully */ - public boolean login(String server) throws UnsupportedEncodingException { + public boolean login(String server) { if (StringUtils.isNotBlank(username)) { Map params = new HashMap(2); Map bodyMap = new HashMap(2); params.put("username", username); - bodyMap.put("password", URLEncoder.encode(password, "utf-8")); + bodyMap.put("password", password); String url = "http://" + server + contextPath + LOGIN_URL; if (server.contains(Constants.HTTP_PREFIX)) { diff --git a/client/src/main/java/com/alibaba/nacos/client/utils/ParamUtil.java b/client/src/main/java/com/alibaba/nacos/client/utils/ParamUtil.java index bbab52593..b3c04043e 100644 --- a/client/src/main/java/com/alibaba/nacos/client/utils/ParamUtil.java +++ b/client/src/main/java/com/alibaba/nacos/client/utils/ParamUtil.java @@ -20,9 +20,9 @@ import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.SystemPropertyKeyConst; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.common.utils.VersionUtils; import org.slf4j.Logger; -import java.io.InputStream; import java.util.Properties; import java.util.concurrent.Callable; import java.util.regex.Pattern; @@ -80,19 +80,7 @@ public class ParamUtil { } LOGGER.info("[settings] [http-client] connect timeout:{}", connectTimeout); - try { - InputStream in = ValidatorUtils.class.getClassLoader().getResourceAsStream("application.properties"); - Properties props = new Properties(); - props.load(in); - String val = null; - val = props.getProperty("version"); - if (val != null) { - clientVersion = val; - } - LOGGER.info("NACOS_CLIENT_VERSION: {}", clientVersion); - } catch (Exception e) { - LOGGER.error("[500] read application.properties", e); - } + clientVersion = VersionUtils.version; try { perTaskConfigSize = Double.valueOf(System.getProperty("PER_TASK_CONFIG_SIZE", "3000")); diff --git a/client/src/main/resources/application.properties b/client/src/main/resources/application.properties deleted file mode 100644 index 0b3684628..000000000 --- a/client/src/main/resources/application.properties +++ /dev/null @@ -1,16 +0,0 @@ -# -# Copyright 1999-2018 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -version=${project.version} diff --git a/client/src/main/resources/nacos-log4j2.xml b/client/src/main/resources/nacos-log4j2.xml index f6d491d6d..9ce9d1f8b 100644 --- a/client/src/main/resources/nacos-log4j2.xml +++ b/client/src/main/resources/nacos-log4j2.xml @@ -17,8 +17,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n @@ -31,37 +31,36 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n - + - + - + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n - + - + - - - + + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n diff --git a/client/src/main/resources/nacos-logback.xml b/client/src/main/resources/nacos-logback.xml index a9687925e..b46e176b3 100644 --- a/client/src/main/resources/nacos-logback.xml +++ b/client/src/main/resources/nacos-logback.xml @@ -19,10 +19,10 @@ nacos - ${nacos.logging.path}/config.log + ${JM.LOG.PATH}/nacos/config.log - ${nacos.logging.path}/config.log.%i + ${JM.LOG.PATH}/nacos/config.log.%i ${JM.LOG.RETAIN.COUNT:-7} @@ -36,10 +36,10 @@ - ${nacos.logging.path}/naming.log + ${JM.LOG.PATH}/nacos/naming.log - ${nacos.logging.path}/naming.log.%i + ${JM.LOG.PATH}/nacos/naming.log.%i ${JM.LOG.RETAIN.COUNT:-7} diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/cache/DiskCacheTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/cache/DiskCacheTest.java index 6079e4da1..6ec35843b 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/cache/DiskCacheTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/cache/DiskCacheTest.java @@ -47,6 +47,7 @@ public class DiskCacheTest { instance.setIp("1.1.1.1"); instance.setPort(1234); instance.setServiceName("testName"); + instance.addMetadata("chinese", "中文"); serviceInfo.setHosts(Collections.singletonList(instance)); } @@ -87,9 +88,10 @@ public class DiskCacheTest { } private void assertInstance(Instance actual, Instance expected) { - assertEquals(actual.getServiceName(), actual.getServiceName()); - assertEquals(actual.getClusterName(), actual.getClusterName()); - assertEquals(actual.getIp(), actual.getIp()); - assertEquals(actual.getPort(), actual.getPort()); + assertEquals(actual.getServiceName(), expected.getServiceName()); + assertEquals(actual.getClusterName(), expected.getClusterName()); + assertEquals(actual.getIp(), expected.getIp()); + assertEquals(actual.getPort(), expected.getPort()); + assertEquals(actual.getMetadata(), expected.getMetadata()); } } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java b/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java index c3bab0863..5e6e93363 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java @@ -21,7 +21,7 @@ package com.alibaba.nacos.common.task; * * @author xiweng.yy */ -public abstract class AbstractExecuteTask implements NacosTask { +public abstract class AbstractExecuteTask implements NacosTask, Runnable { @Override public boolean shouldProcess() { diff --git a/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java b/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java index 9dcfad7cd..22275fd68 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java @@ -29,5 +29,5 @@ public interface NacosTaskProcessor { * @param task task. * @return process task result. */ - boolean process(AbstractDelayTask task); + boolean process(NacosTask task); } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java index 9b571c6ea..b690eb23f 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java @@ -16,20 +16,13 @@ package com.alibaba.nacos.common.task.engine; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.common.executor.ExecutorFactory; -import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; /** * Abstract nacos task execute engine. @@ -40,57 +33,12 @@ public abstract class AbstractNacosTaskExecuteEngine implem private final Logger log; - private final ScheduledExecutorService processingExecutor; - private final ConcurrentHashMap taskProcessors = new ConcurrentHashMap(); - protected final ConcurrentHashMap tasks; - - protected final ReentrantLock lock = new ReentrantLock(); - private NacosTaskProcessor defaultTaskProcessor; - public AbstractNacosTaskExecuteEngine(String name) { - this(name, 32, null, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, Logger logger) { - this(name, 32, logger, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, Logger logger, long processInterval) { - this(name, 32, logger, processInterval); - } - - public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger) { - this(name, initCapacity, logger, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { + public AbstractNacosTaskExecuteEngine(Logger logger) { this.log = null != logger ? logger : LoggerFactory.getLogger(AbstractNacosTaskExecuteEngine.class.getName()); - tasks = new ConcurrentHashMap(initCapacity); - processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); - processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); - } - - @Override - public int size() { - lock.lock(); - try { - return tasks.size(); - } finally { - lock.unlock(); - } - } - - @Override - public boolean isEmpty() { - lock.lock(); - try { - return tasks.isEmpty(); - } finally { - lock.unlock(); - } } @Override @@ -118,56 +66,7 @@ public abstract class AbstractNacosTaskExecuteEngine implem this.defaultTaskProcessor = defaultTaskProcessor; } - @Override - public T removeTask(Object key) { - lock.lock(); - try { - T task = tasks.get(key); - if (null != task && task.shouldProcess()) { - return tasks.remove(key); - } else { - return null; - } - } finally { - lock.unlock(); - } - } - - @Override - public Collection getAllTaskKeys() { - Collection keys = new HashSet(); - lock.lock(); - try { - keys.addAll(tasks.keySet()); - } finally { - lock.unlock(); - } - return keys; - } - - @Override - public void shutdown() throws NacosException { - processingExecutor.shutdown(); - } - protected Logger getEngineLog() { return log; } - - /** - * process tasks in execute engine. - */ - protected abstract void processTasks(); - - private class ProcessRunnable implements Runnable { - - @Override - public void run() { - try { - AbstractNacosTaskExecuteEngine.this.processTasks(); - } catch (Throwable e) { - log.error(e.toString(), e); - } - } - } } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java index 192d4518d..01b4e0de7 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java @@ -16,11 +16,19 @@ package com.alibaba.nacos.common.task.engine; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.executor.ExecutorFactory; +import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * Nacos delay task execute engine. @@ -29,27 +37,105 @@ import java.util.Collection; */ public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine { + private final ScheduledExecutorService processingExecutor; + + protected final ConcurrentHashMap tasks; + + protected final ReentrantLock lock = new ReentrantLock(); + public NacosDelayTaskExecuteEngine(String name) { - super(name); + this(name, null); } public NacosDelayTaskExecuteEngine(String name, Logger logger) { - super(name, logger); + this(name, 32, logger, 100L); } public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) { - super(name, logger, processInterval); + this(name, 32, logger, processInterval); } public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) { - super(name, initCapacity, logger); + this(name, initCapacity, logger, 100L); } public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { - super(name, initCapacity, logger, processInterval); + super(logger); + tasks = new ConcurrentHashMap(initCapacity); + processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); + processingExecutor + .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); } @Override + public int size() { + lock.lock(); + try { + return tasks.size(); + } finally { + lock.unlock(); + } + } + + @Override + public boolean isEmpty() { + lock.lock(); + try { + return tasks.isEmpty(); + } finally { + lock.unlock(); + } + } + + @Override + public AbstractDelayTask removeTask(Object key) { + lock.lock(); + try { + AbstractDelayTask task = tasks.get(key); + if (null != task && task.shouldProcess()) { + return tasks.remove(key); + } else { + return null; + } + } finally { + lock.unlock(); + } + } + + @Override + public Collection getAllTaskKeys() { + Collection keys = new HashSet(); + lock.lock(); + try { + keys.addAll(tasks.keySet()); + } finally { + lock.unlock(); + } + return keys; + } + + @Override + public void shutdown() throws NacosException { + processingExecutor.shutdown(); + } + + @Override + public void addTask(Object key, AbstractDelayTask newTask) { + lock.lock(); + try { + AbstractDelayTask existTask = tasks.get(key); + if (null != existTask) { + newTask.merge(existTask); + } + tasks.put(key, newTask); + } finally { + lock.unlock(); + } + } + + /** + * process tasks in execute engine. + */ protected void processTasks() { Collection keys = getAllTaskKeys(); for (Object taskKey : keys) { @@ -74,22 +160,20 @@ public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine< } } - @Override - public void addTask(Object key, AbstractDelayTask newTask) { - lock.lock(); - try { - AbstractDelayTask existTask = tasks.get(key); - if (null != existTask) { - newTask.merge(existTask); - } - tasks.put(key, newTask); - } finally { - lock.unlock(); - } - } - private void retryFailedTask(Object key, AbstractDelayTask task) { task.setLastProcessTime(System.currentTimeMillis()); addTask(key, task); } + + private class ProcessRunnable implements Runnable { + + @Override + public void run() { + try { + processTasks(); + } catch (Throwable e) { + getEngineLog().error(e.toString(), e); + } + } + } } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java new file mode 100644 index 000000000..81de69162 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java @@ -0,0 +1,111 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.task.engine; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.task.NacosTaskProcessor; +import com.alibaba.nacos.common.utils.ThreadUtils; +import org.slf4j.Logger; + +import java.util.Collection; + +/** + * Nacos execute task execute engine. + * + * @author xiweng.yy + */ +public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine { + + private final TaskExecuteWorker[] executeWorkers; + + public NacosExecuteTaskExecuteEngine(String name, Logger logger) { + this(name, logger, ThreadUtils.getSuitableThreadCount(1)); + } + + public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) { + super(logger); + executeWorkers = new TaskExecuteWorker[dispatchWorkerCount]; + for (int mod = 0; mod < dispatchWorkerCount; ++mod) { + executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog()); + } + } + + @Override + public int size() { + int result = 0; + for (TaskExecuteWorker each : executeWorkers) { + result += each.pendingTaskCount(); + } + return result; + } + + @Override + public boolean isEmpty() { + return 0 == size(); + } + + @Override + public void addTask(Object tag, AbstractExecuteTask task) { + NacosTaskProcessor processor = getProcessor(tag); + if (null != processor) { + processor.process(task); + return; + } + TaskExecuteWorker worker = getWorker(tag); + worker.process(task); + } + + private TaskExecuteWorker getWorker(Object tag) { + int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount(); + return executeWorkers[idx]; + } + + private int workersCount() { + return executeWorkers.length; + } + + @Override + public AbstractExecuteTask removeTask(Object key) { + throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task"); + } + + @Override + public Collection getAllTaskKeys() { + throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys"); + } + + @Override + public void shutdown() throws NacosException { + for (TaskExecuteWorker each : executeWorkers) { + each.shutdown(); + } + } + + /** + * Get workers status. + * + * @return workers status string + */ + public String workersStatus() { + StringBuilder sb = new StringBuilder(); + for (TaskExecuteWorker worker : executeWorkers) { + sb.append(worker.status()).append("\n"); + } + return sb.toString(); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java similarity index 64% rename from core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java rename to common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java index c8ab9f2f8..59fa690f0 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java @@ -14,69 +14,69 @@ * limitations under the License. */ -package com.alibaba.nacos.core.distributed.distro.task.execute; +package com.alibaba.nacos.common.task.engine; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.lifecycle.Closeable; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.task.NacosTask; +import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; /** - * Distro execute worker. + * Nacos execute task execute worker. * * @author xiweng.yy */ -public final class DistroExecuteWorker implements Closeable { +public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(DistroExecuteWorker.class); + /** + * Max task queue size 32768. + */ + private static final int QUEUE_CAPACITY = 1 << 15; - private static final int QUEUE_CAPACITY = 50000; - - private final BlockingQueue queue; + private final Logger log; private final String name; + private final BlockingQueue queue; + private final AtomicBoolean closed; - - public DistroExecuteWorker(final int mod, final int total) { - name = getClass().getName() + "_" + mod + "%" + total; - queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); - closed = new AtomicBoolean(false); + + public TaskExecuteWorker(final String name, final int mod, final int total) { + this(name, mod, total, null); + } + + public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) { + this.name = name + "_" + mod + "%" + total; + this.queue = new ArrayBlockingQueue(QUEUE_CAPACITY); + this.closed = new AtomicBoolean(false); + this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger; new InnerWorker(name).start(); } - + public String getName() { return name; } - - /** - * Execute task without result. - */ - public void execute(Runnable task) { - putTask(task); + + @Override + public boolean process(NacosTask task) { + if (task instanceof AbstractExecuteTask) { + putTask((Runnable) task); + } + return true; } - - /** - * Execute task with a result. - */ - public Future execute(Callable task) { - FutureTask future = new FutureTask(task); - putTask(future); - return future; - } - + private void putTask(Runnable task) { try { queue.put(task); } catch (InterruptedException ire) { - LOGGER.error(ire.toString(), ire); + log.error(ire.toString(), ire); } } @@ -101,12 +101,12 @@ public final class DistroExecuteWorker implements Closeable { * Inner execute worker. */ private class InnerWorker extends Thread { - + InnerWorker(String name) { setDaemon(false); setName(name); } - + @Override public void run() { while (!closed.get()) { @@ -116,10 +116,10 @@ public final class DistroExecuteWorker implements Closeable { task.run(); long duration = System.currentTimeMillis() - begin; if (duration > 1000L) { - LOGGER.warn("distro task {} takes {}ms", task, duration); + log.warn("distro task {} takes {}ms", task, duration); } } catch (Throwable e) { - LOGGER.error("[DISTRO-FAILED] " + e.toString(), e); + log.error("[DISTRO-FAILED] " + e.toString(), e); } } } diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java index 67d68babb..57149f205 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java @@ -94,9 +94,19 @@ public final class ThreadUtils { * @return thread count */ public static int getSuitableThreadCount() { + return getSuitableThreadCount(THREAD_MULTIPLER); + } + + /** + * Through the number of cores, calculate the appropriate number of threads. + * + * @param threadMultiple multiple time of cores + * @return thread count + */ + public static int getSuitableThreadCount(int threadMultiple) { final int coreCount = Runtime.getRuntime().availableProcessors(); int workerCount = 1; - while (workerCount < coreCount * THREAD_MULTIPLER) { + while (workerCount < coreCount * threadMultiple) { workerCount <<= 1; } return workerCount; diff --git a/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java new file mode 100644 index 000000000..4d166686a --- /dev/null +++ b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.task.engine; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class NacosExecuteTaskExecuteEngineTest { + + private NacosExecuteTaskExecuteEngine executeTaskExecuteEngine; + + @Before + public void setUp() { + executeTaskExecuteEngine = new NacosExecuteTaskExecuteEngine("TEST", null); + } + + @After + public void tearDown() throws NacosException { + executeTaskExecuteEngine.shutdown(); + } + + @Mock + private AbstractExecuteTask task; + + @Test + public void testAddTask() { + executeTaskExecuteEngine.addTask("test", task); + verify(task).run(); + assertTrue(executeTaskExecuteEngine.isEmpty()); + assertEquals(0, executeTaskExecuteEngine.size()); + } +} diff --git a/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java b/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java index 90d42a3bd..95c775d05 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java @@ -43,6 +43,7 @@ import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; import com.alibaba.nacos.config.server.utils.MD5Util; import com.alibaba.nacos.config.server.utils.ParamUtils; import com.alibaba.nacos.config.server.utils.RequestUtil; +import com.alibaba.nacos.config.server.utils.NamespaceUtil; import com.alibaba.nacos.config.server.utils.TimeUtils; import com.alibaba.nacos.config.server.utils.ZipUtils; import com.alibaba.nacos.sys.utils.InetUtils; @@ -90,8 +91,6 @@ public class ConfigController { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigController.class); - private static final String NAMESPACE_PUBLIC_KEY = "public"; - private static final String EXPORT_CONFIG_FILE_NAME = "nacos_config_export_"; private static final String EXPORT_CONFIG_FILE_NAME_EXT = ".zip"; @@ -175,7 +174,7 @@ public class ConfigController { .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService - .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIp(), + .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; } @@ -196,7 +195,7 @@ public class ConfigController { throws IOException, ServletException, NacosException { // check tenant ParamUtils.checkTenant(tenant); - tenant = processTenant(tenant); + tenant = NamespaceUtil.processNamespaceParameter(tenant); // check params ParamUtils.checkParam(dataId, group, "datumId", "content"); ParamUtils.checkParam(tag); @@ -469,7 +468,7 @@ public class ConfigController { @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "ids", required = false) List ids) { ids.removeAll(Collections.singleton(null)); - tenant = processTenant(tenant); + tenant = NamespaceUtil.processNamespaceParameter(tenant); List dataList = persistService.findAllConfigInfo4Export(dataId, group, tenant, appName, ids); List zipItemList = new ArrayList<>(); StringBuilder metaData = null; @@ -527,12 +526,12 @@ public class ConfigController { return ResultBuilder.buildResult(ResultCodeEnum.DATA_EMPTY, failedData); } - if (StringUtils.isNotBlank(namespace)) { - if (persistService.tenantInfoCountByTenantId(namespace) <= 0) { - failedData.put("succCount", 0); - return ResultBuilder.buildResult(ResultCodeEnum.NAMESPACE_NOT_EXIST, failedData); - } + namespace = NamespaceUtil.processNamespaceParameter(namespace); + if (StringUtils.isNotBlank(namespace) && persistService.tenantInfoCountByTenantId(namespace) <= 0) { + failedData.put("succCount", 0); + return ResultBuilder.buildResult(ResultCodeEnum.NAMESPACE_NOT_EXIST, failedData); } + List configInfoList = null; try { ZipUtils.UnZipResult unziped = ZipUtils.unzip(file.getBytes()); @@ -598,7 +597,7 @@ public class ConfigController { configInfo.getTenant(), time.getTime())); ConfigTraceService .logPersistenceEvent(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant(), - requestIpApp, time.getTime(), InetUtils.getSelfIp(), + requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, configInfo.getContent()); } return ResultBuilder.buildSuccessResult("导入成功", saveResult); @@ -628,10 +627,9 @@ public class ConfigController { return ResultBuilder.buildResult(ResultCodeEnum.NO_SELECTED_CONFIG, failedData); } configBeansList.removeAll(Collections.singleton(null)); - - if (NAMESPACE_PUBLIC_KEY.equalsIgnoreCase(namespace)) { - namespace = ""; - } else if (persistService.tenantInfoCountByTenantId(namespace) <= 0) { + + namespace = NamespaceUtil.processNamespaceParameter(namespace); + if (StringUtils.isNotBlank(namespace) && persistService.tenantInfoCountByTenantId(namespace) <= 0) { failedData.put("succCount", 0); return ResultBuilder.buildResult(ResultCodeEnum.NAMESPACE_NOT_EXIST, failedData); } @@ -684,17 +682,10 @@ public class ConfigController { configInfo.getTenant(), time.getTime())); ConfigTraceService .logPersistenceEvent(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant(), - requestIpApp, time.getTime(), InetUtils.getSelfIp(), + requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, configInfo.getContent()); } return ResultBuilder.buildSuccessResult("Clone Completed Successfully", saveResult); } - private String processTenant(String tenant) { - if (StringUtils.isEmpty(tenant) || NAMESPACE_PUBLIC_KEY.equalsIgnoreCase(tenant)) { - return ""; - } - return tenant; - } - } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/controller/HealthController.java b/config/src/main/java/com/alibaba/nacos/config/server/controller/HealthController.java index 10870b977..1fa35f643 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/controller/HealthController.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/controller/HealthController.java @@ -69,7 +69,7 @@ public class HealthController { sb.append("master db (").append(dbStatus.split(":")[1]).append(") down. "); } if (!memberManager.isInIpList()) { - sb.append("server ip ").append(InetUtils.getSelfIp()) + sb.append("server ip ").append(InetUtils.getSelfIP()) .append(" is not in the serverList of address server. "); } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java b/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java index 720ce3acb..9c00aeb65 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java @@ -44,10 +44,6 @@ public final class TaskManager extends NacosDelayTaskExecuteEngine implements Ta Condition notEmpty = this.lock.newCondition(); - public TaskManager() { - this(null); - } - public TaskManager(String name) { super(name, LOGGER, 100L); this.name = name; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/model/app/ApplicationInfo.java b/config/src/main/java/com/alibaba/nacos/config/server/model/app/ApplicationInfo.java index 0100068ad..acc318d7b 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/model/app/ApplicationInfo.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/model/app/ApplicationInfo.java @@ -102,7 +102,7 @@ public class ApplicationInfo { */ public boolean canCurrentServerOwnTheLock() { boolean currentOwnerIsMe = - subInfoCollectLockOwner == null || InetUtils.getSelfIp().equals(subInfoCollectLockOwner); + subInfoCollectLockOwner == null || InetUtils.getSelfIP().equals(subInfoCollectLockOwner); if (currentOwnerIsMe) { return true; @@ -115,7 +115,7 @@ public class ApplicationInfo { } public String currentServer() { - return InetUtils.getSelfIp(); + return InetUtils.getSelfIP(); } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java index f5d14285e..4470269db 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java @@ -419,7 +419,7 @@ public abstract class DumpService { } } else { // remove config info - persistService.removeConfigInfo(dataId, group, tenant, InetUtils.getSelfIp(), null); + persistService.removeConfigInfo(dataId, group, tenant, InetUtils.getSelfIP(), null); LOGGER.warn( "[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId=" + group); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java index 44452091e..cea4c8eae 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -43,7 +43,7 @@ public class DumpAllBetaProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { int rowCount = persistService.configInfoBetaCount(); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java index 72ba2f07a..96527deac 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java @@ -16,9 +16,9 @@ package com.alibaba.nacos.config.server.service.dump.processor; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -47,7 +47,7 @@ public class DumpAllProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { long currentMaxId = persistService.findConfigMaxId(); long lastMaxId = 0; while (lastMaxId < currentMaxId) { diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java index 2b013f70e..d0b823134 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -42,7 +42,7 @@ public class DumpAllTagProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { int rowCount = persistService.configInfoTagCount(); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java index cabd9a225..0444fe3d5 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java @@ -16,9 +16,9 @@ package com.alibaba.nacos.config.server.service.dump.processor; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; @@ -47,7 +47,7 @@ public class DumpChangeProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { LogUtil.DEFAULT_LOG.warn("quick start; startTime:{},endTime:{}", startTime, endTime); LogUtil.DEFAULT_LOG.warn("updateMd5 start"); long startUpdateMd5 = System.currentTimeMillis(); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java index f2ccc17ed..545be5da4 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfo4Beta; @@ -44,7 +44,7 @@ public class DumpProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { final PersistService persistService = dumpService.getPersistService(); DumpTask dumpTask = (DumpTask) task; String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey()); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java index ef34f1d01..e08a56020 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java @@ -106,7 +106,7 @@ public class MergeDatumService { return; } for (ConfigInfoChanged item : persistService.findAllAggrGroup()) { - addMergeTask(item.getDataId(), item.getGroup(), item.getTenant(), InetUtils.getSelfIp()); + addMergeTask(item.getDataId(), item.getGroup(), item.getTenant(), InetUtils.getSelfIP()); } } @@ -163,7 +163,7 @@ public class MergeDatumService { ContentUtils.truncateContent(cf.getContent())); } else { // remove - persistService.removeConfigInfo(dataId, group, tenant, InetUtils.getSelfIp(), null); + persistService.removeConfigInfo(dataId, group, tenant, InetUtils.getSelfIP(), null); LOGGER.warn("[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId=" + group); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java index 063a244f8..17fda431f 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java @@ -17,8 +17,8 @@ package com.alibaba.nacos.config.server.service.merge; import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfoAggr; @@ -52,7 +52,7 @@ public class MergeTaskProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { MergeDataTask mergeTask = (MergeDataTask) task; final String dataId = mergeTask.dataId; final String group = mergeTask.groupId; @@ -84,7 +84,7 @@ public class MergeTaskProcessor implements NacosTaskProcessor { ContentUtils.truncateContent(cf.getContent())); ConfigTraceService - .logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIp(), + .logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_MERGE, cf.getContent()); } else { // remove @@ -98,7 +98,7 @@ public class MergeTaskProcessor implements NacosTaskProcessor { "[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId=" + group); ConfigTraceService - .logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIp(), + .logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_REMOVE, null); } NotifyCenter.publishEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index 6349777fe..e1f0b5438 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -154,7 +154,7 @@ public class AsyncNotifyService { if (unHealthNeedDelay) { // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, - task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, + task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task asyncTaskExecute(task); @@ -162,7 +162,7 @@ public class AsyncNotifyService { Header header = Header.newInstance(); header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); - header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIp()); + header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP()); if (task.isBeta) { header.addParam("isBeta", "true"); } @@ -280,12 +280,14 @@ public class AsyncNotifyService { long delayed = System.currentTimeMillis() - task.getLastModified(); if (result.ok()) { - ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_OK, delayed, + ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, + task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_OK, delayed, task.target); } else { LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", task.target, task.getDataId(), task.getGroup(), task.getLastModified(), result.getCode()); - ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, + ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, + task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, task.target); //get delay time and set fail count to the task @@ -306,7 +308,7 @@ public class AsyncNotifyService { task.getGroup(), task.getLastModified(), ex.toString()); ConfigTraceService .logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), - InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, task.target); + InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, task.target); //get delay time and set fail count to the task asyncTaskExecute(task); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java index bf778db69..b427f42b8 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java @@ -18,7 +18,7 @@ package com.alibaba.nacos.config.server.service.notify; import com.alibaba.nacos.common.executor.ExecutorFactory; import com.alibaba.nacos.common.executor.NameThreadFactory; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.ServerMemberManager; @@ -50,7 +50,7 @@ public class NotifySingleService { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { NotifySingleTask notifyTask = (NotifySingleTask) task; return notifyToDump(notifyTask.getDataId(), notifyTask.getGroup(), notifyTask.getTenant(), notifyTask.getLastModified(), notifyTask.target); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java index 79bdd56c4..aa505c7fb 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java @@ -17,8 +17,8 @@ package com.alibaba.nacos.config.server.service.notify; import com.alibaba.nacos.common.model.RestResult; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; @@ -46,7 +46,7 @@ public class NotifyTaskProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { NotifyTask notifyTask = (NotifyTask) task; String dataId = notifyTask.getDataId(); String group = notifyTask.getGroup(); @@ -73,13 +73,13 @@ public class NotifyTaskProcessor implements NacosTaskProcessor { */ List headers = Arrays .asList(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(lastModified), - NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIp()); + NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP()); String urlString = MessageFormat .format(URL_PATTERN, serverIp, ApplicationUtils.getContextPath(), dataId, group); RestResult result = NotifyService.invokeURL(urlString, headers, Constants.ENCODE); if (result.ok()) { - ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIp(), + ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_OK, delayed, serverIp); MetricsMonitor.getNotifyRtTimer().record(delayed, TimeUnit.MILLISECONDS); @@ -89,7 +89,7 @@ public class NotifyTaskProcessor implements NacosTaskProcessor { MetricsMonitor.getConfigNotifyException().increment(); LOGGER.error("[notify-error] {}, {}, to {}, result {}", new Object[] {dataId, group, serverIp, result.getCode()}); - ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIp(), + ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, serverIp); return false; } @@ -97,7 +97,7 @@ public class NotifyTaskProcessor implements NacosTaskProcessor { MetricsMonitor.getConfigNotifyException().increment(); LOGGER.error("[notify-exception] " + dataId + ", " + group + ", to " + serverIp + ", " + e.toString()); LOGGER.debug("[notify-exception] " + dataId + ", " + group + ", to " + serverIp + ", " + e.toString(), e); - ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIp(), + ConfigTraceService.logNotifyEvent(dataId, group, tenant, null, lastModified, InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, serverIp); return false; } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalStoragePersistServiceImpl.java b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalStoragePersistServiceImpl.java index 22c9a6ea7..276b61afe 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalStoragePersistServiceImpl.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalStoragePersistServiceImpl.java @@ -2292,7 +2292,7 @@ public class ExternalStoragePersistServiceImpl implements PersistService { .queryForObject(sqlFetchRows, new Object[] {nid}, HISTORY_DETAIL_ROW_MAPPER); return historyInfo; } catch (DataAccessException e) { - LogUtil.FATAL_LOG.error("[list-config-history] error, nid:{}", new Object[] {nid}, e); + LogUtil.FATAL_LOG.error("[detail-config-history] error, nid:{}", new Object[] {nid}, e); throw e; } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/trace/ConfigTraceService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/trace/ConfigTraceService.java index d1ae51f42..e36f82f4b 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/trace/ConfigTraceService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/trace/ConfigTraceService.java @@ -87,7 +87,7 @@ public class ConfigTraceService { // (md5) String md5 = content == null ? null : MD5Utils.md5Hex(content, Constants.ENCODE); - LogUtil.TRACE_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIp(), dataId, group, tenant, + LogUtil.TRACE_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIP(), dataId, group, tenant, requestIpAppName, ts, handleIp, "persist", type, -1, md5); } @@ -116,7 +116,7 @@ public class ConfigTraceService { } //localIp | dataid | group | tenant | requestIpAppName | ts | handleIp | event | type | [delayed] | ext // (targetIp) - LogUtil.TRACE_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIp(), dataId, group, tenant, + LogUtil.TRACE_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIP(), dataId, group, tenant, requestIpAppName, ts, handleIp, "notify", type, delayed, targetIp); } @@ -143,7 +143,7 @@ public class ConfigTraceService { tenant = null; } //localIp | dataid | group | tenant | requestIpAppName | ts | handleIp | event | type | [delayed] | length - LogUtil.TRACE_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIp(), dataId, group, tenant, + LogUtil.TRACE_LOG.info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIP(), dataId, group, tenant, requestIpAppName, ts, handleIp, "dump", type, delayed, length); } @@ -169,7 +169,7 @@ public class ConfigTraceService { } //localIp | dataid | group | tenant | requestIpAppName | ts | handleIp | event | type | [delayed = -1] LogUtil.TRACE_LOG - .info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIp(), dataId, group, tenant, requestIpAppName, + .info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIP(), dataId, group, tenant, requestIpAppName, ts, handleIp, "dump-all", type, -1); } @@ -196,7 +196,7 @@ public class ConfigTraceService { } //localIp | dataid | group | tenant| requestIpAppName| ts | event | type | [delayed] | ext(clientIp) LogUtil.TRACE_LOG - .info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIp(), dataId, group, tenant, requestIpAppName, + .info("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}", InetUtils.getSelfIP(), dataId, group, tenant, requestIpAppName, ts, "pull", type, delayed, clientIp); } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/utils/NamespaceUtil.java b/config/src/main/java/com/alibaba/nacos/config/server/utils/NamespaceUtil.java new file mode 100644 index 000000000..2fc0277ef --- /dev/null +++ b/config/src/main/java/com/alibaba/nacos/config/server/utils/NamespaceUtil.java @@ -0,0 +1,47 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.config.server.utils; + +import org.apache.commons.lang3.StringUtils; + +/** + * namespace(tenant) util. + * Because config and naming treat namespace(tenant) differently, + * this tool class can only be used by the config module. + * @author klw(213539@qq.com) + * @date 2020/10/12 17:56 + */ +public class NamespaceUtil { + + private static final String NAMESPACE_PUBLIC_KEY = "public"; + + private static final String NAMESPACE_NULL_KEY = "null"; + + /** + * Treat the namespace(tenant) parameters with values of "public" and "null" as an empty string. + * @param tenant namespace(tenant) id + * @return java.lang.String A namespace(tenant) string processed + */ + public static String processNamespaceParameter(String tenant) { + if (StringUtils.isBlank(tenant) || NAMESPACE_PUBLIC_KEY.equalsIgnoreCase(tenant) || NAMESPACE_NULL_KEY + .equalsIgnoreCase(tenant)) { + return ""; + } + return tenant.trim(); + } + +} diff --git a/config/src/test/java/com/alibaba/nacos/config/server/utils/NamespaceUtilTest.java b/config/src/test/java/com/alibaba/nacos/config/server/utils/NamespaceUtilTest.java new file mode 100644 index 000000000..16bbd700d --- /dev/null +++ b/config/src/test/java/com/alibaba/nacos/config/server/utils/NamespaceUtilTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.config.server.utils; + +import org.junit.Assert; +import org.junit.Test; + +/** + * test NamespaceUtil. + * + * @author klw(213539 @ qq.com) + * @date 2020/10/13 9:46 + */ +public class NamespaceUtilTest { + + @Test + public void testProcessTenantParameter() { + String strPublic = "public"; + String strNull = "null"; + String strEmpty = ""; + String strAbc = "abc"; + String strdef123 = "def123"; + String strAbcHasSpace = " abc "; + Assert.assertEquals(strEmpty, NamespaceUtil.processNamespaceParameter(strPublic)); + Assert.assertEquals(strEmpty, NamespaceUtil.processNamespaceParameter(strNull)); + Assert.assertEquals(strEmpty, NamespaceUtil.processNamespaceParameter(strEmpty)); + Assert.assertEquals(strEmpty, NamespaceUtil.processNamespaceParameter(null)); + Assert.assertEquals(strAbc, NamespaceUtil.processNamespaceParameter(strAbc)); + Assert.assertEquals(strdef123, NamespaceUtil.processNamespaceParameter(strdef123)); + Assert.assertEquals(strAbc, NamespaceUtil.processNamespaceParameter(strAbcHasSpace)); + } + +} diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java index 2af1507d5..697efac38 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberMetaDataConstants.java @@ -40,9 +40,5 @@ public class MemberMetaDataConstants { public static final String SUPPORT_REMOTE_C_TYPE = "remoteConnectType"; - public static final String[] META_KEY_LIST = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT, - LAST_REFRESH_TIME, VERSION, SUPPORT_REMOTE_C_TYPE}; - - public static final String[] META_KEY_LIST_WITHOUT_LAST_REFRESH_TIME = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, - WEIGHT, VERSION, SUPPORT_REMOTE_C_TYPE}; + public static final String[] BASIC_META_KEYS = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT, VERSION}; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java index f99ca11f8..ca3cdae98 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtils.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; @@ -263,40 +264,40 @@ public class MemberUtils { } /** - * Judge whether two member is full equals. + * Judge whether basic info has changed. * * @param actual actual member * @param expected expected member * @return true if all content is same, otherwise false */ - public static boolean fullEquals(Member actual, Member expected) { + public static boolean isBasicInfoChanged(Member actual, Member expected) { if (null == expected) { return null == actual; } if (!expected.getIp().equals(actual.getIp())) { - return false; + return true; } if (expected.getPort() != actual.getPort()) { - return false; + return true; } if (!expected.getAddress().equals(actual.getAddress())) { - return false; + return true; } if (!expected.getState().equals(actual.getState())) { - return false; + return true; } - return equalsExtendInfo(expected, actual); + return isBasicInfoChangedInExtendInfo(expected, actual); } - private static boolean equalsExtendInfo(Member expected, Member actual) { - for (String each : MemberMetaDataConstants.META_KEY_LIST_WITHOUT_LAST_REFRESH_TIME) { + private static boolean isBasicInfoChangedInExtendInfo(Member expected, Member actual) { + for (String each : MemberMetaDataConstants.BASIC_META_KEYS) { if (expected.getExtendInfo().containsKey(each) != actual.getExtendInfo().containsKey(each)) { - return false; + return true; } - if (null != expected.getExtendVal(each) && !expected.getExtendVal(each).equals(actual.getExtendVal(each))) { - return false; + if (!Objects.equals(expected.getExtendVal(each), actual.getExtendVal(each))) { + return true; } } - return true; + return false; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index 62a7c260f..4e4536b79 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -133,7 +133,7 @@ public class ServerMemberManager implements ApplicationListener() { @Override public void onEvent(InetUtils.IPChangeEvent event) { - String newAddress = event.getNewIp() + ":" + port; + String newAddress = event.getNewIP() + ":" + port; ServerMemberManager.this.localAddress = newAddress; ApplicationUtils.setLocalAddress(localAddress); - + Member self = ServerMemberManager.this.self; - self.setIp(event.getNewIp()); - - String oldAddress = event.getOldIp() + ":" + port; + self.setIp(event.getNewIP()); + + String oldAddress = event.getOldIP() + ":" + port; ServerMemberManager.this.serverList.remove(oldAddress); ServerMemberManager.this.serverList.put(newAddress, self); @@ -213,10 +213,11 @@ public class ServerMemberManager implements ApplicationListener= maxFailCount) { diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/StandaloneMemberLookup.java b/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/StandaloneMemberLookup.java index a5d13254c..47dabbc5d 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/StandaloneMemberLookup.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/lookup/StandaloneMemberLookup.java @@ -33,7 +33,7 @@ public class StandaloneMemberLookup extends AbstractMemberLookup { @Override public void start() { if (start.compareAndSet(false, true)) { - String url = InetUtils.getSelfIp() + ":" + ApplicationUtils.getPort(); + String url = InetUtils.getSelfIP() + ":" + ApplicationUtils.getPort(); afterLookup(MemberUtils.readServerConf(Collections.singletonList(url))); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java index e666d1a6b..12804721b 100644 --- a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java +++ b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java @@ -86,7 +86,7 @@ public class StartingSpringApplicationRunListener implements SpringApplicationRu System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, ApplicationUtils.FUNCTION_MODE_NAMING); } - System.setProperty(LOCAL_IP_PROPERTY_KEY, InetUtils.getSelfIp()); + System.setProperty(LOCAL_IP_PROPERTY_KEY, InetUtils.getSelfIP()); } @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java index 3724a2727..8cfe9ea03 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java @@ -20,7 +20,7 @@ import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor; -import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteWorkersManager; +import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteTaskExecuteEngine; import org.springframework.stereotype.Component; /** @@ -33,7 +33,7 @@ public class DistroTaskEngineHolder { private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine(); - private final DistroExecuteWorkersManager executeWorkersManager = new DistroExecuteWorkersManager(); + private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine(); public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) { DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder); @@ -44,7 +44,7 @@ public class DistroTaskEngineHolder { return delayTaskExecuteEngine; } - public DistroExecuteWorkersManager getExecuteWorkersManager() { + public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() { return executeWorkersManager; } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java index 6a40fb110..8e8ab3b09 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.core.distributed.distro.task.delay; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; @@ -43,7 +43,7 @@ public class DistroDelayTaskProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true; } @@ -51,7 +51,7 @@ public class DistroDelayTaskProcessor implements NacosTaskProcessor { DistroKey distroKey = distroDelayTask.getDistroKey(); if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) { DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); - distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask); + distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; } else if (DataOperation.DELETE.equals(distroDelayTask.getAction())) { DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java index 98719effe..13fa43875 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java @@ -28,7 +28,7 @@ import com.alibaba.nacos.core.utils.Loggers; * * @author xiweng.yy */ -public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask implements Runnable { +public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask { private final DistroKey distroKey; diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java new file mode 100644 index 000000000..1883dac2a --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.distributed.distro.task.execute; + +import com.alibaba.nacos.common.task.engine.NacosExecuteTaskExecuteEngine; +import com.alibaba.nacos.core.utils.Loggers; + +/** + * Distro execute task execute engine. + * + * @author xiweng.yy + */ +public class DistroExecuteTaskExecuteEngine extends NacosExecuteTaskExecuteEngine { + + public DistroExecuteTaskExecuteEngine() { + super(DistroExecuteTaskExecuteEngine.class.getSimpleName(), Loggers.DISTRO); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java deleted file mode 100644 index afd19f0fe..000000000 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.distributed.distro.task.execute; - -/** - * Distro execute workers manager. - * - * @author xiweng.yy - */ -public final class DistroExecuteWorkersManager { - - private final DistroExecuteWorker[] connectionWorkers; - - public DistroExecuteWorkersManager() { - int workerCount = findWorkerCount(); - connectionWorkers = new DistroExecuteWorker[workerCount]; - for (int mod = 0; mod < workerCount; ++mod) { - connectionWorkers[mod] = new DistroExecuteWorker(mod, workerCount); - } - } - - private int findWorkerCount() { - final int coreCount = Runtime.getRuntime().availableProcessors(); - int result = 1; - while (result < coreCount) { - result <<= 1; - } - return result; - } - - /** - * Dispatch task to worker by tag. - */ - public void dispatch(Object tag, Runnable task) { - DistroExecuteWorker worker = getWorker(tag); - worker.execute(task); - } - - private DistroExecuteWorker getWorker(Object tag) { - int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount(); - return connectionWorkers[idx]; - } - - /** - * Get workers status. - * - * @return workers status string - */ - public String workersStatus() { - StringBuilder sb = new StringBuilder(); - for (DistroExecuteWorker worker : connectionWorkers) { - sb.append(worker.status()).append("\n"); - } - return sb.toString(); - } - - public int workersCount() { - return connectionWorkers.length; - } - -} diff --git a/core/src/test/java/com/alibaba/nacos/core/cluster/MemberUtilsTest.java b/core/src/test/java/com/alibaba/nacos/core/cluster/MemberUtilsTest.java new file mode 100644 index 000000000..490d7153f --- /dev/null +++ b/core/src/test/java/com/alibaba/nacos/core/cluster/MemberUtilsTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.cluster; + +import com.alibaba.nacos.sys.utils.ApplicationUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.core.env.ConfigurableEnvironment; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(MockitoJUnitRunner.class) +public class MemberUtilsTest { + + private static final String IP = "1.1.1.1"; + + private static final int PORT = 8848; + + @Mock + private ConfigurableEnvironment environment; + + private Member originalMember; + + @Before + public void setUp() { + ApplicationUtils.injectEnvironment(environment); + originalMember = buildMember(); + } + + private Member buildMember() { + return Member.builder().ip(IP).port(PORT).state(NodeState.UP).build(); + } + + @Test + public void testIsBasicInfoChangedNoChangeWithoutExtendInfo() { + Member newMember = buildMember(); + assertFalse(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } + + @Test + public void testIsBasicInfoChangedNoChangeWithExtendInfo() { + Member newMember = buildMember(); + newMember.setExtendVal("test", "test"); + assertFalse(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } + + @Test + public void testIsBasicInfoChangedForIp() { + Member newMember = buildMember(); + newMember.setIp("1.1.1.2"); + assertTrue(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } + + @Test + public void testIsBasicInfoChangedForPort() { + Member newMember = buildMember(); + newMember.setPort(PORT + 1); + assertTrue(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } + + @Test + public void testIsBasicInfoChangedForAddress() { + Member newMember = buildMember(); + newMember.setAddress("test"); + assertTrue(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } + + @Test + public void testIsBasicInfoChangedForStatus() { + Member newMember = buildMember(); + newMember.setState(NodeState.DOWN); + assertTrue(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } + + @Test + public void testIsBasicInfoChangedForMoreBasicExtendInfo() { + Member newMember = buildMember(); + newMember.setExtendVal(MemberMetaDataConstants.VERSION, "TEST"); + assertTrue(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } + + @Test + public void testIsBasicInfoChangedForChangedBasicExtendInfo() { + Member newMember = buildMember(); + newMember.setExtendVal(MemberMetaDataConstants.WEIGHT, "100"); + assertTrue(MemberUtils.isBasicInfoChanged(newMember, originalMember)); + } +} diff --git a/core/src/test/java/com/alibaba/nacos/core/cluster/ServerMemberManagerTest.java b/core/src/test/java/com/alibaba/nacos/core/cluster/ServerMemberManagerTest.java new file mode 100644 index 000000000..9d395b8f4 --- /dev/null +++ b/core/src/test/java/com/alibaba/nacos/core/cluster/ServerMemberManagerTest.java @@ -0,0 +1,111 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.cluster; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.notify.EventPublisher; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.sys.utils.ApplicationUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.core.env.ConfigurableEnvironment; + +import javax.servlet.ServletContext; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ServerMemberManagerTest { + + @Mock + private ConfigurableEnvironment environment; + + @Mock + private ServletContext servletContext; + + @Mock + private EventPublisher eventPublisher; + + private ServerMemberManager serverMemberManager; + + private static final AtomicBoolean EVENT_PUBLISH = new AtomicBoolean(false); + + @Before + public void setUp() throws Exception { + when(environment.getProperty("server.port", Integer.class, 8848)).thenReturn(8848); + when(environment.getProperty("nacos.member-change-event.queue.size", Integer.class, 128)).thenReturn(128); + ApplicationUtils.injectEnvironment(environment); + when(servletContext.getContextPath()).thenReturn(""); + serverMemberManager = new ServerMemberManager(servletContext); + serverMemberManager.updateMember(Member.builder().ip("1.1.1.1").port(8848).state(NodeState.UP).build()); + serverMemberManager.getMemberAddressInfos().add("1.1.1.1:8848"); + NotifyCenter.getPublisherMap().put(MembersChangeEvent.class.getCanonicalName(), eventPublisher); + } + + @After + public void tearDown() throws NacosException { + EVENT_PUBLISH.set(false); + NotifyCenter.deregisterPublisher(MembersChangeEvent.class); + serverMemberManager.shutdown(); + } + + @Test + public void testUpdateNonExistMember() { + Member newMember = Member.builder().ip("1.1.1.2").port(8848).state(NodeState.UP).build(); + assertFalse(serverMemberManager.update(newMember)); + } + + @Test + public void testUpdateDownMember() { + Member newMember = Member.builder().ip("1.1.1.1").port(8848).state(NodeState.DOWN).build(); + assertTrue(serverMemberManager.update(newMember)); + assertFalse(serverMemberManager.getMemberAddressInfos().contains("1.1.1.1:8848")); + verify(eventPublisher).publish(any(MembersChangeEvent.class)); + } + + @Test + public void testUpdateVersionMember() { + Member newMember = Member.builder().ip("1.1.1.1").port(8848).state(NodeState.UP).build(); + newMember.setExtendVal(MemberMetaDataConstants.VERSION, "testVersion"); + assertTrue(serverMemberManager.update(newMember)); + assertTrue(serverMemberManager.getMemberAddressInfos().contains("1.1.1.1:8848")); + assertEquals("testVersion", + serverMemberManager.getServerList().get("1.1.1.1:8848").getExtendVal(MemberMetaDataConstants.VERSION)); + verify(eventPublisher).publish(any(MembersChangeEvent.class)); + } + + @Test + public void testUpdateNonBasicExtendInfoMember() { + Member newMember = Member.builder().ip("1.1.1.1").port(8848).state(NodeState.UP).build(); + newMember.setExtendVal("naming", "test"); + assertTrue(serverMemberManager.update(newMember)); + assertTrue(serverMemberManager.getMemberAddressInfos().contains("1.1.1.1:8848")); + assertEquals("test", serverMemberManager.getServerList().get("1.1.1.1:8848").getExtendVal("naming")); + verify(eventPublisher, never()).publish(any(MembersChangeEvent.class)); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java index a82d3db50..5e13e1643 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java @@ -18,9 +18,9 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.consistency.DataOperation; -import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask; +import com.alibaba.nacos.naming.consistency.KeyBuilder; import java.util.HashSet; import java.util.Set; @@ -49,15 +49,18 @@ public class DistroHttpCombinedKeyDelayTask extends DistroDelayTask { public void merge(AbstractDelayTask task) { actualResourceKeys.addAll(((DistroHttpCombinedKeyDelayTask) task).getActualResourceKeys()); if (actualResourceKeys.size() >= batchSize) { - this.setLastProcessTime(0); DistroHttpCombinedKey.incrementSequence(); + setLastProcessTime(0); + } else { + setLastProcessTime(task.getLastProcessTime()); } } @Override public DistroKey getDistroKey() { DistroKey taskKey = super.getDistroKey(); - DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, taskKey.getTargetServer()); + DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, + taskKey.getTargetServer()); result.setResourceKey(taskKey.getResourceKey()); result.getActualResourceTypes().addAll(actualResourceKeys); return result; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java index 9acd566fc..2903ee092 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; +import com.alibaba.nacos.common.task.AbstractExecuteTask; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; @@ -31,7 +32,7 @@ import com.alibaba.nacos.naming.misc.Loggers; * * @author xiweng.yy */ -public class DistroHttpCombinedKeyExecuteTask implements Runnable { +public class DistroHttpCombinedKeyExecuteTask extends AbstractExecuteTask { private final GlobalConfig globalConfig; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java index 43264e0a0..d4bb5b57d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; @@ -50,12 +50,12 @@ public class DistroHttpDelayTaskProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig, distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction()); - distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, executeTask); + distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask); return true; } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java index 180c93141..5df9462c4 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java @@ -175,7 +175,6 @@ public class OperatorController { * @param request request * @return metrics information */ - @Secured(resource = "naming/metrics", action = ActionTypes.READ) @GetMapping("/metrics") public ObjectNode metrics(HttpServletRequest request) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java index 47365be17..e5d82c89a 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java @@ -40,7 +40,14 @@ import com.alibaba.nacos.naming.push.PushService; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.collect.Sets; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import javax.annotation.PostConstruct; +import javax.annotation.Resource; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -56,14 +63,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.annotation.PostConstruct; -import javax.annotation.Resource; - -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; /** * Core manager storing all services in Nacos. diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java index 6dd6ec921..8079d1fa9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java @@ -110,7 +110,7 @@ public class SubscribeManager { + UtilsAndCommons.NACOS_NAMING_CONTEXT + SUBSCRIBER_ON_SYNC_URL, new ArrayList<>(), paramValues); - if (!result.ok()) { + if (result.ok()) { Subscribers subscribers = JacksonUtils.toObj(result.getData(), Subscribers.class); subscriberList.addAll(subscribers.getSubscribers()); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java index 552fd32a7..186a41ce0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java @@ -66,6 +66,7 @@ public class HealthCheckReactor { } try { scheduledFuture.cancel(true); + futureMap.remove(task.taskKey()); } catch (Exception e) { Loggers.EVT_LOG.error("[CANCEL-CHECK] cancel failed!", e); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java index 5b762d4c1..bb08c2ac3 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/NamingProxy.java @@ -146,7 +146,7 @@ public class NamingProxy { } throw new IOException("failed to req API: " + "http://" + server + ApplicationUtils.getContextPath() - + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: " + result.getCode() + " msg: " + + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: " + result.getMessage()); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java b/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java index 7cd328d28..caaa48e1e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/misc/NetUtils.java @@ -32,7 +32,7 @@ public class NetUtils { * @return local server address */ public static String localServer() { - return InetUtils.getSelfIp() + UtilsAndCommons.IP_PORT_SPLITER + ApplicationUtils.getPort(); + return InetUtils.getSelfIP() + UtilsAndCommons.IP_PORT_SPLITER + ApplicationUtils.getPort(); } /** diff --git a/naming/src/test/java/com/alibaba/nacos/naming/controllers/ClusterControllerTest.java b/naming/src/test/java/com/alibaba/nacos/naming/controllers/ClusterControllerTest.java index 3130f63e1..2e18a0a7f 100644 --- a/naming/src/test/java/com/alibaba/nacos/naming/controllers/ClusterControllerTest.java +++ b/naming/src/test/java/com/alibaba/nacos/naming/controllers/ClusterControllerTest.java @@ -52,6 +52,8 @@ public class ClusterControllerTest extends BaseTest { @Before public void before() { super.before(); + mockInjectSwitchDomain(); + mockInjectDistroMapper(); mockmvc = MockMvcBuilders.standaloneSetup(clusterController).build(); } @@ -87,17 +89,44 @@ public class ClusterControllerTest extends BaseTest { } @Test - public void testUpdateInvalidType() throws Exception { - expectedException.expectCause(isA(NacosException.class)); - expectedException.expectMessage("unknown health check type:{\"type\":\"123\"}"); + public void testUpdateHealthCheckerType() throws Exception { + Service service = new Service(TEST_SERVICE_NAME); service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID); when(serviceManager.getService(Constants.DEFAULT_NAMESPACE_ID, TEST_SERVICE_NAME)).thenReturn(service); + MockHttpServletRequestBuilder builder = MockMvcRequestBuilders .put(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster").param("clusterName", TEST_CLUSTER_NAME) .param("serviceName", TEST_SERVICE_NAME).param("healthChecker", "{\"type\":\"123\"}") .param("checkPort", "1").param("useInstancePort4Check", "true"); mockmvc.perform(builder); + + Assert.assertEquals("NONE", service.getClusterMap().get(TEST_CLUSTER_NAME).getHealthChecker().getType()); + + MockHttpServletRequestBuilder builder2 = MockMvcRequestBuilders + .put(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster").param("clusterName", TEST_CLUSTER_NAME) + .param("serviceName", TEST_SERVICE_NAME).param("healthChecker", "{\"type\":\"TCP\"}") + .param("checkPort", "1").param("useInstancePort4Check", "true"); + mockmvc.perform(builder2); + + Assert.assertEquals("TCP", service.getClusterMap().get(TEST_CLUSTER_NAME).getHealthChecker().getType()); + + MockHttpServletRequestBuilder builder3 = MockMvcRequestBuilders + .put(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster").param("clusterName", TEST_CLUSTER_NAME) + .param("serviceName", TEST_SERVICE_NAME).param("healthChecker", "{\"type\":\"HTTP\"}") + .param("checkPort", "1").param("useInstancePort4Check", "true"); + mockmvc.perform(builder3); + + Assert.assertEquals("HTTP", service.getClusterMap().get(TEST_CLUSTER_NAME).getHealthChecker().getType()); + + MockHttpServletRequestBuilder builder4 = MockMvcRequestBuilders + .put(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster").param("clusterName", TEST_CLUSTER_NAME) + .param("serviceName", TEST_SERVICE_NAME).param("healthChecker", "{\"type\":\"MYSQL\"}") + .param("checkPort", "1").param("useInstancePort4Check", "true"); + mockmvc.perform(builder4); + + Assert.assertEquals("MYSQL", service.getClusterMap().get(TEST_CLUSTER_NAME).getHealthChecker().getType()); + } } diff --git a/pom.xml b/pom.xml index cc44181a8..5e32b22c4 100644 --- a/pom.xml +++ b/pom.xml @@ -126,7 +126,7 @@ 1.3.0 - 2.1.16.RELEASE + 2.1.17.RELEASE 3.0 2.6 3.4 diff --git a/sys/src/main/java/com/alibaba/nacos/sys/utils/ApplicationUtils.java b/sys/src/main/java/com/alibaba/nacos/sys/utils/ApplicationUtils.java index 9b744b669..3447dc52a 100644 --- a/sys/src/main/java/com/alibaba/nacos/sys/utils/ApplicationUtils.java +++ b/sys/src/main/java/com/alibaba/nacos/sys/utils/ApplicationUtils.java @@ -346,7 +346,7 @@ public class ApplicationUtils implements ApplicationContextInitializer PREFERRED_NETWORKS = new ArrayList(); @@ -82,55 +83,56 @@ public class InetUtils { Runnable ipAutoRefresh = new Runnable() { @Override public void run() { - String nacosIp = System.getProperty(NACOS_SERVER_IP); - if (StringUtils.isBlank(nacosIp)) { - nacosIp = ApplicationUtils.getProperty(IP_ADDRESS); + String nacosIP = System.getProperty(NACOS_SERVER_IP); + if (StringUtils.isBlank(nacosIP)) { + nacosIP = ApplicationUtils.getProperty(IP_ADDRESS); } - if (!StringUtils.isBlank(nacosIp) && !isIP(nacosIp)) { - throw new RuntimeException("nacos address " + nacosIp + " is not ip"); + boolean illegalIP = !StringUtils.isBlank(nacosIP) && !(isIP(nacosIP) || isDomain(nacosIP)); + if (illegalIP) { + throw new RuntimeException("nacos address " + nacosIP + " is not ip"); } - String tmpSelfIp = nacosIp; - if (StringUtils.isBlank(tmpSelfIp)) { - preferHostnameOverIp = Boolean.getBoolean(SYSTEM_PREFER_HOSTNAME_OVER_IP); + String tmpSelfIP = nacosIP; + if (StringUtils.isBlank(tmpSelfIP)) { + preferHostnameOverIP = Boolean.getBoolean(SYSTEM_PREFER_HOSTNAME_OVER_IP); - if (!preferHostnameOverIp) { - preferHostnameOverIp = Boolean + if (!preferHostnameOverIP) { + preferHostnameOverIP = Boolean .parseBoolean(ApplicationUtils.getProperty(PREFER_HOSTNAME_OVER_IP)); } - if (preferHostnameOverIp) { + if (preferHostnameOverIP) { InetAddress inetAddress; try { inetAddress = InetAddress.getLocalHost(); if (inetAddress.getHostName().equals(inetAddress.getCanonicalHostName())) { - tmpSelfIp = inetAddress.getHostName(); + tmpSelfIP = inetAddress.getHostName(); } else { - tmpSelfIp = inetAddress.getCanonicalHostName(); + tmpSelfIP = inetAddress.getCanonicalHostName(); } } catch (UnknownHostException ignore) { LOG.warn("Unable to retrieve localhost"); } } else { - tmpSelfIp = Objects.requireNonNull(findFirstNonLoopbackAddress()).getHostAddress(); + tmpSelfIP = Objects.requireNonNull(findFirstNonLoopbackAddress()).getHostAddress(); } } - if (!Objects.equals(selfIp, tmpSelfIp) && Objects.nonNull(selfIp)) { + if (!Objects.equals(selfIP, tmpSelfIP) && Objects.nonNull(selfIP)) { IPChangeEvent event = new IPChangeEvent(); - event.setOldIp(selfIp); - event.setNewIp(tmpSelfIp); + event.setOldIP(selfIP); + event.setNewIP(tmpSelfIP); NotifyCenter.publishEvent(event); } - selfIp = tmpSelfIp; + selfIP = tmpSelfIP; } }; ipAutoRefresh.run(); } - public static String getSelfIp() { - return selfIp; + public static String getSelfIP() { + return selfIP; } /** @@ -219,35 +221,50 @@ public class InetUtils { return matcher.matches(); } + /** + * juege str is right domain. + * + * @param str nacosIP + * @return nacosIP is domain + */ + public static boolean isDomain(String str) { + InetSocketAddress address = new InetSocketAddress(str, 0); + boolean unResolved = address.isUnresolved(); + if (unResolved) { + LOG.warn("the domain: '" + str + "' can not be resolved"); + } + return !unResolved; + } + /** * {@link com.alibaba.nacos.core.cluster.ServerMemberManager} is listener. */ @SuppressWarnings({"PMD.ClassNamingShouldBeCamelRule", "checkstyle:AbbreviationAsWordInName"}) public static class IPChangeEvent extends SlowEvent { - private String oldIp; + private String oldIP; - private String newIp; + private String newIP; - public String getOldIp() { - return oldIp; + public String getOldIP() { + return oldIP; } - public void setOldIp(String oldIp) { - this.oldIp = oldIp; + public void setOldIP(String oldIP) { + this.oldIP = oldIP; } - public String getNewIp() { - return newIp; + public String getNewIP() { + return newIP; } - public void setNewIp(String newIp) { - this.newIp = newIp; + public void setNewIP(String newIP) { + this.newIP = newIP; } @Override public String toString() { - return "IPChangeEvent{" + "oldIp='" + oldIp + '\'' + ", newIp='" + newIp + '\'' + '}'; + return "IPChangeEvent{" + "oldIP='" + oldIP + '\'' + ", newIP='" + newIP + '\'' + '}'; } } diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_DITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_DITCase.java index 6afce7ee6..95e41ce61 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_DITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigDerbyRaft_DITCase.java @@ -357,7 +357,7 @@ public class ConfigDerbyRaft_DITCase extends BaseClusterTest { // transfer leader to ip:8807 Map transfer = new HashMap<>(); - transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIp() + ":9847"); + transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIP() + ":9847"); RestResult result = protocol7.execute(transfer); System.out.println(result); Assert.assertTrue(result.ok()); @@ -372,7 +372,7 @@ public class ConfigDerbyRaft_DITCase extends BaseClusterTest { // transfer leader to ip:8808 transfer = new HashMap<>(); - transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIp() + ":9848"); + transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIP() + ":9848"); result = protocol8.execute(transfer); System.out.println(result); Assert.assertTrue(result.ok()); @@ -387,7 +387,7 @@ public class ConfigDerbyRaft_DITCase extends BaseClusterTest { // transfer leader to ip:8809 transfer = new HashMap<>(); - transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIp() + ":9849"); + transfer.put(JRaftConstants.TRANSFER_LEADER, InetUtils.getSelfIP() + ":9849"); result = protocol9.execute(transfer); System.out.println(result); Assert.assertTrue(result.ok()); diff --git a/test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java b/test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java index cf1c2fead..def842773 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/BaseClusterTest.java @@ -90,7 +90,7 @@ public class BaseClusterTest extends HttpClient4Test { static { System.getProperties().setProperty("nacos.core.auth.enabled", "false"); System.getProperties().setProperty("embeddedStorage", "true"); - String ip = InetUtils.getSelfIp(); + String ip = InetUtils.getSelfIP(); clusterInfo = "nacos.member.list=" + ip + ":8847," + ip + ":8848," + ip + ":8849"; NotifyCenter.registerSubscriber(new Subscriber() { diff --git a/test/src/test/java/com/alibaba/nacos/test/core/InetUtils_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/InetUtils_ITCase.java index 1e912809e..f1c520977 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/InetUtils_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/InetUtils_ITCase.java @@ -39,7 +39,7 @@ public class InetUtils_ITCase { static { System.setProperty("nacos.core.inet.auto-refresh", "3"); // For load InetUtils.class - InetUtils.getSelfIp(); + InetUtils.getSelfIP(); } @Test @@ -53,10 +53,10 @@ public class InetUtils_ITCase { Subscriber subscribe = new Subscriber() { @Override public void onEvent(InetUtils.IPChangeEvent event) { - if (Objects.nonNull(event.getOldIp())) { + if (Objects.nonNull(event.getOldIP())) { try { System.out.println(event); - reference.set(event.getNewIp()); + reference.set(event.getNewIP()); } finally { latch.countDown(); @@ -74,7 +74,7 @@ public class InetUtils_ITCase { latch.await(10_000L, TimeUnit.MILLISECONDS); Assert.assertEquals(testIp, reference.get()); - Assert.assertEquals(testIp, InetUtils.getSelfIp()); + Assert.assertEquals(testIp, InetUtils.getSelfIP()); } } diff --git a/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java index 033343c99..08a04c3a1 100644 --- a/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/core/cluster/MemberLookup_ITCase.java @@ -69,7 +69,7 @@ public class MemberLookup_ITCase extends BaseTest { DiskUtils.forceMkdir(Paths.get(path, "conf").toString()); File file = Paths.get(path, "conf", name).toFile(); DiskUtils.touch(file); - String ip = InetUtils.getSelfIp(); + String ip = InetUtils.getSelfIP(); DiskUtils.writeFile(file, (ip + ":8848," + ip + ":8847," + ip + ":8849").getBytes( StandardCharsets.UTF_8), false); diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/NamingMaintainService_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/naming/NamingMaintainService_ITCase.java index 047d40c23..d59dd9e59 100644 --- a/test/src/test/java/com/alibaba/nacos/test/naming/NamingMaintainService_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/naming/NamingMaintainService_ITCase.java @@ -29,6 +29,7 @@ import com.alibaba.nacos.api.selector.ExpressionSelector; import com.alibaba.nacos.api.selector.NoneSelector; import com.alibaba.nacos.sys.utils.ApplicationUtils; import com.alibaba.nacos.test.BaseTest; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -163,5 +164,10 @@ public class NamingMaintainService_ITCase extends BaseTest { Assert.assertTrue(namingMaintainService.deleteService(serviceName)); } - + + @After + public void tearDown() throws NacosException { + namingMaintainService.shutDown(); + namingService.shutDown(); + } }