Merge pull request #2313 from paderlol/develop

Fixed heath check error with Nacos by standalone #2295
This commit is contained in:
Peter Zhu 2020-01-17 16:47:07 +08:00 committed by GitHub
commit fd5f0fe159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,7 +20,6 @@ import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.notify.NotifyService;
import com.alibaba.nacos.config.server.service.notify.NotifyService.HttpResult;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.config.server.utils.RunningConfigUtils;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher;
@ -32,10 +31,9 @@ import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -44,9 +42,14 @@ import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.StringReader;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.alibaba.nacos.config.server.utils.LogUtil.defaultLog;
import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog;
@ -60,17 +63,21 @@ import static com.alibaba.nacos.core.utils.SystemUtils.*;
@Service
public class ServerListService implements ApplicationListener<WebServerInitializedEvent> {
@Autowired
private Environment env;
@Autowired
private ServletContext servletContext;
private final ServletContext servletContext;
@Value("${server.port:8848}")
private int port;
@Value("${useAddressServer}")
private Boolean isUseAddressServer = true;
public ServerListService(ServletContext servletContext) {
this.servletContext = servletContext;
}
@PostConstruct
public void init() {
serverPort = System.getProperty("nacos.server.port", "8848");
String envDomainName = System.getenv("address_server_domain");
if (StringUtils.isBlank(envDomainName)) {
domainName = System.getProperty("address.server.domain", "jmenv.tbsite.net");
@ -88,55 +95,23 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
defaultLog.info("ServerListService address-server port:" + serverPort);
defaultLog.info("ServerListService address-server port:" + addressPort);
defaultLog.info("ADDRESS_SERVER_URL:" + addressServerUrl);
isHealthCheck = PropertyUtil.isHealthCheck();
maxFailCount = PropertyUtil.getMaxHealthCheckFailCount();
try {
String val = null;
val = env.getProperty("useAddressServer");
if (val != null && FALSE_STR.equals(val)) {
isUseAddressServer = false;
}
fatalLog.warn("useAddressServer:{}", isUseAddressServer);
} catch (Exception e) {
fatalLog.error("read application.properties wrong", e);
}
fatalLog.warn("useAddressServer:{}", isUseAddressServer);
GetServerListTask task = new GetServerListTask();
task.run();
if (null == serverList || serverList.isEmpty()) {
if (CollectionUtils.isEmpty(serverList)) {
fatalLog.error("########## cannot get serverlist, so exit.");
throw new RuntimeException("cannot get serverlist, so exit.");
} else {
TimerTaskService.scheduleWithFixedDelay(task, 0L, 5L, TimeUnit.SECONDS);
}
httpclient.start();
CheckServerHealthTask checkServerHealthTask = new CheckServerHealthTask();
TimerTaskService.scheduleWithFixedDelay(checkServerHealthTask, 0L, 5L, TimeUnit.SECONDS);
}
public String getEnvId() {
String envId = "";
int i = 0;
do {
envId = getEnvIdHttp();
if (StringUtils.isBlank(envId)) {
i++;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LogUtil.defaultLog.error("sleep interrupt");
}
}
} while (StringUtils.isBlank(envId) && i < 5);
if (!StringUtils.isBlank(envId)) {
} else {
LogUtil.defaultLog.error("envId is blank");
}
return envId;
}
public List<String> getServerList() {
return new ArrayList<String>(serverList);
@ -161,21 +136,15 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
/**
* serverList has changed
*/
static public class ServerlistChangeEvent implements EventDispatcher.Event {
static public class ServerListChangeEvent implements EventDispatcher.Event {
}
private void updateIfChanged(List<String> newList) {
if (newList.isEmpty()) {
if (CollectionUtils.isEmpty(newList)||newList.equals(serverList)) {
return;
}
boolean isContainSelfIp = false;
for (String ipPortTmp : newList) {
if (ipPortTmp.contains(LOCAL_IP)) {
isContainSelfIp = true;
break;
}
}
boolean isContainSelfIp = newList.stream().anyMatch(ipPortTmp -> ipPortTmp.contains(LOCAL_IP));
if (isContainSelfIp) {
isInIpList = true;
@ -186,30 +155,23 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
fatalLog.error("########## [serverlist] self ip {} not in serverlist {}", selfAddr, newList);
}
if (newList.equals(serverList)) {
return;
}
serverList = new ArrayList<String>(newList);
List<String> unhealthRemoved = new ArrayList<String>();
for (String unhealthIp : serverListUnhealth) {
if (!newList.contains(unhealthIp)) {
unhealthRemoved.add(unhealthIp);
if(!serverListUnhealth.isEmpty()){
List<String> unhealthyRemoved = serverListUnhealth.stream()
.filter(unhealthyIp -> !newList.contains(unhealthyIp)).collect(Collectors.toList());
serverListUnhealth.removeAll(unhealthyRemoved);
List<String> unhealthyCountRemoved = serverIp2unhealthCount.keySet().stream()
.filter(key -> !newList.contains(key)).collect(Collectors.toList());
for (String unhealthyCountTmp : unhealthyCountRemoved) {
serverIp2unhealthCount.remove(unhealthyCountTmp);
}
}
serverListUnhealth.removeAll(unhealthRemoved);
List<String> unhealthCountRemoved = new ArrayList<String>();
for (Map.Entry<String, Integer> ip2UnhealthCountTmp : serverIp2unhealthCount.entrySet()) {
if (!newList.contains(ip2UnhealthCountTmp.getKey())) {
unhealthCountRemoved.add(ip2UnhealthCountTmp.getKey());
}
}
for (String unhealthCountTmp : unhealthCountRemoved) {
serverIp2unhealthCount.remove(unhealthCountTmp);
}
defaultLog.warn("[serverlist] updated to {}", serverList);
@ -217,7 +179,7 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
/**
* 非并发fireEvent
*/
EventDispatcher.fireEvent(new ServerlistChangeEvent());
EventDispatcher.fireEvent(new ServerListChangeEvent());
}
/**
@ -256,7 +218,7 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
if (HttpServletResponse.SC_OK == result.code) {
isAddressServerHealth = true;
addressServerFailCcount = 0;
addressServerFailCount = 0;
List<String> lines = IoUtils.readLines(new StringReader(result.content));
List<String> ips = new ArrayList<String>(lines.size());
for (String serverAddr : lines) {
@ -266,16 +228,16 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
}
return ips;
} else {
addressServerFailCcount++;
if (addressServerFailCcount >= maxFailCount) {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
defaultLog.error("[serverlist] failed to get serverlist, error code {}", result.code);
return Collections.emptyList();
}
} catch (IOException e) {
addressServerFailCcount++;
if (addressServerFailCcount >= maxFailCount) {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
defaultLog.error("[serverlist] exception, " + e.toString(), e);
@ -302,23 +264,7 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
}
}
private String getEnvIdHttp() {
try {
// "http://jmenv.tbsite.net:8080/env";
HttpResult result = NotifyService.invokeURL(envIdUrl, null, null);
if (HttpServletResponse.SC_OK == result.code) {
return result.content.trim();
} else {
defaultLog.error("[envId] failed to get envId, error code {}", result.code);
return "";
}
} catch (IOException e) {
defaultLog.error("[envId] exception, " + e.toString(), e);
return "";
}
}
class GetServerListTask implements Runnable {
@Override
@ -338,18 +284,18 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
String url = "http://" + serverIp + servletContext.getContextPath() + Constants.HEALTH_CONTROLLER_PATH;
// "/nacos/health";
HttpGet request = new HttpGet(url);
httpclient.execute(request, new AyscCheckServerHealthCallBack(serverIp));
httpclient.execute(request, new AsyncCheckServerHealthCallBack(serverIp));
}
long endCheckTime = System.currentTimeMillis();
long cost = endCheckTime - startCheckTime;
defaultLog.debug("checkServerHealth cost: {}", cost);
}
class AyscCheckServerHealthCallBack implements FutureCallback<HttpResponse> {
class AsyncCheckServerHealthCallBack implements FutureCallback<HttpResponse> {
private String serverIp;
public AyscCheckServerHealthCallBack(String serverIp) {
public AsyncCheckServerHealthCallBack(String serverIp) {
this.serverIp = serverIp;
}
@ -363,29 +309,16 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
@Override
public void failed(Exception ex) {
int failCount = serverIp2unhealthCount.compute(serverIp,(key,oldValue)->{
if(oldValue == null){
return 1;
}
return oldValue+1;
});
if (failCount > maxFailCount) {
if (!serverListUnhealth.contains(serverIp)) {
serverListUnhealth.add(serverIp);
}
defaultLog.error("unhealthIp:{}, unhealthCount:{}", serverIp, failCount);
MetricsMonitor.getUnhealthException().increment();
}
computeFailCount();
}
@Override
public void cancelled() {
int failCount = serverIp2unhealthCount.compute(serverIp,(key,oldValue)->{
if(oldValue == null){
return 1;
computeFailCount();
}
return oldValue+1;
});
private void computeFailCount() {
int failCount = serverIp2unhealthCount.compute(serverIp,(key,oldValue)->oldValue == null?1:oldValue+1);
if (failCount > maxFailCount) {
if (!serverListUnhealth.contains(serverIp)) {
serverListUnhealth.add(serverIp);
@ -435,15 +368,15 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
static final int TIMEOUT = 5000;
private int maxFailCount = 12;
private static volatile List<String> serverList = new ArrayList<String>();
private static volatile List<String> serverListUnhealth = new ArrayList<String>();
private static volatile List<String> serverListUnhealth = Collections.synchronizedList(new ArrayList<String>());;
private static volatile boolean isAddressServerHealth = true;
private static volatile int addressServerFailCcount = 0;
private static volatile int addressServerFailCount = 0;
private static volatile boolean isInIpList = true;
/**
* ip unhealth count
*/
private static volatile Map<String, Integer> serverIp2unhealthCount = new HashMap<String, Integer>();
private static Map<String, Integer> serverIp2unhealthCount = new ConcurrentHashMap<>();
private RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(PropertyUtil.getNotifyConnectTimeout())
.setSocketTimeout(PropertyUtil.getNotifySocketTimeout()).build();
@ -451,29 +384,20 @@ public class ServerListService implements ApplicationListener<WebServerInitializ
private CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig)
.build();
/**
* server之间通信的端口
*/
public String serverPort;
public String domainName;
public String addressPort;
public String addressUrl;
public String envIdUrl;
public String addressServerUrl;
private Boolean isUseAddressServer = true;
private boolean isHealthCheck = true;
private final static String FALSE_STR = "false";
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
if (port == 0) {
port = event.getWebServer().getPort();
List<String> newList = new ArrayList<String>();
for (String serverAddrTmp : serverList) {
newList.add(getFormatServerAddr(serverAddrTmp));
}
setServerList(new ArrayList<String>(newList));
}
httpclient.start();
CheckServerHealthTask checkServerHealthTask = new CheckServerHealthTask();
TimerTaskService.scheduleWithFixedDelay(checkServerHealthTask, 0L, 5L, TimeUnit.SECONDS);
}
}