This commit is contained in:
chuntaojun 2019-06-05 22:31:09 +08:00
parent 82961fe721
commit 72a3a9b213
8 changed files with 189 additions and 16 deletions

View File

@ -44,6 +44,8 @@ public class PropertyKeyConst {
public final static String ENCODE = "encode";
public final static String TIMEOUT = "timeout";
public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart";
public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";

View File

@ -81,7 +81,7 @@ public class NacosConfigService implements ConfigService {
initNamespace(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent, configFilterChainManager);
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
private void initNamespace(Properties properties) {

View File

@ -42,6 +42,8 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Server Agent
@ -106,35 +108,49 @@ public class ServerHttpAgent implements HttpAgent {
long readTimeoutMs) throws IOException {
final long endTime = System.currentTimeMillis() + readTimeoutMs;
boolean isSSL = false;
do {
String currentServerAddr = serverListMgr.getCurrentServerAddr();
try {
List<String> newHeaders = getSpasHeaders(paramValues);
if (headers != null) {
newHeaders.addAll(headers);
}
HttpResult result = HttpSimpleClient.httpPost(
getUrl(serverListMgr.getCurrentServerAddr(), path, isSSL), newHeaders, paramValues, encoding,
getUrl(currentServerAddr, path, isSSL), newHeaders, paramValues, encoding,
readTimeoutMs, isSSL);
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|| result.code == HttpURLConnection.HTTP_UNAVAILABLE) {
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
serverListMgr.getCurrentServerAddr(), result.code);
currentServerAddr, result.code);
} else {
serverListMgr.delErrServerRecord(currentServerAddr);
return result;
}
} catch (ConnectException ce) {
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}", serverListMgr.getCurrentServerAddr());
serverListMgr.refreshCurrentServerAddr();
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}", currentServerAddr);
serverListMgr.addErrServerRecord(currentServerAddr);
serverListMgr.refreshCurrentServerAddr(currentServerAddr);
} catch (SocketTimeoutException stoe) {
LOGGER.error("[NACOS SocketTimeoutException]", "currentServerAddr: {}",
serverListMgr.getCurrentServerAddr());
LOGGER.error("[NACOS SocketTimeoutException] currentServerAddr: {} err : {}",
currentServerAddr, stoe.getMessage());
serverListMgr.addErrServerRecord(currentServerAddr);
serverListMgr.refreshCurrentServerAddr();
} catch (IOException ioe) {
LOGGER.error("[NACOS IOException] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
LOGGER.error("[NACOS IOException] currentServerAddr: " + currentServerAddr, ioe);
throw ioe;
}
// 保证依旧可以重试
if (serverListMgr.isMaxRetry()) {
LOGGER.error("The maximum number of tolerable server reconnection errors has been reached");
return null;
}
} while (System.currentTimeMillis() <= endTime);
LOGGER.error("no available server");

View File

@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.client.config.impl;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
@ -29,6 +30,7 @@ import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.StringUtils;
import com.alibaba.nacos.client.utils.TemplateUtils;
import org.slf4j.Logger;
import java.io.File;
@ -42,6 +44,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -341,7 +345,10 @@ public class ClientWorker {
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
long timeout = TimeUnit.SECONDS.toMillis(30L);
// Take a custom timeout parameter to circumvent network errors caused by network latency
long timeout = TimeUnit.SECONDS.toMillis(this.timeout);
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
@ -361,6 +368,11 @@ public class ClientWorker {
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
// The maximum number of tolerable server reconnection errors has been reached
if (result == null) {
return Collections.emptyList();
}
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.content);
@ -413,10 +425,14 @@ public class ClientWorker {
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -427,7 +443,7 @@ public class ClientWorker {
}
});
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
executorService = Executors.newScheduledThreadPool(Integer.MAX_VALUE, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
@ -448,6 +464,20 @@ public class ClientWorker {
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
private void init(Properties properties) {
try {
String timeoutStr = TemplateUtils.stringBlankAndThenExecute(properties.getProperty(PropertyKeyConst.TIMEOUT), new Callable<String>() {
@Override
public String call() throws Exception {
return String.valueOf(Constants.SO_TIMEOUT);
}
});
timeout = Long.parseLong(timeoutStr);
} catch (NumberFormatException nfe) {
timeout = Constants.SO_TIMEOUT;
}
}
class LongPollingRunnable implements Runnable {
private int taskId;
@ -507,10 +537,14 @@ public class ClientWorker {
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
LOGGER.error("longPolling error", e);
} finally {
executorService.execute(this);
// If the server fails, punish the task execution and delay the next task execution
// to avoid the client sending a large number of requests when the server cannot
// respond to the request
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
@ -526,7 +560,7 @@ public class ClientWorker {
}
final ScheduledExecutorService executor;
final ExecutorService executorService;
final ScheduledExecutorService executorService;
/**
* groupKey -> cacheData
*/
@ -536,5 +570,7 @@ public class ClientWorker {
HttpAgent agent;
ConfigFilterChainManager configFilterChainManager;
private boolean isHealthServer = true;
private long timeout;
private double currentLongingTaskCount = 0;
private long taskPenaltyTime = 2000;
}

View File

@ -28,7 +28,9 @@ import java.io.StringReader;
import java.net.HttpURLConnection;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Serverlist Manager
@ -43,6 +45,8 @@ public class ServerListManager {
isFixed = false;
isStarted = false;
name = DEFAULT_NAME;
errServerSet = new CopyOnWriteArraySet();
}
public ServerListManager(List<String> fixed) {
@ -69,6 +73,9 @@ public class ServerListManager {
name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])) + "-"
+ namespace;
}
errServerSet = new CopyOnWriteArraySet();
}
public ServerListManager(String host, int port) {
@ -76,6 +83,9 @@ public class ServerListManager {
isStarted = false;
name = CUSTOM_NAME + "-" + host + "-" + port;
addressServerUrl = String.format("http://%s:%d/%s/%s", host, port, contentPath, serverListName);
errServerSet = new CopyOnWriteArraySet();
}
public ServerListManager(String endpoint) throws NacosException {
@ -106,6 +116,9 @@ public class ServerListManager {
addressServerUrl = String.format("http://%s:%d/%s/%s?namespace=%s", endpoint, endpointPort, contentPath,
serverListName, namespace);
}
errServerSet = new CopyOnWriteArraySet();
}
public ServerListManager(Properties properties) throws NacosException {
@ -151,6 +164,9 @@ public class ServerListManager {
contentPath, serverListName, namespace);
}
}
errServerSet = new CopyOnWriteArraySet();
}
private void initParam(Properties properties) {
@ -225,6 +241,15 @@ public class ServerListManager {
return new ServerAddressIterator(serverUrls);
}
// Consider adding a penalty mechanism, with the offending node coming last
Iterator<String> iterator(String errServerIp) {
if (serverUrls.isEmpty()) {
LOGGER.error("[{}] [iterator-serverlist] No server address defined!", name);
}
return new ServerAddressIterator(serverUrls, errServerIp);
}
class GetServerListTask implements Runnable {
final String url;
@ -328,6 +353,12 @@ public class ServerListManager {
currentServerAddr = iterator().next();
}
// Consider adding a penalty mechanism, with the offending node coming last
public void refreshCurrentServerAddr(String errServerIp) {
currentServerAddr = iterator(errServerIp).next();
}
public String getCurrentServerAddr() {
if (StringUtils.isBlank(currentServerAddr)) {
currentServerAddr = iterator().next();
@ -335,6 +366,27 @@ public class ServerListManager {
return currentServerAddr;
}
public void addErrServerRecord(String errServerIp) {
errServerSet.add(errServerIp);
if (errServerSet.size() == serverUrls.size()) {
retryCnt.incrementAndGet();
// 对错误 errServerSet 进行清空进行新一轮的重试
errServerSet.clear();
}
}
public void delErrServerRecord(String errServerIp) {
errServerSet.remove(errServerIp);
// 如果当前的 errServerSet.size() serverUrls.size() 的比例介于 (0, 1/2] 之间清除重试次数信息
if (errServerSet.size() * 2 <= serverUrls.size()) {
retryCnt.set(0);
}
}
public boolean isMaxRetry() {
return retryCnt.get() >= 4;
}
public String getContentPath() {
return contentPath;
}
@ -374,6 +426,9 @@ public class ServerListManager {
private String serverListName = ParamUtil.getDefaultNodesPath();
volatile List<String> serverUrls = new ArrayList<String>();
private final CopyOnWriteArraySet<String> errServerSet;
private final AtomicInteger retryCnt = new AtomicInteger(0);
private volatile String currentServerAddr;
public String serverPort = ParamUtil.getDefaultServerPort();
@ -426,6 +481,21 @@ class ServerAddressIterator implements Iterator<String> {
iter = sorted.iterator();
}
// Consider adding a penalty mechanism, with the offending node coming last
public ServerAddressIterator(List<String> source, String errServerIp) {
sorted = new ArrayList<RandomizedServerAddress>();
for (String address : source) {
if (address.equals(errServerIp)) {
continue;
}
sorted.add(new RandomizedServerAddress(address));
}
Collections.sort(sorted);
sorted.add(new RandomizedServerAddress(errServerIp));
iter = sorted.iterator();
}
public boolean hasNext() {
return iter.hasNext();
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client;
import org.junit.Before;
import org.junit.Test;
import java.util.Random;
/**
* @author liaochuntao
* @date 2019-06-05 20:55
**/
public class RandomTest {
private Random random;
@Before
public void before() {
random = new Random();
}
@Test
public void test() throws InterruptedException {
long[] tmp = new long[10];
for (int i = 0; i < 10; i++) {
tmp[i] = random.nextInt(Integer.MAX_VALUE);
}
Thread.sleep(100);
for (int i = 0; i < 10; i++) {
System.out.println(tmp[i] == random.nextInt(Integer.MAX_VALUE));
}
}
}

View File

@ -406,6 +406,7 @@ public class LongPollingService extends AbstractEventListener {
}
}
//TODO 为什么任务延迟执行时间设置为超时时间会不会导致 read timeout
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);

View File

@ -70,8 +70,7 @@ public class ConfigAPI_ITCase {
@Before
public void setUp() throws Exception {
Properties properties = new Properties();
properties.put(PropertyKeyConst.ENDPOINT, "127.0.0.1");
properties.put(PropertyKeyConst.ENDPOINT_PORT, "8080");
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1"+":"+port);
iconfig = NacosFactory.createConfigService(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));