Extract server list maintain logic to ServerListManager (#3389)

This commit is contained in:
杨翊 SionYang 2020-07-20 17:52:58 +08:00 committed by GitHub
parent 39986a867a
commit 2e4d50c4f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 249 additions and 177 deletions

View File

@ -25,10 +25,10 @@ import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.api.selector.ExpressionSelector;
import com.alibaba.nacos.api.selector.NoneSelector;
import com.alibaba.nacos.client.naming.core.ServerListManager;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.Map;
import java.util.Properties;
@ -44,10 +44,6 @@ public class NacosNamingMaintainService implements NamingMaintainService {
private String namespace;
private String endpoint;
private String serverList;
private NamingHttpClientProxy serverProxy;
public NacosNamingMaintainService(String serverList) throws NacosException {
@ -64,17 +60,9 @@ public class NacosNamingMaintainService implements NamingMaintainService {
ValidatorUtils.checkInitParam(properties);
namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
initServerAddr(properties);
InitUtils.initWebRootContext();
serverProxy = new NamingHttpClientProxy(namespace, endpoint, serverList, properties);
}
private void initServerAddr(Properties properties) {
serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
endpoint = InitUtils.initEndpoint(properties);
if (StringUtils.isNotEmpty(endpoint)) {
serverList = "";
}
ServerListManager serverListManager = new ServerListManager(properties);
serverProxy = new NamingHttpClientProxy(namespace, serverListManager, properties);
}
@Override

View File

@ -30,12 +30,12 @@ import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.core.Balancer;
import com.alibaba.nacos.client.naming.core.EventDispatcher;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.core.ServerListManager;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.remote.ServerListFactory;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.StringUtils;
@ -44,7 +44,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Nacos Naming Service.
@ -59,10 +58,6 @@ public class NacosNamingService implements NamingService {
*/
private String namespace;
private String endpoint;
private String serverList;
private String cacheDir;
private String logName;
@ -91,34 +86,18 @@ public class NacosNamingService implements NamingService {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
initServerAddr(properties);
InitUtils.initWebRootContext();
initCacheDir();
initLogName(properties);
ServerListManager serverListManager = new ServerListManager(properties);
this.eventDispatcher = new EventDispatcher();
this.serverProxy = new NamingHttpClientProxy(this.namespace, this.endpoint, this.serverList, properties);
this.serverProxy = new NamingHttpClientProxy(this.namespace, serverListManager, properties);
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
isLoadCacheAtStart(properties), initPollingThreadCount(properties));
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, hostReactor);
grpcClientProxy.start(new ServerListFactory() {
private final AtomicInteger index = new AtomicInteger();
private final String[] serverLists = serverList.split(",");
@Override
public String genNextServer() {
int nextIndex = index.getAndIncrement() % serverLists.length;
return serverLists[nextIndex];
}
@Override
public String getCurrentServer() {
return serverLists[index.get() % serverLists.length];
}
});
grpcClientProxy.start(serverListManager);
}
private int initClientBeatThreadCount(Properties properties) {
@ -151,14 +130,6 @@ public class NacosNamingService implements NamingService {
return loadCacheAtStart;
}
private void initServerAddr(Properties properties) {
serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
endpoint = InitUtils.initEndpoint(properties);
if (StringUtils.isNotEmpty(endpoint)) {
serverList = "";
}
}
private void initLogName(Properties properties) {
logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
if (StringUtils.isEmpty(logName)) {

View File

@ -0,0 +1,180 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.core;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientManager;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.naming.utils.NamingHttpUtil;
import com.alibaba.nacos.client.remote.ServerListFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.http.HttpRestResult;
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Server list manager.
*
* @author xiweng.yy
*/
public class ServerListManager implements ServerListFactory, Closeable {
private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
private final long refreshServerListInternal = TimeUnit.SECONDS.toMillis(30);
private final AtomicInteger currentIndex = new AtomicInteger();
private List<String> serversFromEndpoint = new ArrayList<String>();
private List<String> serverList = new ArrayList<String>();
private ScheduledExecutorService refreshServerListExecutor;
private String endpoint;
private String nacosDomain;
private long lastServerListRefreshTime = 0L;
public ServerListManager(Properties properties) {
initServerAddr(properties);
}
private void initServerAddr(Properties properties) {
this.endpoint = InitUtils.initEndpoint(properties);
if (StringUtils.isNotEmpty(endpoint)) {
this.serversFromEndpoint = getServerListFromEndpoint();
refreshServerListExecutor = new ScheduledThreadPoolExecutor(1,
new NameThreadFactory("com.alibaba.nacos.client.naming.server.list.refresher"));
refreshServerListExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
refreshServerListIfNeed();
}
}, 0, refreshServerListInternal, TimeUnit.MILLISECONDS);
} else {
String serverListFromProps = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
if (StringUtils.isNotEmpty(serverListFromProps)) {
this.serverList.addAll(Arrays.asList(serverListFromProps.split(",")));
if (this.serverList.size() == 1) {
this.nacosDomain = serverListFromProps;
}
}
}
}
private List<String> getServerListFromEndpoint() {
try {
String urlString = "http://" + endpoint + "/nacos/serverlist";
Header header = NamingHttpUtil.builderHeader();
HttpRestResult<String> restResult = nacosRestTemplate.get(urlString, header, Query.EMPTY, String.class);
if (!restResult.ok()) {
throw new IOException(
"Error while requesting: " + urlString + "'. Server returned: " + restResult.getCode());
}
String content = restResult.getData();
List<String> list = new ArrayList<String>();
for (String line : IoUtils.readLines(new StringReader(content))) {
if (!line.trim().isEmpty()) {
list.add(line.trim());
}
}
return list;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private void refreshServerListIfNeed() {
try {
if (!CollectionUtils.isEmpty(serverList)) {
NAMING_LOGGER.debug("server list provided by user: " + serverList);
return;
}
if (System.currentTimeMillis() - lastServerListRefreshTime < refreshServerListInternal) {
return;
}
List<String> list = getServerListFromEndpoint();
if (CollectionUtils.isEmpty(list)) {
throw new Exception("Can not acquire Nacos list");
}
if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {
NAMING_LOGGER.info("[SERVER-LIST] server list is updated: " + list);
}
serversFromEndpoint = list;
lastServerListRefreshTime = System.currentTimeMillis();
} catch (Throwable e) {
NAMING_LOGGER.warn("failed to update server list", e);
}
}
public boolean isDomain() {
return StringUtils.isNotBlank(nacosDomain);
}
public String getNacosDomain() {
return nacosDomain;
}
public List<String> getServerList() {
return serverList.isEmpty() ? serversFromEndpoint : serverList;
}
@Override
public String genNextServer() {
int index = currentIndex.incrementAndGet() % getServerList().size();
return getServerList().get(index);
}
@Override
public String getCurrentServer() {
return getServerList().get(currentIndex.get() % getServerList().size());
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className);
if (null != refreshServerListExecutor) {
ThreadUtils.shutdownThreadPool(refreshServerListExecutor, NAMING_LOGGER);
}
NamingHttpClientManager.getInstance().shutdown();
NAMING_LOGGER.info("{} do shutdown stop", className);
}
}

View File

@ -32,35 +32,27 @@ import com.alibaba.nacos.api.selector.SelectorType;
import com.alibaba.nacos.client.config.impl.SpasAdapter;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.core.ServerListManager;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.NamingHttpUtil;
import com.alibaba.nacos.client.naming.utils.NetUtils;
import com.alibaba.nacos.client.naming.utils.SignUtil;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.security.SecurityProxy;
import com.alibaba.nacos.client.utils.AppNameUtils;
import com.alibaba.nacos.client.utils.ClientCommonUtils;
import com.alibaba.nacos.client.utils.TemplateUtils;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.http.HttpRestResult;
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.utils.HttpMethod;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.UuidUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.http.HttpStatus;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -90,45 +82,28 @@ public class NamingHttpClientProxy implements NamingClientProxy {
private final String namespaceId;
private final String endpoint;
private String nacosDomain;
private List<String> serverList;
private List<String> serversFromEndpoint = new ArrayList<String>();
private final SecurityProxy securityProxy;
private long lastSrvRefTime = 0L;
private final long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);
private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5);
private final ServerListManager serverListManager;
private Properties properties;
private ScheduledExecutorService executorService;
public NamingHttpClientProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
public NamingHttpClientProxy(String namespaceId, ServerListManager serverListManager, Properties properties) {
this.serverListManager = serverListManager;
this.securityProxy = new SecurityProxy(properties, nacosRestTemplate);
this.properties = properties;
this.setServerPort(DEFAULT_SERVER_PORT);
this.namespaceId = namespaceId;
this.endpoint = endpoint;
if (StringUtils.isNotEmpty(serverList)) {
this.serverList = Arrays.asList(serverList.split(","));
if (this.serverList.size() == 1) {
this.nacosDomain = serverList;
}
}
this.initRefreshTask();
}
private void initRefreshTask() {
this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
@ -141,76 +116,11 @@ public class NamingHttpClientProxy implements NamingClientProxy {
this.executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
refreshSrvIfNeed();
}
}, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
this.executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
securityProxy.login(getServerList());
securityProxy.login(serverListManager.getServerList());
}
}, 0, securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
refreshSrvIfNeed();
this.securityProxy.login(getServerList());
}
public List<String> getServerListFromEndpoint() {
try {
String urlString = "http://" + endpoint + "/nacos/serverlist";
Header header = builderHeader();
HttpRestResult<String> restResult = nacosRestTemplate.get(urlString, header, Query.EMPTY, String.class);
if (!restResult.ok()) {
throw new IOException(
"Error while requesting: " + urlString + "'. Server returned: " + restResult.getCode());
}
String content = restResult.getData();
List<String> list = new ArrayList<String>();
for (String line : IoUtils.readLines(new StringReader(content))) {
if (!line.trim().isEmpty()) {
list.add(line.trim());
}
}
return list;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private void refreshSrvIfNeed() {
try {
if (!CollectionUtils.isEmpty(serverList)) {
NAMING_LOGGER.debug("server list provided by user: " + serverList);
return;
}
if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {
return;
}
List<String> list = getServerListFromEndpoint();
if (CollectionUtils.isEmpty(list)) {
throw new Exception("Can not acquire Nacos list");
}
if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {
NAMING_LOGGER.info("[SERVER-LIST] server list is updated: " + list);
}
serversFromEndpoint = list;
lastSrvRefTime = System.currentTimeMillis();
} catch (Throwable e) {
NAMING_LOGGER.warn("failed to update server list", e);
}
this.securityProxy.login(serverListManager.getServerList());
}
@Override
@ -472,7 +382,7 @@ public class NamingHttpClientProxy implements NamingClientProxy {
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
throws NacosException {
return reqApi(api, params, body, getServerList(), method);
return reqApi(api, params, body, serverListManager.getServerList(), method);
}
/**
@ -491,7 +401,7 @@ public class NamingHttpClientProxy implements NamingClientProxy {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
@ -516,14 +426,14 @@ public class NamingHttpClientProxy implements NamingClientProxy {
}
}
if (StringUtils.isNotBlank(nacosDomain)) {
if (serverListManager.isDomain()) {
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
return callServer(api, params, body, serverListManager.getNacosDomain(), method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
NAMING_LOGGER.debug("request {} failed.", serverListManager.getNacosDomain(), e);
}
}
}
@ -537,14 +447,6 @@ public class NamingHttpClientProxy implements NamingClientProxy {
}
private List<String> getServerList() {
List<String> snapshot = serversFromEndpoint;
if (!CollectionUtils.isEmpty(serverList)) {
snapshot = serverList;
}
return snapshot;
}
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer)
throws NacosException {
return callServer(api, params, body, curServer, HttpMethod.GET);
@ -566,7 +468,7 @@ public class NamingHttpClientProxy implements NamingClientProxy {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader();
Header header = NamingHttpUtil.builderHeader();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
@ -623,22 +525,6 @@ public class NamingHttpClientProxy implements NamingClientProxy {
}
}
/**
* Build header.
*
* @return header
*/
public Header builderHeader() {
Header header = Header.newInstance();
header.addParam(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
header.addParam(HttpHeaderConsts.USER_AGENT_HEADER, ClientCommonUtils.VERSION);
header.addParam(HttpHeaderConsts.ACCEPT_ENCODING, "gzip,deflate,sdch");
header.addParam(HttpHeaderConsts.CONNECTION, "Keep-Alive");
header.addParam(HttpHeaderConsts.REQUEST_ID, UuidUtils.generateUuid());
header.addParam(HttpHeaderConsts.REQUEST_MODULE, "Naming");
return header;
}
private static String getSignData(String serviceName) {
return StringUtils.isNotEmpty(serviceName) ? System.currentTimeMillis() + "@@" + serviceName
: String.valueOf(System.currentTimeMillis());

View File

@ -0,0 +1,47 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.utils;
import com.alibaba.nacos.client.utils.ClientCommonUtils;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.utils.UuidUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
/**
* Naming sdk http util.
*
* @author xiweng.yy
*/
public class NamingHttpUtil {
/**
* Build header.
*
* @return header
*/
public static Header builderHeader() {
Header header = Header.newInstance();
header.addParam(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
header.addParam(HttpHeaderConsts.USER_AGENT_HEADER, ClientCommonUtils.VERSION);
header.addParam(HttpHeaderConsts.ACCEPT_ENCODING, "gzip,deflate,sdch");
header.addParam(HttpHeaderConsts.CONNECTION, "Keep-Alive");
header.addParam(HttpHeaderConsts.REQUEST_ID, UuidUtils.generateUuid());
header.addParam(HttpHeaderConsts.REQUEST_MODULE, "Naming");
return header;
}
}