From 82961fe72189447195f423092635f723754d19b0 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 5 Jun 2019 13:35:01 +0800 Subject: [PATCH 01/11] feat: --- .../java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java index 103fce8dd..dd8a7a53b 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java @@ -70,7 +70,8 @@ public class ConfigAPI_ITCase { @Before public void setUp() throws Exception { Properties properties = new Properties(); - properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1"+":"+port); + properties.put(PropertyKeyConst.ENDPOINT, "127.0.0.1"); + properties.put(PropertyKeyConst.ENDPOINT_PORT, "8080"); iconfig = NacosFactory.createConfigService(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); From 72a3a9b2132e51472aa62f3b206e29fb04af50f1 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 5 Jun 2019 22:31:09 +0800 Subject: [PATCH 02/11] feat: --- .../alibaba/nacos/api/PropertyKeyConst.java | 2 + .../client/config/NacosConfigService.java | 2 +- .../client/config/http/ServerHttpAgent.java | 30 ++++++-- .../client/config/impl/ClientWorker.java | 48 +++++++++++-- .../client/config/impl/ServerListManager.java | 70 +++++++++++++++++++ .../com/alibaba/nacos/client/RandomTest.java | 49 +++++++++++++ .../server/service/LongPollingService.java | 1 + .../nacos/test/config/ConfigAPI_ITCase.java | 3 +- 8 files changed, 189 insertions(+), 16 deletions(-) create mode 100644 client/src/test/java/com/alibaba/nacos/client/RandomTest.java diff --git a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java index 96c906452..29b0672b6 100644 --- a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java +++ b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java @@ -44,6 +44,8 @@ public class PropertyKeyConst { public final static String ENCODE = "encode"; + public final static String TIMEOUT = "timeout"; + public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart"; public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount"; diff --git a/client/src/main/java/com/alibaba/nacos/client/config/NacosConfigService.java b/client/src/main/java/com/alibaba/nacos/client/config/NacosConfigService.java index 9569b9570..9e753a0dd 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/NacosConfigService.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/NacosConfigService.java @@ -81,7 +81,7 @@ public class NacosConfigService implements ConfigService { initNamespace(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); - worker = new ClientWorker(agent, configFilterChainManager); + worker = new ClientWorker(agent, configFilterChainManager, properties); } private void initNamespace(Properties properties) { diff --git a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java index f26135c12..207eb5c2a 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java @@ -42,6 +42,8 @@ import java.util.Date; import java.util.List; import java.util.Properties; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; /** * Server Agent @@ -106,35 +108,49 @@ public class ServerHttpAgent implements HttpAgent { long readTimeoutMs) throws IOException { final long endTime = System.currentTimeMillis() + readTimeoutMs; boolean isSSL = false; + do { + + String currentServerAddr = serverListMgr.getCurrentServerAddr(); + try { List newHeaders = getSpasHeaders(paramValues); if (headers != null) { newHeaders.addAll(headers); } + HttpResult result = HttpSimpleClient.httpPost( - getUrl(serverListMgr.getCurrentServerAddr(), path, isSSL), newHeaders, paramValues, encoding, + getUrl(currentServerAddr, path, isSSL), newHeaders, paramValues, encoding, readTimeoutMs, isSSL); if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR || result.code == HttpURLConnection.HTTP_BAD_GATEWAY || result.code == HttpURLConnection.HTTP_UNAVAILABLE) { LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}", - serverListMgr.getCurrentServerAddr(), result.code); + currentServerAddr, result.code); } else { + serverListMgr.delErrServerRecord(currentServerAddr); return result; } } catch (ConnectException ce) { - LOGGER.error("[NACOS ConnectException] currentServerAddr: {}", serverListMgr.getCurrentServerAddr()); - serverListMgr.refreshCurrentServerAddr(); + LOGGER.error("[NACOS ConnectException] currentServerAddr: {}", currentServerAddr); + serverListMgr.addErrServerRecord(currentServerAddr); + serverListMgr.refreshCurrentServerAddr(currentServerAddr); } catch (SocketTimeoutException stoe) { - LOGGER.error("[NACOS SocketTimeoutException]", "currentServerAddr: {}", - serverListMgr.getCurrentServerAddr()); + LOGGER.error("[NACOS SocketTimeoutException] currentServerAddr: {}, err : {}", + currentServerAddr, stoe.getMessage()); + serverListMgr.addErrServerRecord(currentServerAddr); serverListMgr.refreshCurrentServerAddr(); } catch (IOException ioe) { - LOGGER.error("[NACOS IOException] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe); + LOGGER.error("[NACOS IOException] currentServerAddr: " + currentServerAddr, ioe); throw ioe; } + // 保证依旧可以重试 + if (serverListMgr.isMaxRetry()) { + LOGGER.error("The maximum number of tolerable server reconnection errors has been reached"); + return null; + } + } while (System.currentTimeMillis() <= endTime); LOGGER.error("no available server"); diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 7490404f6..7c27cd759 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -15,6 +15,7 @@ */ package com.alibaba.nacos.client.config.impl; +import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; @@ -29,6 +30,7 @@ import com.alibaba.nacos.client.monitor.MetricsMonitor; import com.alibaba.nacos.client.utils.LogUtils; import com.alibaba.nacos.client.utils.ParamUtil; import com.alibaba.nacos.client.utils.StringUtils; +import com.alibaba.nacos.client.utils.TemplateUtils; import org.slf4j.Logger; import java.io.File; @@ -42,6 +44,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -341,7 +345,10 @@ public class ClientWorker { List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) { List params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); - long timeout = TimeUnit.SECONDS.toMillis(30L); + + // Take a custom timeout parameter to circumvent network errors caused by network latency + + long timeout = TimeUnit.SECONDS.toMillis(this.timeout); List headers = new ArrayList(2); headers.add("Long-Pulling-Timeout"); @@ -361,6 +368,11 @@ public class ClientWorker { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); + // The maximum number of tolerable server reconnection errors has been reached + if (result == null) { + return Collections.emptyList(); + } + if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); @@ -413,10 +425,14 @@ public class ClientWorker { } @SuppressWarnings("PMD.ThreadPoolCreationRule") - public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) { + public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; + // Initialize the timeout parameter + + init(properties); + executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -427,7 +443,7 @@ public class ClientWorker { } }); - executorService = Executors.newCachedThreadPool(new ThreadFactory() { + executorService = Executors.newScheduledThreadPool(Integer.MAX_VALUE, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); @@ -448,6 +464,20 @@ public class ClientWorker { }, 1L, 10L, TimeUnit.MILLISECONDS); } + private void init(Properties properties) { + try { + String timeoutStr = TemplateUtils.stringBlankAndThenExecute(properties.getProperty(PropertyKeyConst.TIMEOUT), new Callable() { + @Override + public String call() throws Exception { + return String.valueOf(Constants.SO_TIMEOUT); + } + }); + timeout = Long.parseLong(timeoutStr); + } catch (NumberFormatException nfe) { + timeout = Constants.SO_TIMEOUT; + } + } + class LongPollingRunnable implements Runnable { private int taskId; @@ -507,10 +537,14 @@ public class ClientWorker { } } inInitializingCacheList.clear(); + executorService.execute(this); } catch (Throwable e) { LOGGER.error("longPolling error", e); - } finally { - executorService.execute(this); + + // If the server fails, punish the task execution and delay the next task execution + // to avoid the client sending a large number of requests when the server cannot + // respond to the request + executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } } @@ -526,7 +560,7 @@ public class ClientWorker { } final ScheduledExecutorService executor; - final ExecutorService executorService; + final ScheduledExecutorService executorService; /** * groupKey -> cacheData */ @@ -536,5 +570,7 @@ public class ClientWorker { HttpAgent agent; ConfigFilterChainManager configFilterChainManager; private boolean isHealthServer = true; + private long timeout; private double currentLongingTaskCount = 0; + private long taskPenaltyTime = 2000; } diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java index 06a3ee065..59feb32cd 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java @@ -28,7 +28,9 @@ import java.io.StringReader; import java.net.HttpURLConnection; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Serverlist Manager @@ -43,6 +45,8 @@ public class ServerListManager { isFixed = false; isStarted = false; name = DEFAULT_NAME; + + errServerSet = new CopyOnWriteArraySet(); } public ServerListManager(List fixed) { @@ -69,6 +73,9 @@ public class ServerListManager { name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])) + "-" + namespace; } + + errServerSet = new CopyOnWriteArraySet(); + } public ServerListManager(String host, int port) { @@ -76,6 +83,9 @@ public class ServerListManager { isStarted = false; name = CUSTOM_NAME + "-" + host + "-" + port; addressServerUrl = String.format("http://%s:%d/%s/%s", host, port, contentPath, serverListName); + + errServerSet = new CopyOnWriteArraySet(); + } public ServerListManager(String endpoint) throws NacosException { @@ -106,6 +116,9 @@ public class ServerListManager { addressServerUrl = String.format("http://%s:%d/%s/%s?namespace=%s", endpoint, endpointPort, contentPath, serverListName, namespace); } + + errServerSet = new CopyOnWriteArraySet(); + } public ServerListManager(Properties properties) throws NacosException { @@ -151,6 +164,9 @@ public class ServerListManager { contentPath, serverListName, namespace); } } + + errServerSet = new CopyOnWriteArraySet(); + } private void initParam(Properties properties) { @@ -225,6 +241,15 @@ public class ServerListManager { return new ServerAddressIterator(serverUrls); } + // Consider adding a penalty mechanism, with the offending node coming last + + Iterator iterator(String errServerIp) { + if (serverUrls.isEmpty()) { + LOGGER.error("[{}] [iterator-serverlist] No server address defined!", name); + } + return new ServerAddressIterator(serverUrls, errServerIp); + } + class GetServerListTask implements Runnable { final String url; @@ -328,6 +353,12 @@ public class ServerListManager { currentServerAddr = iterator().next(); } + // Consider adding a penalty mechanism, with the offending node coming last + + public void refreshCurrentServerAddr(String errServerIp) { + currentServerAddr = iterator(errServerIp).next(); + } + public String getCurrentServerAddr() { if (StringUtils.isBlank(currentServerAddr)) { currentServerAddr = iterator().next(); @@ -335,6 +366,27 @@ public class ServerListManager { return currentServerAddr; } + public void addErrServerRecord(String errServerIp) { + errServerSet.add(errServerIp); + if (errServerSet.size() == serverUrls.size()) { + retryCnt.incrementAndGet(); + // 对错误 errServerSet 进行清空,进行新一轮的重试 + errServerSet.clear(); + } + } + + public void delErrServerRecord(String errServerIp) { + errServerSet.remove(errServerIp); + // 如果当前的 errServerSet.size() 与 serverUrls.size() 的比例介于 (0, 1/2] 之间,清除重试次数信息 + if (errServerSet.size() * 2 <= serverUrls.size()) { + retryCnt.set(0); + } + } + + public boolean isMaxRetry() { + return retryCnt.get() >= 4; + } + public String getContentPath() { return contentPath; } @@ -374,6 +426,9 @@ public class ServerListManager { private String serverListName = ParamUtil.getDefaultNodesPath(); volatile List serverUrls = new ArrayList(); + private final CopyOnWriteArraySet errServerSet; + private final AtomicInteger retryCnt = new AtomicInteger(0); + private volatile String currentServerAddr; public String serverPort = ParamUtil.getDefaultServerPort(); @@ -426,6 +481,21 @@ class ServerAddressIterator implements Iterator { iter = sorted.iterator(); } + // Consider adding a penalty mechanism, with the offending node coming last + + public ServerAddressIterator(List source, String errServerIp) { + sorted = new ArrayList(); + for (String address : source) { + if (address.equals(errServerIp)) { + continue; + } + sorted.add(new RandomizedServerAddress(address)); + } + Collections.sort(sorted); + sorted.add(new RandomizedServerAddress(errServerIp)); + iter = sorted.iterator(); + } + public boolean hasNext() { return iter.hasNext(); } diff --git a/client/src/test/java/com/alibaba/nacos/client/RandomTest.java b/client/src/test/java/com/alibaba/nacos/client/RandomTest.java new file mode 100644 index 000000000..39139fad6 --- /dev/null +++ b/client/src/test/java/com/alibaba/nacos/client/RandomTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +/** + * @author liaochuntao + * @date 2019-06-05 20:55 + **/ +public class RandomTest { + + private Random random; + + @Before + public void before() { + random = new Random(); + } + + @Test + public void test() throws InterruptedException { + long[] tmp = new long[10]; + for (int i = 0; i < 10; i++) { + tmp[i] = random.nextInt(Integer.MAX_VALUE); + } + Thread.sleep(100); + for (int i = 0; i < 10; i++) { + System.out.println(tmp[i] == random.nextInt(Integer.MAX_VALUE)); + } + } + +} diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index 42f277357..35dbbfb51 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -406,6 +406,7 @@ public class LongPollingService extends AbstractEventListener { } } + //TODO 为什么任务延迟执行时间设置为超时时间?会不会导致 read timeout }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java index dd8a7a53b..103fce8dd 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_ITCase.java @@ -70,8 +70,7 @@ public class ConfigAPI_ITCase { @Before public void setUp() throws Exception { Properties properties = new Properties(); - properties.put(PropertyKeyConst.ENDPOINT, "127.0.0.1"); - properties.put(PropertyKeyConst.ENDPOINT_PORT, "8080"); + properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1"+":"+port); iconfig = NacosFactory.createConfigService(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); From 1f102f52f463ce46401906d47934bc5336419997 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Thu, 6 Jun 2019 20:38:24 +0800 Subject: [PATCH 03/11] feat: --- .../alibaba/nacos/api/PropertyKeyConst.java | 4 +- .../alibaba/nacos/api/common/Constants.java | 10 +++ .../client/config/http/ServerHttpAgent.java | 48 +++++++------- .../client/config/impl/ClientWorker.java | 63 +++++++++---------- .../client/config/impl/ServerListManager.java | 62 ++++-------------- .../client/naming/NacosNamingService.java | 3 +- .../server/service/LongPollingService.java | 5 +- 7 files changed, 81 insertions(+), 114 deletions(-) diff --git a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java index 29b0672b6..5ba03e717 100644 --- a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java +++ b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java @@ -44,7 +44,9 @@ public class PropertyKeyConst { public final static String ENCODE = "encode"; - public final static String TIMEOUT = "timeout"; + public final static String CONFIG_LONG_POLL_TIMEOUT = "config.long-poll.timeout"; + + public final static String CONFIG_RETRY_TIME = "config.retry.time"; public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart"; diff --git a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java index 3ec47eda8..cee264e07 100644 --- a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java +++ b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java @@ -88,6 +88,16 @@ public class Constants { */ public static final int SO_TIMEOUT = 60000; + /** + * millisecond + */ + public static final int CONFIG_LONG_POLL_TIMEOUT = 5000; + + /** + * millisecond + */ + public static final int CONFIG_RETRY_TIME = 2000; + /** * millisecond */ diff --git a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java index 207eb5c2a..6bef3841e 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java @@ -42,8 +42,6 @@ import java.util.Date; import java.util.List; import java.util.Properties; import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicInteger; /** * Server Agent @@ -88,13 +86,13 @@ public class ServerHttpAgent implements HttpAgent { return result; } } catch (ConnectException ce) { - LOGGER.error("[NACOS ConnectException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); + LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); serverListMgr.refreshCurrentServerAddr(); } catch (SocketTimeoutException stoe) { - LOGGER.error("[NACOS SocketTimeoutException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); + LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); serverListMgr.refreshCurrentServerAddr(); } catch (IOException ioe) { - LOGGER.error("[NACOS IOException] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe); + LOGGER.error("[NACOS IOException httpGet] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe); throw ioe; } } while (System.currentTimeMillis() <= endTime); @@ -109,9 +107,10 @@ public class ServerHttpAgent implements HttpAgent { final long endTime = System.currentTimeMillis() + readTimeoutMs; boolean isSSL = false; - do { + String currentServerAddr = serverListMgr.getCurrentServerAddr(); + int maxRetry = 3; - String currentServerAddr = serverListMgr.getCurrentServerAddr(); + do { try { List newHeaders = getSpasHeaders(paramValues); @@ -125,36 +124,35 @@ public class ServerHttpAgent implements HttpAgent { if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR || result.code == HttpURLConnection.HTTP_BAD_GATEWAY || result.code == HttpURLConnection.HTTP_UNAVAILABLE) { - LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}", + LOGGER.error("[NACOS ConnectException httpPost] currentServerAddr: {}, httpCode: {}", currentServerAddr, result.code); } else { - serverListMgr.delErrServerRecord(currentServerAddr); return result; } } catch (ConnectException ce) { - LOGGER.error("[NACOS ConnectException] currentServerAddr: {}", currentServerAddr); - serverListMgr.addErrServerRecord(currentServerAddr); - serverListMgr.refreshCurrentServerAddr(currentServerAddr); + LOGGER.error("[NACOS ConnectException httpPost] currentServerAddr: {}", currentServerAddr); } catch (SocketTimeoutException stoe) { - LOGGER.error("[NACOS SocketTimeoutException] currentServerAddr: {}, err : {}", + LOGGER.error("[NACOS SocketTimeoutException httpPost] currentServerAddr: {}, err : {}", currentServerAddr, stoe.getMessage()); - serverListMgr.addErrServerRecord(currentServerAddr); - serverListMgr.refreshCurrentServerAddr(); } catch (IOException ioe) { - LOGGER.error("[NACOS IOException] currentServerAddr: " + currentServerAddr, ioe); + LOGGER.error("[NACOS IOException httpPost] currentServerAddr: " + currentServerAddr, ioe); throw ioe; } - // 保证依旧可以重试 - if (serverListMgr.isMaxRetry()) { - LOGGER.error("The maximum number of tolerable server reconnection errors has been reached"); - return null; + if (serverListMgr.hasNextServer()) { + currentServerAddr = serverListMgr.getNextServerAddr(); + } else { + maxRetry --; + if (maxRetry < 0) { + throw new ConnectException("The maximum number of tolerable server reconnection errors has been reached"); + } + serverListMgr.refreshCurrentServerAddr(); } } while (System.currentTimeMillis() <= endTime); - LOGGER.error("no available server"); - throw new ConnectException("no available server"); + LOGGER.error("no available server, currentServerAddr : {}", currentServerAddr); + throw new ConnectException("no available server, currentServerAddr : " + currentServerAddr); } @Override @@ -180,13 +178,13 @@ public class ServerHttpAgent implements HttpAgent { return result; } } catch (ConnectException ce) { - LOGGER.error("[NACOS ConnectException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); + LOGGER.error("[NACOS ConnectException httpDelete] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); serverListMgr.refreshCurrentServerAddr(); } catch (SocketTimeoutException stoe) { - LOGGER.error("[NACOS SocketTimeoutException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); + LOGGER.error("[NACOS SocketTimeoutException httpDelete] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); serverListMgr.refreshCurrentServerAddr(); } catch (IOException ioe) { - LOGGER.error("[NACOS IOException] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe); + LOGGER.error("[NACOS IOException httpDelete] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe); throw ioe; } diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 7c27cd759..4bc59e039 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -31,6 +31,7 @@ import com.alibaba.nacos.client.utils.LogUtils; import com.alibaba.nacos.client.utils.ParamUtil; import com.alibaba.nacos.client.utils.StringUtils; import com.alibaba.nacos.client.utils.TemplateUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import java.io.File; @@ -45,12 +46,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static com.alibaba.nacos.api.common.Constants.LINE_SEPARATOR; @@ -147,7 +151,7 @@ public class ClientWorker { //reset so that server not hang this check cache.setInitializing(true); } else { - int taskId = cacheMap.get().size() / (int)ParamUtil.getPerTaskConfigSize(); + int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize(); cache.setTaskId(taskId); } @@ -303,9 +307,9 @@ public class ClientWorker { // 分任务 int listenerSize = cacheMap.get().size(); // 向上取整为批数 - int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); + int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { - for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) { + for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题 executorService.execute(new LongPollingRunnable(i)); } @@ -316,7 +320,7 @@ public class ClientWorker { /** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ - List checkUpdateDataIds(List cacheDatas, List inInitializingCacheList) { + List checkUpdateDataIds(List cacheDatas, List inInitializingCacheList) throws IOException { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { if (!cacheData.isUseLocalConfigInfo()) { @@ -342,14 +346,12 @@ public class ClientWorker { /** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ - List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) { + List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); // Take a custom timeout parameter to circumvent network errors caused by network latency - long timeout = TimeUnit.SECONDS.toMillis(this.timeout); - List headers = new ArrayList(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); @@ -368,11 +370,6 @@ public class ClientWorker { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); - // The maximum number of tolerable server reconnection errors has been reached - if (result == null) { - return Collections.emptyList(); - } - if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); @@ -383,6 +380,7 @@ public class ClientWorker { } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); + throw e; } return Collections.emptyList(); } @@ -447,13 +445,14 @@ public class ClientWorker { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); - t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName()); + t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { + @Override public void run() { try { checkConfigInfo(); @@ -465,17 +464,10 @@ public class ClientWorker { } private void init(Properties properties) { - try { - String timeoutStr = TemplateUtils.stringBlankAndThenExecute(properties.getProperty(PropertyKeyConst.TIMEOUT), new Callable() { - @Override - public String call() throws Exception { - return String.valueOf(Constants.SO_TIMEOUT); - } - }); - timeout = Long.parseLong(timeoutStr); - } catch (NumberFormatException nfe) { - timeout = Constants.SO_TIMEOUT; - } + + timeout = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), Constants.CONFIG_LONG_POLL_TIMEOUT); + + taskPenaltyTime = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); } class LongPollingRunnable implements Runnable { @@ -485,9 +477,12 @@ public class ClientWorker { this.taskId = taskId; } + @Override public void run() { + + List cacheDatas = new ArrayList(); + List inInitializingCacheList = new ArrayList(); try { - List cacheDatas = new ArrayList(); // check failover config for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { @@ -503,7 +498,6 @@ public class ClientWorker { } } - List inInitializingCacheList = new ArrayList(); // check server config List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); @@ -537,13 +531,13 @@ public class ClientWorker { } } inInitializingCacheList.clear(); - executorService.execute(this); - } catch (Throwable e) { - LOGGER.error("longPolling error", e); - // If the server fails, punish the task execution and delay the next task execution - // to avoid the client sending a large number of requests when the server cannot - // respond to the request + executorService.execute(this); + + } catch (Throwable e) { + + // If the rotation training task is abnormal, the next execution time of the task will be punished + LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } @@ -561,6 +555,7 @@ public class ClientWorker { final ScheduledExecutorService executor; final ScheduledExecutorService executorService; + /** * groupKey -> cacheData */ @@ -571,6 +566,6 @@ public class ClientWorker { ConfigFilterChainManager configFilterChainManager; private boolean isHealthServer = true; private long timeout; - private double currentLongingTaskCount = 0; - private long taskPenaltyTime = 2000; + private volatile double currentLongingTaskCount = 0; + private int taskPenaltyTime; } diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java index 59feb32cd..f999c4f3b 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java @@ -28,9 +28,7 @@ import java.io.StringReader; import java.net.HttpURLConnection; import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * Serverlist Manager @@ -45,8 +43,6 @@ public class ServerListManager { isFixed = false; isStarted = false; name = DEFAULT_NAME; - - errServerSet = new CopyOnWriteArraySet(); } public ServerListManager(List fixed) { @@ -73,9 +69,6 @@ public class ServerListManager { name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])) + "-" + namespace; } - - errServerSet = new CopyOnWriteArraySet(); - } public ServerListManager(String host, int port) { @@ -83,9 +76,6 @@ public class ServerListManager { isStarted = false; name = CUSTOM_NAME + "-" + host + "-" + port; addressServerUrl = String.format("http://%s:%d/%s/%s", host, port, contentPath, serverListName); - - errServerSet = new CopyOnWriteArraySet(); - } public ServerListManager(String endpoint) throws NacosException { @@ -116,9 +106,6 @@ public class ServerListManager { addressServerUrl = String.format("http://%s:%d/%s/%s?namespace=%s", endpoint, endpointPort, contentPath, serverListName, namespace); } - - errServerSet = new CopyOnWriteArraySet(); - } public ServerListManager(Properties properties) throws NacosException { @@ -165,8 +152,6 @@ public class ServerListManager { } } - errServerSet = new CopyOnWriteArraySet(); - } private void initParam(Properties properties) { @@ -241,15 +226,6 @@ public class ServerListManager { return new ServerAddressIterator(serverUrls); } - // Consider adding a penalty mechanism, with the offending node coming last - - Iterator iterator(String errServerIp) { - if (serverUrls.isEmpty()) { - LOGGER.error("[{}] [iterator-serverlist] No server address defined!", name); - } - return new ServerAddressIterator(serverUrls, errServerIp); - } - class GetServerListTask implements Runnable { final String url; @@ -353,38 +329,20 @@ public class ServerListManager { currentServerAddr = iterator().next(); } - // Consider adding a penalty mechanism, with the offending node coming last - - public void refreshCurrentServerAddr(String errServerIp) { - currentServerAddr = iterator(errServerIp).next(); - } - public String getCurrentServerAddr() { if (StringUtils.isBlank(currentServerAddr)) { - currentServerAddr = iterator().next(); + iterator = iterator(); + currentServerAddr = iterator.next(); } return currentServerAddr; } - public void addErrServerRecord(String errServerIp) { - errServerSet.add(errServerIp); - if (errServerSet.size() == serverUrls.size()) { - retryCnt.incrementAndGet(); - // 对错误 errServerSet 进行清空,进行新一轮的重试 - errServerSet.clear(); - } + public boolean hasNextServer() { + return iterator.hasNext(); } - public void delErrServerRecord(String errServerIp) { - errServerSet.remove(errServerIp); - // 如果当前的 errServerSet.size() 与 serverUrls.size() 的比例介于 (0, 1/2] 之间,清除重试次数信息 - if (errServerSet.size() * 2 <= serverUrls.size()) { - retryCnt.set(0); - } - } - - public boolean isMaxRetry() { - return retryCnt.get() >= 4; + public String getNextServerAddr() { + return iterator.next(); } public String getContentPath() { @@ -426,16 +384,15 @@ public class ServerListManager { private String serverListName = ParamUtil.getDefaultNodesPath(); volatile List serverUrls = new ArrayList(); - private final CopyOnWriteArraySet errServerSet; - private final AtomicInteger retryCnt = new AtomicInteger(0); - private volatile String currentServerAddr; + private Iterator iterator; public String serverPort = ParamUtil.getDefaultServerPort(); public String addressServerUrl; private String serverAddrsStr; + } /** @@ -496,14 +453,17 @@ class ServerAddressIterator implements Iterator { iter = sorted.iterator(); } + @Override public boolean hasNext() { return iter.hasNext(); } + @Override public String next() { return iter.next().serverIp; } + @Override public void remove() { throw new UnsupportedOperationException(); } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index b7d5207f8..c8c365475 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -89,7 +89,8 @@ public class NacosNamingService implements NamingService { serverProxy = new NamingProxy(namespace, endpoint, serverList); serverProxy.setProperties(properties); beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties)); - hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties)); + hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), + initPollingThreadCount(properties)); } private int initClientBeatThreadCount(Properties properties) { diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index 35dbbfb51..7b11bea96 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -241,8 +241,9 @@ public class LongPollingService extends AbstractEventListener { // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制 asyncContext.setTimeout(0L); + // I think there is a problem with the task delay setting here that could easily cause a client read timeout scheduler.execute( - new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); + new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, delayTime, appName, tag)); } @Override @@ -406,7 +407,7 @@ public class LongPollingService extends AbstractEventListener { } } - //TODO 为什么任务延迟执行时间设置为超时时间?会不会导致 read timeout + }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); From be213f56011b72f997b4fea4603a9f2a4f752378 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 7 Jun 2019 16:47:25 +0800 Subject: [PATCH 04/11] test(nacos-client): add ConfigLongPollTest --- .../nacos/client/ConfigLongPollTest.java | 81 +++++++++++++++++++ .../naming/DeregisterInstance_ITCase.java | 11 ++- 2 files changed, 88 insertions(+), 4 deletions(-) create mode 100644 client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java new file mode 100644 index 000000000..4a2ef3323 --- /dev/null +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import org.junit.Before; +import org.junit.Test; + +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author liaochuntao + * @date 2019-06-07 16:37 + **/ +public class ConfigLongPollTest { + + private ConfigService configService; + + @Before + public void init() throws NacosException { + Properties properties = new Properties(); + properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); + properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 2000); + properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 5000); + configService = NacosFactory.createConfigService(properties); + } + + @Test + public void test() throws InterruptedException, NacosException { + + synchronized (this) { + + configService.addListener("test", "DEFAULT_GROUP", new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configInfo) { + System.out.println(configInfo); + } + }); + + configService.addListener("test-1", "DEFAULT_GROUP", new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configInfo) { + System.out.println(configInfo); + } + }); + + wait(); + } + + } + +} diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java index 7fbdf9900..b52ba01f6 100644 --- a/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.test.annotation.Repeat; import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; @@ -75,7 +76,7 @@ public class DeregisterInstance_ITCase { */ @Test public void dregDomTest() throws Exception { - String serviceName = randomDomainName(); + String serviceName = randomDomainName() + "-dom"; System.out.println(serviceName); naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT); naming.registerInstance(serviceName, "127.0.0.2", TEST_PORT); @@ -108,11 +109,11 @@ public class DeregisterInstance_ITCase { * * @throws Exception */ + @Repeat(value = 20) @Test public void dregDomClusterTest() throws Exception { - String serviceName = randomDomainName(); - + String serviceName = randomDomainName() + "-cluster"; System.out.println(serviceName); naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1"); @@ -120,6 +121,7 @@ public class DeregisterInstance_ITCase { List instances; instances = naming.getAllInstances(serviceName); + System.out.println("before : " + instances.toString()); verifyInstanceList(instances, 2, serviceName); instances = naming.getAllInstances(serviceName); @@ -130,6 +132,7 @@ public class DeregisterInstance_ITCase { TimeUnit.SECONDS.sleep(5); instances = naming.getAllInstances(serviceName); + System.out.println("after : " + instances.toString()); Assert.assertEquals(1, instances.size()); @@ -150,7 +153,7 @@ public class DeregisterInstance_ITCase { @Test public void dregLastDomTest() throws Exception { - String serviceName = randomDomainName(); + String serviceName = randomDomainName() + "-last"; naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1"); naming.registerInstance(serviceName, "127.0.0.2", TEST_PORT, "c2"); From 92ff0a0873dbb144916a33f5a0f0198d63246cb6 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 7 Jun 2019 16:58:29 +0800 Subject: [PATCH 05/11] fix(remove ConfigLongPollTest): --- .../nacos/client/ConfigLongPollTest.java | 162 +++++++++--------- 1 file changed, 81 insertions(+), 81 deletions(-) diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java index 4a2ef3323..fc21944be 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java @@ -1,81 +1,81 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.client; - -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import org.junit.Before; -import org.junit.Test; - -import java.util.Properties; -import java.util.concurrent.Executor; - -/** - * @author liaochuntao - * @date 2019-06-07 16:37 - **/ -public class ConfigLongPollTest { - - private ConfigService configService; - - @Before - public void init() throws NacosException { - Properties properties = new Properties(); - properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); - properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 2000); - properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 5000); - configService = NacosFactory.createConfigService(properties); - } - - @Test - public void test() throws InterruptedException, NacosException { - - synchronized (this) { - - configService.addListener("test", "DEFAULT_GROUP", new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configInfo) { - System.out.println(configInfo); - } - }); - - configService.addListener("test-1", "DEFAULT_GROUP", new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configInfo) { - System.out.println(configInfo); - } - }); - - wait(); - } - - } - -} +///* +// * Copyright 1999-2018 Alibaba Group Holding Ltd. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package com.alibaba.nacos.client; +// +//import com.alibaba.nacos.api.NacosFactory; +//import com.alibaba.nacos.api.PropertyKeyConst; +//import com.alibaba.nacos.api.config.ConfigService; +//import com.alibaba.nacos.api.config.listener.Listener; +//import com.alibaba.nacos.api.exception.NacosException; +//import org.junit.Before; +//import org.junit.Test; +// +//import java.util.Properties; +//import java.util.concurrent.Executor; +// +///** +// * @author liaochuntao +// * @date 2019-06-07 16:37 +// **/ +//public class ConfigLongPollTest { +// +// private ConfigService configService; +// +// @Before +// public void init() throws NacosException { +// Properties properties = new Properties(); +// properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); +// properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 2000); +// properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 5000); +// configService = NacosFactory.createConfigService(properties); +// } +// +// @Test +// public void test() throws InterruptedException, NacosException { +// +// synchronized (this) { +// +// configService.addListener("test", "DEFAULT_GROUP", new Listener() { +// @Override +// public Executor getExecutor() { +// return null; +// } +// +// @Override +// public void receiveConfigInfo(String configInfo) { +// System.out.println(configInfo); +// } +// }); +// +// configService.addListener("test-1", "DEFAULT_GROUP", new Listener() { +// @Override +// public Executor getExecutor() { +// return null; +// } +// +// @Override +// public void receiveConfigInfo(String configInfo) { +// System.out.println(configInfo); +// } +// }); +// +// wait(); +// } +// +// } +// +//} From 456e84785f04c141663c4a7288041cceeed3d6ee Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 7 Jun 2019 18:41:08 +0800 Subject: [PATCH 06/11] refactor(nacos-client:config): Rewrite the HTTP retry rule --- .../client/config/http/ServerHttpAgent.java | 38 +++- .../nacos/client/ConfigLongPollTest.java | 164 +++++++++--------- .../com/alibaba/nacos/client/RandomTest.java | 49 ------ .../server/service/LongPollingService.java | 2 +- .../naming/DeregisterInstance_ITCase.java | 9 +- 5 files changed, 118 insertions(+), 144 deletions(-) delete mode 100644 client/src/test/java/com/alibaba/nacos/client/RandomTest.java diff --git a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java index 6bef3841e..4923a68e0 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java @@ -68,6 +68,9 @@ public class ServerHttpAgent implements HttpAgent { boolean isSSL = false; + String currentServerAddr = serverListMgr.getCurrentServerAddr(); + int maxRetry = 3; + do { try { List newHeaders = getSpasHeaders(paramValues); @@ -75,7 +78,7 @@ public class ServerHttpAgent implements HttpAgent { newHeaders.addAll(headers); } HttpResult result = HttpSimpleClient.httpGet( - getUrl(serverListMgr.getCurrentServerAddr(), path, isSSL), newHeaders, paramValues, encoding, + getUrl(currentServerAddr, path, isSSL), newHeaders, paramValues, encoding, readTimeoutMs, isSSL); if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR || result.code == HttpURLConnection.HTTP_BAD_GATEWAY @@ -87,14 +90,23 @@ public class ServerHttpAgent implements HttpAgent { } } catch (ConnectException ce) { LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); - serverListMgr.refreshCurrentServerAddr(); } catch (SocketTimeoutException stoe) { LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); - serverListMgr.refreshCurrentServerAddr(); } catch (IOException ioe) { LOGGER.error("[NACOS IOException httpGet] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe); throw ioe; } + + if (serverListMgr.hasNextServer()) { + currentServerAddr = serverListMgr.getNextServerAddr(); + } else { + maxRetry --; + if (maxRetry < 0) { + throw new ConnectException("[NACOS HTTP-GET] The maximum number of tolerable server reconnection errors has been reached"); + } + serverListMgr.refreshCurrentServerAddr(); + } + } while (System.currentTimeMillis() <= endTime); LOGGER.error("no available server"); @@ -144,7 +156,7 @@ public class ServerHttpAgent implements HttpAgent { } else { maxRetry --; if (maxRetry < 0) { - throw new ConnectException("The maximum number of tolerable server reconnection errors has been reached"); + throw new ConnectException("[NACOS HTTP-POST] The maximum number of tolerable server reconnection errors has been reached"); } serverListMgr.refreshCurrentServerAddr(); } @@ -160,6 +172,10 @@ public class ServerHttpAgent implements HttpAgent { long readTimeoutMs) throws IOException { final long endTime = System.currentTimeMillis() + readTimeoutMs; boolean isSSL = false; + + String currentServerAddr = serverListMgr.getCurrentServerAddr(); + int maxRetry = 3; + do { try { List newHeaders = getSpasHeaders(paramValues); @@ -167,7 +183,7 @@ public class ServerHttpAgent implements HttpAgent { newHeaders.addAll(headers); } HttpResult result = HttpSimpleClient.httpDelete( - getUrl(serverListMgr.getCurrentServerAddr(), path, isSSL), newHeaders, paramValues, encoding, + getUrl(currentServerAddr, path, isSSL), newHeaders, paramValues, encoding, readTimeoutMs, isSSL); if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR || result.code == HttpURLConnection.HTTP_BAD_GATEWAY @@ -179,15 +195,23 @@ public class ServerHttpAgent implements HttpAgent { } } catch (ConnectException ce) { LOGGER.error("[NACOS ConnectException httpDelete] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); - serverListMgr.refreshCurrentServerAddr(); } catch (SocketTimeoutException stoe) { LOGGER.error("[NACOS SocketTimeoutException httpDelete] currentServerAddr:{}", serverListMgr.getCurrentServerAddr()); - serverListMgr.refreshCurrentServerAddr(); } catch (IOException ioe) { LOGGER.error("[NACOS IOException httpDelete] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe); throw ioe; } + if (serverListMgr.hasNextServer()) { + currentServerAddr = serverListMgr.getNextServerAddr(); + } else { + maxRetry --; + if (maxRetry < 0) { + throw new ConnectException("[NACOS HTTP-DELETE] The maximum number of tolerable server reconnection errors has been reached"); + } + serverListMgr.refreshCurrentServerAddr(); + } + } while (System.currentTimeMillis() <= endTime); LOGGER.error("no available server"); diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java index fc21944be..845205748 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java @@ -1,81 +1,83 @@ -///* -// * Copyright 1999-2018 Alibaba Group Holding Ltd. -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package com.alibaba.nacos.client; -// -//import com.alibaba.nacos.api.NacosFactory; -//import com.alibaba.nacos.api.PropertyKeyConst; -//import com.alibaba.nacos.api.config.ConfigService; -//import com.alibaba.nacos.api.config.listener.Listener; -//import com.alibaba.nacos.api.exception.NacosException; -//import org.junit.Before; -//import org.junit.Test; -// -//import java.util.Properties; -//import java.util.concurrent.Executor; -// -///** -// * @author liaochuntao -// * @date 2019-06-07 16:37 -// **/ -//public class ConfigLongPollTest { -// -// private ConfigService configService; -// -// @Before -// public void init() throws NacosException { -// Properties properties = new Properties(); -// properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); -// properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 2000); -// properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 5000); -// configService = NacosFactory.createConfigService(properties); -// } -// -// @Test -// public void test() throws InterruptedException, NacosException { -// -// synchronized (this) { -// -// configService.addListener("test", "DEFAULT_GROUP", new Listener() { -// @Override -// public Executor getExecutor() { -// return null; -// } -// -// @Override -// public void receiveConfigInfo(String configInfo) { -// System.out.println(configInfo); -// } -// }); -// -// configService.addListener("test-1", "DEFAULT_GROUP", new Listener() { -// @Override -// public Executor getExecutor() { -// return null; -// } -// -// @Override -// public void receiveConfigInfo(String configInfo) { -// System.out.println(configInfo); -// } -// }); -// -// wait(); -// } -// -// } -// -//} +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author liaochuntao + * @date 2019-06-07 16:37 + **/ +@Ignore +public class ConfigLongPollTest { + + private ConfigService configService; + + @Before + public void init() throws NacosException { + Properties properties = new Properties(); + properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); + properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 2000); + properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 5000); + configService = NacosFactory.createConfigService(properties); + } + + @Test + public void test() throws InterruptedException, NacosException { + + synchronized (this) { + + configService.addListener("test", "DEFAULT_GROUP", new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configInfo) { + System.out.println(configInfo); + } + }); + + configService.addListener("test-1", "DEFAULT_GROUP", new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configInfo) { + System.out.println(configInfo); + } + }); + + wait(); + } + + } + +} diff --git a/client/src/test/java/com/alibaba/nacos/client/RandomTest.java b/client/src/test/java/com/alibaba/nacos/client/RandomTest.java deleted file mode 100644 index 39139fad6..000000000 --- a/client/src/test/java/com/alibaba/nacos/client/RandomTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.client; - -import org.junit.Before; -import org.junit.Test; - -import java.util.Random; - -/** - * @author liaochuntao - * @date 2019-06-05 20:55 - **/ -public class RandomTest { - - private Random random; - - @Before - public void before() { - random = new Random(); - } - - @Test - public void test() throws InterruptedException { - long[] tmp = new long[10]; - for (int i = 0; i < 10; i++) { - tmp[i] = random.nextInt(Integer.MAX_VALUE); - } - Thread.sleep(100); - for (int i = 0; i < 10; i++) { - System.out.println(tmp[i] == random.nextInt(Integer.MAX_VALUE)); - } - } - -} diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index 7b11bea96..d9caa2c2c 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -243,7 +243,7 @@ public class LongPollingService extends AbstractEventListener { // I think there is a problem with the task delay setting here that could easily cause a client read timeout scheduler.execute( - new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, delayTime, appName, tag)); + new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } @Override diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java index b52ba01f6..2493e4fa9 100644 --- a/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/naming/DeregisterInstance_ITCase.java @@ -76,7 +76,7 @@ public class DeregisterInstance_ITCase { */ @Test public void dregDomTest() throws Exception { - String serviceName = randomDomainName() + "-dom"; + String serviceName = randomDomainName(); System.out.println(serviceName); naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT); naming.registerInstance(serviceName, "127.0.0.2", TEST_PORT); @@ -109,11 +109,10 @@ public class DeregisterInstance_ITCase { * * @throws Exception */ - @Repeat(value = 20) @Test public void dregDomClusterTest() throws Exception { - String serviceName = randomDomainName() + "-cluster"; + String serviceName = randomDomainName(); System.out.println(serviceName); naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1"); @@ -121,7 +120,6 @@ public class DeregisterInstance_ITCase { List instances; instances = naming.getAllInstances(serviceName); - System.out.println("before : " + instances.toString()); verifyInstanceList(instances, 2, serviceName); instances = naming.getAllInstances(serviceName); @@ -132,7 +130,6 @@ public class DeregisterInstance_ITCase { TimeUnit.SECONDS.sleep(5); instances = naming.getAllInstances(serviceName); - System.out.println("after : " + instances.toString()); Assert.assertEquals(1, instances.size()); @@ -153,7 +150,7 @@ public class DeregisterInstance_ITCase { @Test public void dregLastDomTest() throws Exception { - String serviceName = randomDomainName() + "-last"; + String serviceName = randomDomainName(); naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1"); naming.registerInstance(serviceName, "127.0.0.2", TEST_PORT, "c2"); From ecc015402e342c237225f8e44631674c44dd7e68 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 7 Jun 2019 21:25:09 +0800 Subject: [PATCH 07/11] =?UTF-8?q?refactor(nacos-client):=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E4=B8=8E=E6=9C=ACpr=E6=97=A0=E5=85=B3=E7=9A=84?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/alibaba/nacos/client/config/impl/ClientWorker.java | 4 +--- .../nacos/config/server/service/LongPollingService.java | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 6aac641cb..98ae4c467 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -344,8 +344,6 @@ public class ClientWorker { List params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); - // Take a custom timeout parameter to circumvent network errors caused by network latency - List headers = new ArrayList(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); @@ -560,6 +558,6 @@ public class ClientWorker { ConfigFilterChainManager configFilterChainManager; private boolean isHealthServer = true; private long timeout; - private volatile double currentLongingTaskCount = 0; + private double currentLongingTaskCount = 0; private int taskPenaltyTime; } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index d9caa2c2c..ec8ae8fff 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -241,7 +241,6 @@ public class LongPollingService extends AbstractEventListener { // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制 asyncContext.setTimeout(0L); - // I think there is a problem with the task delay setting here that could easily cause a client read timeout scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } From d43e079a97678993d35ce24743b400cb54800d1e Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 7 Jun 2019 22:51:16 +0800 Subject: [PATCH 08/11] test(nacos-test:config): add ConfigLongPoll_ITCase --- .../nacos/client/ConfigLongPollTest.java | 83 ----------------- .../test/config/ConfigLongPoll_ITCase.java | 89 +++++++++++++++++++ 2 files changed, 89 insertions(+), 83 deletions(-) delete mode 100644 client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java create mode 100644 test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java deleted file mode 100644 index 845205748..000000000 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigLongPollTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.client; - -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.Properties; -import java.util.concurrent.Executor; - -/** - * @author liaochuntao - * @date 2019-06-07 16:37 - **/ -@Ignore -public class ConfigLongPollTest { - - private ConfigService configService; - - @Before - public void init() throws NacosException { - Properties properties = new Properties(); - properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); - properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 2000); - properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 5000); - configService = NacosFactory.createConfigService(properties); - } - - @Test - public void test() throws InterruptedException, NacosException { - - synchronized (this) { - - configService.addListener("test", "DEFAULT_GROUP", new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configInfo) { - System.out.println(configInfo); - } - }); - - configService.addListener("test-1", "DEFAULT_GROUP", new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configInfo) { - System.out.println(configInfo); - } - }); - - wait(); - } - - } - -} diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java new file mode 100644 index 000000000..63237061a --- /dev/null +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java @@ -0,0 +1,89 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.test.config; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.config.server.Config; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * @author liaochuntao + * @date 2019-06-07 22:24 + **/ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = Config.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class ConfigLongPoll_ITCase { + + @LocalServerPort + private int port; + + private ConfigService configService; + + @Before + public void init() throws NacosException { + Properties properties = new Properties(); + properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + port); + properties.put(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT, 2000); + properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 3000); + configService = NacosFactory.createConfigService(properties); + } + + @Test + public void test() throws InterruptedException, NacosException { + + configService.addListener("test", "DEFAULT_GROUP", new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configInfo) { + System.out.println(configInfo); + } + }); + + configService.addListener("test-1", "DEFAULT_GROUP", new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configInfo) { + System.out.println(configInfo); + } + }); + + TimeUnit.SECONDS.sleep(30); + + } + +} From 0e653a9b393df41d9d5d8cedb2f47b88f4e4058f Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 11 Jun 2019 09:12:59 +0800 Subject: [PATCH 09/11] refactor(nacos-client:config): Modify the long poll timeout parameter to add Max retry custom Settin --- .../alibaba/nacos/api/PropertyKeyConst.java | 2 ++ .../alibaba/nacos/api/common/Constants.java | 7 ++++- .../client/config/http/ServerHttpAgent.java | 31 +++++++++++++------ .../client/config/impl/ServerListManager.java | 14 +++++---- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java index c00730c13..984591bdf 100644 --- a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java +++ b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java @@ -50,6 +50,8 @@ public class PropertyKeyConst { public final static String CONFIG_RETRY_TIME = "config.retry.time"; + public final static String MAX_RETRY = "maxRetry"; + public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart"; public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount"; diff --git a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java index c5384fa05..f859a3ede 100644 --- a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java +++ b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java @@ -91,13 +91,18 @@ public class Constants { /** * millisecond */ - public static final int CONFIG_LONG_POLL_TIMEOUT = 5000; + public static final int CONFIG_LONG_POLL_TIMEOUT = 30000; /** * millisecond */ public static final int CONFIG_RETRY_TIME = 2000; + /** + * Maximum number of retries + */ + public static final int MAX_RETRY = 3; + /** * millisecond */ diff --git a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java index 4923a68e0..d6aa01831 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java @@ -31,6 +31,7 @@ import com.alibaba.nacos.client.utils.ParamUtil; import com.alibaba.nacos.client.utils.StringUtils; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import java.io.IOException; import java.net.ConnectException; @@ -69,7 +70,7 @@ public class ServerHttpAgent implements HttpAgent { boolean isSSL = false; String currentServerAddr = serverListMgr.getCurrentServerAddr(); - int maxRetry = 3; + int maxRetry = this.maxRetry; do { try { @@ -86,6 +87,8 @@ public class ServerHttpAgent implements HttpAgent { LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}", serverListMgr.getCurrentServerAddr(), result.code); } else { + // Update the currently available server addr + serverListMgr.updateCurrentServerAddr(currentServerAddr); return result; } } catch (ConnectException ce) { @@ -97,8 +100,8 @@ public class ServerHttpAgent implements HttpAgent { throw ioe; } - if (serverListMgr.hasNextServer()) { - currentServerAddr = serverListMgr.getNextServerAddr(); + if (serverListMgr.getIterator().hasNext()) { + currentServerAddr = serverListMgr.getIterator().next(); } else { maxRetry --; if (maxRetry < 0) { @@ -120,7 +123,7 @@ public class ServerHttpAgent implements HttpAgent { boolean isSSL = false; String currentServerAddr = serverListMgr.getCurrentServerAddr(); - int maxRetry = 3; + int maxRetry = this.maxRetry; do { @@ -139,6 +142,8 @@ public class ServerHttpAgent implements HttpAgent { LOGGER.error("[NACOS ConnectException httpPost] currentServerAddr: {}, httpCode: {}", currentServerAddr, result.code); } else { + // Update the currently available server addr + serverListMgr.updateCurrentServerAddr(currentServerAddr); return result; } } catch (ConnectException ce) { @@ -151,8 +156,8 @@ public class ServerHttpAgent implements HttpAgent { throw ioe; } - if (serverListMgr.hasNextServer()) { - currentServerAddr = serverListMgr.getNextServerAddr(); + if (serverListMgr.getIterator().hasNext()) { + currentServerAddr = serverListMgr.getIterator().next(); } else { maxRetry --; if (maxRetry < 0) { @@ -174,7 +179,7 @@ public class ServerHttpAgent implements HttpAgent { boolean isSSL = false; String currentServerAddr = serverListMgr.getCurrentServerAddr(); - int maxRetry = 3; + int maxRetry = this.maxRetry; do { try { @@ -191,6 +196,8 @@ public class ServerHttpAgent implements HttpAgent { LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}", serverListMgr.getCurrentServerAddr(), result.code); } else { + // Update the currently available server addr + serverListMgr.updateCurrentServerAddr(currentServerAddr); return result; } } catch (ConnectException ce) { @@ -202,8 +209,8 @@ public class ServerHttpAgent implements HttpAgent { throw ioe; } - if (serverListMgr.hasNextServer()) { - currentServerAddr = serverListMgr.getNextServerAddr(); + if (serverListMgr.getIterator().hasNext()) { + currentServerAddr = serverListMgr.getIterator().next(); } else { maxRetry --; if (maxRetry < 0) { @@ -247,6 +254,7 @@ public class ServerHttpAgent implements HttpAgent { private void init(Properties properties) { initEncode(properties); initAkSk(properties); + initMaxRetry(properties); } private void initEncode(Properties properties) { @@ -279,6 +287,10 @@ public class ServerHttpAgent implements HttpAgent { } } + private void initMaxRetry(Properties properties) { + maxRetry = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.MAX_RETRY), Constants.MAX_RETRY); + } + @Override public synchronized void start() throws NacosException { serverListMgr.start(); @@ -430,6 +442,7 @@ public class ServerHttpAgent implements HttpAgent { private String accessKey; private String secretKey; private String encode; + private int maxRetry = 3; private volatile STSCredential sTSCredential; final ServerListManager serverListMgr; diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java index f3f8cad2b..887123dda 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java @@ -266,7 +266,8 @@ public class ServerListManager { return; } serverUrls = new ArrayList(newList); - currentServerAddr = iterator().next(); + iterator = iterator(); + currentServerAddr = iterator.next(); EventDispatcher.fireEvent(new ServerlistChangeEvent()); LOGGER.info("[{}] [update-serverlist] serverlist updated to {}", name, serverUrls); @@ -333,7 +334,8 @@ public class ServerListManager { } public void refreshCurrentServerAddr() { - currentServerAddr = iterator().next(); + iterator = iterator(); + currentServerAddr = iterator.next(); } public String getCurrentServerAddr() { @@ -344,12 +346,12 @@ public class ServerListManager { return currentServerAddr; } - public boolean hasNextServer() { - return iterator.hasNext(); + public void updateCurrentServerAddr(String currentServerAddr) { + this.currentServerAddr = currentServerAddr; } - public String getNextServerAddr() { - return iterator.next(); + public Iterator getIterator() { + return iterator; } public String getContentPath() { From be3cb036a25f451e21621643d7db4b7592b361e1 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 11 Jun 2019 10:01:55 +0800 Subject: [PATCH 10/11] refactor(nacos-client:config): remove un use constructor --- .../client/config/impl/ServerListManager.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java index 887123dda..aa3a2fab1 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java @@ -447,21 +447,6 @@ class ServerAddressIterator implements Iterator { iter = sorted.iterator(); } - // Consider adding a penalty mechanism, with the offending node coming last - - public ServerAddressIterator(List source, String errServerIp) { - sorted = new ArrayList(); - for (String address : source) { - if (address.equals(errServerIp)) { - continue; - } - sorted.add(new RandomizedServerAddress(address)); - } - Collections.sort(sorted); - sorted.add(new RandomizedServerAddress(errServerIp)); - iter = sorted.iterator(); - } - @Override public boolean hasNext() { return iter.hasNext(); From aa41a10616b827a7d3a4aab076df64bc750372b9 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 11 Jun 2019 13:26:20 +0800 Subject: [PATCH 11/11] fix(nacos-client:config): Fix failed custom parameter setting error --- .../main/java/com/alibaba/nacos/api/common/Constants.java | 5 +++++ .../alibaba/nacos/client/config/http/ServerHttpAgent.java | 2 +- .../com/alibaba/nacos/client/config/impl/ClientWorker.java | 5 +++-- .../com/alibaba/nacos/client/naming/NacosNamingService.java | 3 +-- .../com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java | 2 +- 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java index f859a3ede..901f6f869 100644 --- a/api/src/main/java/com/alibaba/nacos/api/common/Constants.java +++ b/api/src/main/java/com/alibaba/nacos/api/common/Constants.java @@ -93,6 +93,11 @@ public class Constants { */ public static final int CONFIG_LONG_POLL_TIMEOUT = 30000; + /** + * millisecond + */ + public static final int MIN_CONFIG_LONG_POLL_TIMEOUT = 10000; + /** * millisecond */ diff --git a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java index 609dee8fd..aa2bf35ef 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/http/ServerHttpAgent.java @@ -287,7 +287,7 @@ public class ServerHttpAgent implements HttpAgent { } private void initMaxRetry(Properties properties) { - maxRetry = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.MAX_RETRY), Constants.MAX_RETRY); + maxRetry = NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.MAX_RETRY)), Constants.MAX_RETRY); } @Override diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index c8048b1e6..a7665f051 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -457,9 +457,10 @@ public class ClientWorker { private void init(Properties properties) { - timeout = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), Constants.CONFIG_LONG_POLL_TIMEOUT); + timeout = Math.max(NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT)), + Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); - taskPenaltyTime = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); + taskPenaltyTime = NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.CONFIG_RETRY_TIME)), Constants.CONFIG_RETRY_TIME); } class LongPollingRunnable implements Runnable { diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index e673d7588..d92492d3a 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -90,8 +90,7 @@ public class NacosNamingService implements NamingService { serverProxy = new NamingProxy(namespace, endpoint, serverList); serverProxy.setProperties(properties); beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties)); - hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), - initPollingThreadCount(properties)); + hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties)); } private int initClientBeatThreadCount(Properties properties) { diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java index 63237061a..c50fd92da 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_ITCase.java @@ -50,7 +50,7 @@ public class ConfigLongPoll_ITCase { public void init() throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + port); - properties.put(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT, 2000); + properties.put(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT, "20000"); properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 3000); configService = NacosFactory.createConfigService(properties); }