Merge pull request #1341 from chuntaojun/fix_issue_1214
[#ISSUE 1214] Adjust client error retry mechanism
This commit is contained in:
commit
edf3f66cc6
@ -46,6 +46,12 @@ public class PropertyKeyConst {
|
|||||||
|
|
||||||
public final static String ENCODE = "encode";
|
public final static String ENCODE = "encode";
|
||||||
|
|
||||||
|
public final static String CONFIG_LONG_POLL_TIMEOUT = "config.long-poll.timeout";
|
||||||
|
|
||||||
|
public final static String CONFIG_RETRY_TIME = "config.retry.time";
|
||||||
|
|
||||||
|
public final static String MAX_RETRY = "maxRetry";
|
||||||
|
|
||||||
public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart";
|
public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart";
|
||||||
|
|
||||||
public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
|
public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
|
||||||
|
@ -88,6 +88,26 @@ public class Constants {
|
|||||||
*/
|
*/
|
||||||
public static final int SO_TIMEOUT = 60000;
|
public static final int SO_TIMEOUT = 60000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* millisecond
|
||||||
|
*/
|
||||||
|
public static final int CONFIG_LONG_POLL_TIMEOUT = 30000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* millisecond
|
||||||
|
*/
|
||||||
|
public static final int MIN_CONFIG_LONG_POLL_TIMEOUT = 10000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* millisecond
|
||||||
|
*/
|
||||||
|
public static final int CONFIG_RETRY_TIME = 2000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of retries
|
||||||
|
*/
|
||||||
|
public static final int MAX_RETRY = 3;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* millisecond
|
* millisecond
|
||||||
*/
|
*/
|
||||||
|
@ -82,7 +82,7 @@ public class NacosConfigService implements ConfigService {
|
|||||||
initNamespace(properties);
|
initNamespace(properties);
|
||||||
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
|
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
|
||||||
agent.start();
|
agent.start();
|
||||||
worker = new ClientWorker(agent, configFilterChainManager);
|
worker = new ClientWorker(agent, configFilterChainManager, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initNamespace(Properties properties) {
|
private void initNamespace(Properties properties) {
|
||||||
|
@ -31,6 +31,7 @@ import com.alibaba.nacos.client.utils.ParamUtil;
|
|||||||
import com.alibaba.nacos.client.utils.StringUtils;
|
import com.alibaba.nacos.client.utils.StringUtils;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
@ -67,6 +68,9 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
final long endTime = System.currentTimeMillis() + readTimeoutMs;
|
final long endTime = System.currentTimeMillis() + readTimeoutMs;
|
||||||
final boolean isSSL = false;
|
final boolean isSSL = false;
|
||||||
|
|
||||||
|
String currentServerAddr = serverListMgr.getCurrentServerAddr();
|
||||||
|
int maxRetry = this.maxRetry;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
try {
|
try {
|
||||||
List<String> newHeaders = getSpasHeaders(paramValues);
|
List<String> newHeaders = getSpasHeaders(paramValues);
|
||||||
@ -74,7 +78,7 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
newHeaders.addAll(headers);
|
newHeaders.addAll(headers);
|
||||||
}
|
}
|
||||||
HttpResult result = HttpSimpleClient.httpGet(
|
HttpResult result = HttpSimpleClient.httpGet(
|
||||||
getUrl(serverListMgr.getCurrentServerAddr(), path, isSSL), newHeaders, paramValues, encoding,
|
getUrl(currentServerAddr, path, isSSL), newHeaders, paramValues, encoding,
|
||||||
readTimeoutMs, isSSL);
|
readTimeoutMs, isSSL);
|
||||||
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|
||||||
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|
||||||
@ -82,18 +86,29 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
|
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
|
||||||
serverListMgr.getCurrentServerAddr(), result.code);
|
serverListMgr.getCurrentServerAddr(), result.code);
|
||||||
} else {
|
} else {
|
||||||
|
// Update the currently available server addr
|
||||||
|
serverListMgr.updateCurrentServerAddr(currentServerAddr);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
} catch (ConnectException ce) {
|
} catch (ConnectException ce) {
|
||||||
LOGGER.error("[NACOS ConnectException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
||||||
serverListMgr.refreshCurrentServerAddr();
|
|
||||||
} catch (SocketTimeoutException stoe) {
|
} catch (SocketTimeoutException stoe) {
|
||||||
LOGGER.error("[NACOS SocketTimeoutException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
||||||
serverListMgr.refreshCurrentServerAddr();
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOGGER.error("[NACOS IOException] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
|
LOGGER.error("[NACOS IOException httpGet] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (serverListMgr.getIterator().hasNext()) {
|
||||||
|
currentServerAddr = serverListMgr.getIterator().next();
|
||||||
|
} else {
|
||||||
|
maxRetry --;
|
||||||
|
if (maxRetry < 0) {
|
||||||
|
throw new ConnectException("[NACOS HTTP-GET] The maximum number of tolerable server reconnection errors has been reached");
|
||||||
|
}
|
||||||
|
serverListMgr.refreshCurrentServerAddr();
|
||||||
|
}
|
||||||
|
|
||||||
} while (System.currentTimeMillis() <= endTime);
|
} while (System.currentTimeMillis() <= endTime);
|
||||||
|
|
||||||
LOGGER.error("no available server");
|
LOGGER.error("no available server");
|
||||||
@ -105,39 +120,55 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
long readTimeoutMs) throws IOException {
|
long readTimeoutMs) throws IOException {
|
||||||
final long endTime = System.currentTimeMillis() + readTimeoutMs;
|
final long endTime = System.currentTimeMillis() + readTimeoutMs;
|
||||||
boolean isSSL = false;
|
boolean isSSL = false;
|
||||||
|
|
||||||
|
String currentServerAddr = serverListMgr.getCurrentServerAddr();
|
||||||
|
int maxRetry = this.maxRetry;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
List<String> newHeaders = getSpasHeaders(paramValues);
|
List<String> newHeaders = getSpasHeaders(paramValues);
|
||||||
if (headers != null) {
|
if (headers != null) {
|
||||||
newHeaders.addAll(headers);
|
newHeaders.addAll(headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
HttpResult result = HttpSimpleClient.httpPost(
|
HttpResult result = HttpSimpleClient.httpPost(
|
||||||
getUrl(serverListMgr.getCurrentServerAddr(), path, isSSL), newHeaders, paramValues, encoding,
|
getUrl(currentServerAddr, path, isSSL), newHeaders, paramValues, encoding,
|
||||||
readTimeoutMs, isSSL);
|
readTimeoutMs, isSSL);
|
||||||
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|
||||||
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|
||||||
|| result.code == HttpURLConnection.HTTP_UNAVAILABLE) {
|
|| result.code == HttpURLConnection.HTTP_UNAVAILABLE) {
|
||||||
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
|
LOGGER.error("[NACOS ConnectException httpPost] currentServerAddr: {}, httpCode: {}",
|
||||||
serverListMgr.getCurrentServerAddr(), result.code);
|
currentServerAddr, result.code);
|
||||||
} else {
|
} else {
|
||||||
|
// Update the currently available server addr
|
||||||
|
serverListMgr.updateCurrentServerAddr(currentServerAddr);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
} catch (ConnectException ce) {
|
} catch (ConnectException ce) {
|
||||||
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}", serverListMgr.getCurrentServerAddr());
|
LOGGER.error("[NACOS ConnectException httpPost] currentServerAddr: {}", currentServerAddr);
|
||||||
serverListMgr.refreshCurrentServerAddr();
|
|
||||||
} catch (SocketTimeoutException stoe) {
|
} catch (SocketTimeoutException stoe) {
|
||||||
LOGGER.error("[NACOS SocketTimeoutException]", "currentServerAddr: {}",
|
LOGGER.error("[NACOS SocketTimeoutException httpPost] currentServerAddr: {}, err : {}",
|
||||||
serverListMgr.getCurrentServerAddr());
|
currentServerAddr, stoe.getMessage());
|
||||||
serverListMgr.refreshCurrentServerAddr();
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOGGER.error("[NACOS IOException] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
|
LOGGER.error("[NACOS IOException httpPost] currentServerAddr: " + currentServerAddr, ioe);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (serverListMgr.getIterator().hasNext()) {
|
||||||
|
currentServerAddr = serverListMgr.getIterator().next();
|
||||||
|
} else {
|
||||||
|
maxRetry --;
|
||||||
|
if (maxRetry < 0) {
|
||||||
|
throw new ConnectException("[NACOS HTTP-POST] The maximum number of tolerable server reconnection errors has been reached");
|
||||||
|
}
|
||||||
|
serverListMgr.refreshCurrentServerAddr();
|
||||||
|
}
|
||||||
|
|
||||||
} while (System.currentTimeMillis() <= endTime);
|
} while (System.currentTimeMillis() <= endTime);
|
||||||
|
|
||||||
LOGGER.error("no available server");
|
LOGGER.error("no available server, currentServerAddr : {}", currentServerAddr);
|
||||||
throw new ConnectException("no available server");
|
throw new ConnectException("no available server, currentServerAddr : " + currentServerAddr);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -145,6 +176,10 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
long readTimeoutMs) throws IOException {
|
long readTimeoutMs) throws IOException {
|
||||||
final long endTime = System.currentTimeMillis() + readTimeoutMs;
|
final long endTime = System.currentTimeMillis() + readTimeoutMs;
|
||||||
boolean isSSL = false;
|
boolean isSSL = false;
|
||||||
|
|
||||||
|
String currentServerAddr = serverListMgr.getCurrentServerAddr();
|
||||||
|
int maxRetry = this.maxRetry;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
try {
|
try {
|
||||||
List<String> newHeaders = getSpasHeaders(paramValues);
|
List<String> newHeaders = getSpasHeaders(paramValues);
|
||||||
@ -152,7 +187,7 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
newHeaders.addAll(headers);
|
newHeaders.addAll(headers);
|
||||||
}
|
}
|
||||||
HttpResult result = HttpSimpleClient.httpDelete(
|
HttpResult result = HttpSimpleClient.httpDelete(
|
||||||
getUrl(serverListMgr.getCurrentServerAddr(), path, isSSL), newHeaders, paramValues, encoding,
|
getUrl(currentServerAddr, path, isSSL), newHeaders, paramValues, encoding,
|
||||||
readTimeoutMs, isSSL);
|
readTimeoutMs, isSSL);
|
||||||
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|
||||||
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|
||||||
@ -160,19 +195,29 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
|
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
|
||||||
serverListMgr.getCurrentServerAddr(), result.code);
|
serverListMgr.getCurrentServerAddr(), result.code);
|
||||||
} else {
|
} else {
|
||||||
|
// Update the currently available server addr
|
||||||
|
serverListMgr.updateCurrentServerAddr(currentServerAddr);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
} catch (ConnectException ce) {
|
} catch (ConnectException ce) {
|
||||||
LOGGER.error("[NACOS ConnectException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
LOGGER.error("[NACOS ConnectException httpDelete] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
||||||
serverListMgr.refreshCurrentServerAddr();
|
|
||||||
} catch (SocketTimeoutException stoe) {
|
} catch (SocketTimeoutException stoe) {
|
||||||
LOGGER.error("[NACOS SocketTimeoutException] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
LOGGER.error("[NACOS SocketTimeoutException httpDelete] currentServerAddr:{}", serverListMgr.getCurrentServerAddr());
|
||||||
serverListMgr.refreshCurrentServerAddr();
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOGGER.error("[NACOS IOException] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
|
LOGGER.error("[NACOS IOException httpDelete] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (serverListMgr.getIterator().hasNext()) {
|
||||||
|
currentServerAddr = serverListMgr.getIterator().next();
|
||||||
|
} else {
|
||||||
|
maxRetry --;
|
||||||
|
if (maxRetry < 0) {
|
||||||
|
throw new ConnectException("[NACOS HTTP-DELETE] The maximum number of tolerable server reconnection errors has been reached");
|
||||||
|
}
|
||||||
|
serverListMgr.refreshCurrentServerAddr();
|
||||||
|
}
|
||||||
|
|
||||||
} while (System.currentTimeMillis() <= endTime);
|
} while (System.currentTimeMillis() <= endTime);
|
||||||
|
|
||||||
LOGGER.error("no available server");
|
LOGGER.error("no available server");
|
||||||
@ -208,6 +253,7 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
private void init(Properties properties) {
|
private void init(Properties properties) {
|
||||||
initEncode(properties);
|
initEncode(properties);
|
||||||
initAkSk(properties);
|
initAkSk(properties);
|
||||||
|
initMaxRetry(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initEncode(Properties properties) {
|
private void initEncode(Properties properties) {
|
||||||
@ -240,6 +286,10 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void initMaxRetry(Properties properties) {
|
||||||
|
maxRetry = NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.MAX_RETRY)), Constants.MAX_RETRY);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void start() throws NacosException {
|
public synchronized void start() throws NacosException {
|
||||||
serverListMgr.start();
|
serverListMgr.start();
|
||||||
@ -391,6 +441,7 @@ public class ServerHttpAgent implements HttpAgent {
|
|||||||
private String accessKey;
|
private String accessKey;
|
||||||
private String secretKey;
|
private String secretKey;
|
||||||
private String encode;
|
private String encode;
|
||||||
|
private int maxRetry = 3;
|
||||||
private volatile STSCredential sTSCredential;
|
private volatile STSCredential sTSCredential;
|
||||||
final ServerListManager serverListMgr;
|
final ServerListManager serverListMgr;
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package com.alibaba.nacos.client.config.impl;
|
package com.alibaba.nacos.client.config.impl;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||||
import com.alibaba.nacos.api.common.Constants;
|
import com.alibaba.nacos.api.common.Constants;
|
||||||
import com.alibaba.nacos.api.config.listener.Listener;
|
import com.alibaba.nacos.api.config.listener.Listener;
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
@ -28,6 +29,7 @@ import com.alibaba.nacos.client.monitor.MetricsMonitor;
|
|||||||
import com.alibaba.nacos.client.utils.LogUtils;
|
import com.alibaba.nacos.client.utils.LogUtils;
|
||||||
import com.alibaba.nacos.client.utils.ParamUtil;
|
import com.alibaba.nacos.client.utils.ParamUtil;
|
||||||
import com.alibaba.nacos.client.utils.StringUtils;
|
import com.alibaba.nacos.client.utils.StringUtils;
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import com.alibaba.nacos.client.utils.TenantUtil;
|
import com.alibaba.nacos.client.utils.TenantUtil;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
@ -35,8 +37,18 @@ import java.io.File;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URLDecoder;
|
import java.net.URLDecoder;
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.*;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static com.alibaba.nacos.api.common.Constants.LINE_SEPARATOR;
|
import static com.alibaba.nacos.api.common.Constants.LINE_SEPARATOR;
|
||||||
@ -302,7 +314,7 @@ public class ClientWorker {
|
|||||||
/**
|
/**
|
||||||
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
|
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
|
||||||
*/
|
*/
|
||||||
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) {
|
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
for (CacheData cacheData : cacheDatas) {
|
for (CacheData cacheData : cacheDatas) {
|
||||||
if (!cacheData.isUseLocalConfigInfo()) {
|
if (!cacheData.isUseLocalConfigInfo()) {
|
||||||
@ -328,10 +340,9 @@ public class ClientWorker {
|
|||||||
/**
|
/**
|
||||||
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
|
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
|
||||||
*/
|
*/
|
||||||
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) {
|
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
|
||||||
|
|
||||||
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
|
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
|
||||||
long timeout = TimeUnit.SECONDS.toMillis(30L);
|
|
||||||
|
|
||||||
List<String> headers = new ArrayList<String>(2);
|
List<String> headers = new ArrayList<String>(2);
|
||||||
headers.add("Long-Pulling-Timeout");
|
headers.add("Long-Pulling-Timeout");
|
||||||
@ -361,6 +372,7 @@ public class ClientWorker {
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
setHealthServer(false);
|
setHealthServer(false);
|
||||||
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
|
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
@ -403,10 +415,14 @@ public class ClientWorker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("PMD.ThreadPoolCreationRule")
|
@SuppressWarnings("PMD.ThreadPoolCreationRule")
|
||||||
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
|
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
|
||||||
this.agent = agent;
|
this.agent = agent;
|
||||||
this.configFilterChainManager = configFilterChainManager;
|
this.configFilterChainManager = configFilterChainManager;
|
||||||
|
|
||||||
|
// Initialize the timeout parameter
|
||||||
|
|
||||||
|
init(properties);
|
||||||
|
|
||||||
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
|
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
@ -417,17 +433,18 @@ public class ClientWorker {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
|
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
Thread t = new Thread(r);
|
Thread t = new Thread(r);
|
||||||
t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
|
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
|
||||||
t.setDaemon(true);
|
t.setDaemon(true);
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
executor.scheduleWithFixedDelay(new Runnable() {
|
executor.scheduleWithFixedDelay(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
checkConfigInfo();
|
checkConfigInfo();
|
||||||
@ -438,6 +455,14 @@ public class ClientWorker {
|
|||||||
}, 1L, 10L, TimeUnit.MILLISECONDS);
|
}, 1L, 10L, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void init(Properties properties) {
|
||||||
|
|
||||||
|
timeout = Math.max(NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT)),
|
||||||
|
Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
|
||||||
|
|
||||||
|
taskPenaltyTime = NumberUtils.toInt(String.valueOf(properties.get(PropertyKeyConst.CONFIG_RETRY_TIME)), Constants.CONFIG_RETRY_TIME);
|
||||||
|
}
|
||||||
|
|
||||||
class LongPollingRunnable implements Runnable {
|
class LongPollingRunnable implements Runnable {
|
||||||
private int taskId;
|
private int taskId;
|
||||||
|
|
||||||
@ -445,9 +470,12 @@ public class ClientWorker {
|
|||||||
this.taskId = taskId;
|
this.taskId = taskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
||||||
|
List<CacheData> cacheDatas = new ArrayList<CacheData>();
|
||||||
|
List<String> inInitializingCacheList = new ArrayList<String>();
|
||||||
try {
|
try {
|
||||||
List<CacheData> cacheDatas = new ArrayList<CacheData>();
|
|
||||||
// check failover config
|
// check failover config
|
||||||
for (CacheData cacheData : cacheMap.get().values()) {
|
for (CacheData cacheData : cacheMap.get().values()) {
|
||||||
if (cacheData.getTaskId() == taskId) {
|
if (cacheData.getTaskId() == taskId) {
|
||||||
@ -463,7 +491,6 @@ public class ClientWorker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> inInitializingCacheList = new ArrayList<String>();
|
|
||||||
// check server config
|
// check server config
|
||||||
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
|
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
|
||||||
|
|
||||||
@ -497,10 +524,14 @@ public class ClientWorker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
inInitializingCacheList.clear();
|
inInitializingCacheList.clear();
|
||||||
} catch (Throwable e) {
|
|
||||||
LOGGER.error("longPolling error", e);
|
|
||||||
} finally {
|
|
||||||
executorService.execute(this);
|
executorService.execute(this);
|
||||||
|
|
||||||
|
} catch (Throwable e) {
|
||||||
|
|
||||||
|
// If the rotation training task is abnormal, the next execution time of the task will be punished
|
||||||
|
LOGGER.error("longPolling error : ", e);
|
||||||
|
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -513,8 +544,9 @@ public class ClientWorker {
|
|||||||
this.isHealthServer = isHealthServer;
|
this.isHealthServer = isHealthServer;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ScheduledExecutorService executor;
|
final ScheduledExecutorService executor;
|
||||||
private final ExecutorService executorService;
|
final ScheduledExecutorService executorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* groupKey -> cacheData
|
* groupKey -> cacheData
|
||||||
*/
|
*/
|
||||||
@ -524,5 +556,7 @@ public class ClientWorker {
|
|||||||
private final HttpAgent agent;
|
private final HttpAgent agent;
|
||||||
private final ConfigFilterChainManager configFilterChainManager;
|
private final ConfigFilterChainManager configFilterChainManager;
|
||||||
private boolean isHealthServer = true;
|
private boolean isHealthServer = true;
|
||||||
|
private long timeout;
|
||||||
private double currentLongingTaskCount = 0;
|
private double currentLongingTaskCount = 0;
|
||||||
|
private int taskPenaltyTime;
|
||||||
}
|
}
|
||||||
|
@ -152,6 +152,7 @@ public class ServerListManager {
|
|||||||
contentPath, serverListName, namespace);
|
contentPath, serverListName, namespace);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initParam(Properties properties) {
|
private void initParam(Properties properties) {
|
||||||
@ -265,7 +266,8 @@ public class ServerListManager {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
serverUrls = new ArrayList<String>(newList);
|
serverUrls = new ArrayList<String>(newList);
|
||||||
currentServerAddr = iterator().next();
|
iterator = iterator();
|
||||||
|
currentServerAddr = iterator.next();
|
||||||
|
|
||||||
EventDispatcher.fireEvent(new ServerlistChangeEvent());
|
EventDispatcher.fireEvent(new ServerlistChangeEvent());
|
||||||
LOGGER.info("[{}] [update-serverlist] serverlist updated to {}", name, serverUrls);
|
LOGGER.info("[{}] [update-serverlist] serverlist updated to {}", name, serverUrls);
|
||||||
@ -332,16 +334,26 @@ public class ServerListManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void refreshCurrentServerAddr() {
|
public void refreshCurrentServerAddr() {
|
||||||
currentServerAddr = iterator().next();
|
iterator = iterator();
|
||||||
|
currentServerAddr = iterator.next();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getCurrentServerAddr() {
|
public String getCurrentServerAddr() {
|
||||||
if (StringUtils.isBlank(currentServerAddr)) {
|
if (StringUtils.isBlank(currentServerAddr)) {
|
||||||
currentServerAddr = iterator().next();
|
iterator = iterator();
|
||||||
|
currentServerAddr = iterator.next();
|
||||||
}
|
}
|
||||||
return currentServerAddr;
|
return currentServerAddr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateCurrentServerAddr(String currentServerAddr) {
|
||||||
|
this.currentServerAddr = currentServerAddr;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Iterator<String> getIterator() {
|
||||||
|
return iterator;
|
||||||
|
}
|
||||||
|
|
||||||
public String getContentPath() {
|
public String getContentPath() {
|
||||||
return contentPath;
|
return contentPath;
|
||||||
}
|
}
|
||||||
@ -383,11 +395,13 @@ public class ServerListManager {
|
|||||||
|
|
||||||
private volatile String currentServerAddr;
|
private volatile String currentServerAddr;
|
||||||
|
|
||||||
|
private Iterator<String> iterator;
|
||||||
public String serverPort = ParamUtil.getDefaultServerPort();
|
public String serverPort = ParamUtil.getDefaultServerPort();
|
||||||
|
|
||||||
public String addressServerUrl;
|
public String addressServerUrl;
|
||||||
|
|
||||||
private String serverAddrsStr;
|
private String serverAddrsStr;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -433,14 +447,17 @@ class ServerAddressIterator implements Iterator<String> {
|
|||||||
iter = sorted.iterator();
|
iter = sorted.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
return iter.hasNext();
|
return iter.hasNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String next() {
|
public String next() {
|
||||||
return iter.next().serverIp;
|
return iter.next().serverIp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void remove() {
|
public void remove() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
@ -406,6 +406,7 @@ public class LongPollingService extends AbstractEventListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}, timeoutTime, TimeUnit.MILLISECONDS);
|
}, timeoutTime, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
allSubs.add(this);
|
allSubs.add(this);
|
||||||
|
@ -0,0 +1,89 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.nacos.test.config;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.NacosFactory;
|
||||||
|
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||||
|
import com.alibaba.nacos.api.config.ConfigService;
|
||||||
|
import com.alibaba.nacos.api.config.listener.Listener;
|
||||||
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
|
import com.alibaba.nacos.config.server.Config;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.boot.web.server.LocalServerPort;
|
||||||
|
import org.springframework.test.context.junit4.SpringRunner;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author liaochuntao
|
||||||
|
* @date 2019-06-07 22:24
|
||||||
|
**/
|
||||||
|
@RunWith(SpringRunner.class)
|
||||||
|
@SpringBootTest(classes = Config.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||||
|
public class ConfigLongPoll_ITCase {
|
||||||
|
|
||||||
|
@LocalServerPort
|
||||||
|
private int port;
|
||||||
|
|
||||||
|
private ConfigService configService;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws NacosException {
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + port);
|
||||||
|
properties.put(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT, "20000");
|
||||||
|
properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, 3000);
|
||||||
|
configService = NacosFactory.createConfigService(properties);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws InterruptedException, NacosException {
|
||||||
|
|
||||||
|
configService.addListener("test", "DEFAULT_GROUP", new Listener() {
|
||||||
|
@Override
|
||||||
|
public Executor getExecutor() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void receiveConfigInfo(String configInfo) {
|
||||||
|
System.out.println(configInfo);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
configService.addListener("test-1", "DEFAULT_GROUP", new Listener() {
|
||||||
|
@Override
|
||||||
|
public Executor getExecutor() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void receiveConfigInfo(String configInfo) {
|
||||||
|
System.out.println(configInfo);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(30);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -25,6 +25,7 @@ import org.junit.Test;
|
|||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
import org.springframework.boot.web.server.LocalServerPort;
|
import org.springframework.boot.web.server.LocalServerPort;
|
||||||
|
import org.springframework.test.annotation.Repeat;
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
import org.springframework.test.context.junit4.SpringRunner;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -112,7 +113,6 @@ public class DeregisterInstance_ITCase {
|
|||||||
public void dregDomClusterTest() throws Exception {
|
public void dregDomClusterTest() throws Exception {
|
||||||
|
|
||||||
String serviceName = randomDomainName();
|
String serviceName = randomDomainName();
|
||||||
|
|
||||||
System.out.println(serviceName);
|
System.out.println(serviceName);
|
||||||
|
|
||||||
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
naming.registerInstance(serviceName, "127.0.0.1", TEST_PORT, "c1");
|
||||||
|
Loading…
Reference in New Issue
Block a user