Merge pull request #1241 from alibaba/hotfix_server_list_inconsistent
Fix #1091
This commit is contained in:
commit
63c98e1aad
@ -17,7 +17,6 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro;
|
||||
|
||||
import com.alibaba.nacos.naming.cluster.ServerListManager;
|
||||
import com.alibaba.nacos.naming.cluster.servers.Server;
|
||||
import com.alibaba.nacos.naming.cluster.servers.ServerChangeListener;
|
||||
import com.alibaba.nacos.naming.cluster.transport.Serializer;
|
||||
import com.alibaba.nacos.naming.consistency.Datum;
|
||||
import com.alibaba.nacos.naming.consistency.KeyBuilder;
|
||||
@ -43,7 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
*/
|
||||
@Component
|
||||
@DependsOn("serverListManager")
|
||||
public class DataSyncer implements ServerChangeListener {
|
||||
public class DataSyncer {
|
||||
|
||||
@Autowired
|
||||
private DataStore dataStore;
|
||||
@ -62,11 +61,8 @@ public class DataSyncer implements ServerChangeListener {
|
||||
|
||||
private Map<String, String> taskMap = new ConcurrentHashMap<>();
|
||||
|
||||
private List<Server> servers;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
serverListManager.listen(this);
|
||||
startTimedSync();
|
||||
}
|
||||
|
||||
@ -97,7 +93,7 @@ public class DataSyncer implements ServerChangeListener {
|
||||
public void run() {
|
||||
|
||||
try {
|
||||
if (servers == null || servers.isEmpty()) {
|
||||
if (getServers() == null || getServers().isEmpty()) {
|
||||
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
|
||||
return;
|
||||
}
|
||||
@ -148,7 +144,7 @@ public class DataSyncer implements ServerChangeListener {
|
||||
Server server = new Server();
|
||||
server.setIp(syncTask.getTargetServer().split(":")[0]);
|
||||
server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
|
||||
if (!servers.contains(server)) {
|
||||
if (!getServers().contains(server)) {
|
||||
// if server is no longer in healthy server list, ignore this task:
|
||||
return;
|
||||
}
|
||||
@ -167,6 +163,11 @@ public class DataSyncer implements ServerChangeListener {
|
||||
public void run() {
|
||||
|
||||
try {
|
||||
|
||||
if (Loggers.EPHEMERAL.isDebugEnabled()) {
|
||||
Loggers.EPHEMERAL.debug("server list is: {}", getServers());
|
||||
}
|
||||
|
||||
// send local timestamps to other servers:
|
||||
Map<String, String> keyChecksums = new HashMap<>(64);
|
||||
for (String key : dataStore.keys()) {
|
||||
@ -185,7 +186,7 @@ public class DataSyncer implements ServerChangeListener {
|
||||
Loggers.EPHEMERAL.debug("sync checksums: {}", keyChecksums);
|
||||
}
|
||||
|
||||
for (Server member : servers) {
|
||||
for (Server member : getServers()) {
|
||||
if (NetUtils.localServer().equals(member.getKey())) {
|
||||
continue;
|
||||
}
|
||||
@ -198,20 +199,10 @@ public class DataSyncer implements ServerChangeListener {
|
||||
}
|
||||
|
||||
public List<Server> getServers() {
|
||||
return servers;
|
||||
return serverListManager.getHealthyServers();
|
||||
}
|
||||
|
||||
public String buildKey(String key, String targetServer) {
|
||||
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChangeServerList(List<Server> latestMembers) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChangeHealthyServerList(List<Server> healthServers) {
|
||||
servers = healthServers;
|
||||
}
|
||||
}
|
||||
|
@ -77,8 +77,8 @@ public class ClientBeatCheckTask implements Runnable {
|
||||
if (!instance.isMarked()) {
|
||||
if (instance.isHealthy()) {
|
||||
instance.setHealthy(false);
|
||||
Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
|
||||
instance.getIp(), instance.getPort(), instance.getClusterName(),
|
||||
Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
|
||||
instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
|
||||
UtilsAndCommons.LOCALHOST_SITE, ClientBeatProcessor.CLIENT_BEAT_TIMEOUT, instance.getLastBeat());
|
||||
getPushService().serviceChanged(service.getNamespaceId(), service.getName());
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ public class ServerStatusSynchronizer implements Synchronizer {
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: " + serverIP, e);
|
||||
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.web;
|
||||
|
||||
import com.alibaba.nacos.api.common.Constants;
|
||||
import com.alibaba.nacos.api.naming.CommonParams;
|
||||
import com.alibaba.nacos.naming.boot.SpringContext;
|
||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||
import com.alibaba.nacos.naming.misc.HttpClient;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
@ -97,6 +98,16 @@ public class DistroFilter implements Filter {
|
||||
// proxy request to other server if necessary:
|
||||
if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(groupedServiceName)) {
|
||||
|
||||
String userAgent = req.getHeader("User-Agent");
|
||||
|
||||
if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
|
||||
// This request is sent from peer server, should not be redirected again:
|
||||
Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
|
||||
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
|
||||
"receive invalid redirect request from peer " + req.getRemoteAddr());
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> headerList = new ArrayList<>(16);
|
||||
Enumeration<String> headers = req.getHeaderNames();
|
||||
while (headers.hasMoreElements()) {
|
||||
|
Loading…
Reference in New Issue
Block a user