Merge pull request #2934 from alibaba/hotfix_naming_compatibility

[#2933] fixed compatibility issues with older versions
This commit is contained in:
yanlinly 2020-06-02 10:55:01 +08:00 committed by GitHub
commit dc9d1368fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 456 additions and 140 deletions

View File

@ -65,6 +65,7 @@ public abstract class BaseHttpClient {
try {
final String body = EntityUtils.toString(response.getEntity());
RestResult<T> data = ResponseHandler.convert(body, type);
data.setCode(response.getStatusLine().getStatusCode());
callback.onReceive(data);
}
catch (Throwable e) {

View File

@ -21,7 +21,6 @@ import com.alibaba.nacos.common.utils.StringUtils;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

View File

@ -17,12 +17,14 @@
package com.alibaba.nacos.common.utils;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import com.alibaba.nacos.api.common.Constants;
import org.apache.commons.lang3.StringUtils;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("all")
public final class ByteUtils {
public static final byte[] EMPTY = new byte[0];
@ -31,7 +33,7 @@ public final class ByteUtils {
if (s == null) {
return EMPTY;
}
return s.getBytes(Charset.forName(StandardCharsets.UTF_8.name()));
return s.getBytes(Charset.forName(Constants.ENCODE));
}
public static byte[] toBytes(Object s) {
@ -45,7 +47,7 @@ public final class ByteUtils {
if (bytes == null) {
return StringUtils.EMPTY;
}
return new String(bytes, Charset.forName(StandardCharsets.UTF_8.name()));
return new String(bytes, Charset.forName(Constants.ENCODE));
}
public static boolean isEmpty(byte[] data) {

View File

@ -21,7 +21,6 @@ package com.alibaba.nacos.common.utils;
import java.util.Collection;
import java.util.Dictionary;
import java.util.Map;
import java.util.Objects;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>

View File

@ -15,13 +15,12 @@
*/
package com.alibaba.nacos.common.utils;
import org.apache.commons.lang3.CharSequenceUtils;
import com.alibaba.nacos.api.common.Constants;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Locale;
@ -41,7 +40,7 @@ public class StringUtils {
public static final String EMPTY = "";
public static String newString4UTF8(byte[] bytes) {
return new String(bytes, Charset.forName(StandardCharsets.UTF_8.name()));
return new String(bytes, Charset.forName(Constants.ENCODE));
}
public static boolean isBlank(String str) {

View File

@ -18,7 +18,6 @@ package com.alibaba.nacos.common.utils;
import org.slf4j.Logger;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

View File

@ -28,7 +28,7 @@ public class VersionUtils {
/**
* 获取当前version
*/
public static final String VERSION_DEFAULT = "${project.version}";
public static final String VERSION_PLACEHOLDER = "${project.version}";
static {
@ -39,7 +39,7 @@ public class VersionUtils {
Properties props = new Properties();
props.load(in);
String val = props.getProperty("version");
if (val != null && !VERSION_DEFAULT.equals(val)) {
if (val != null && !VERSION_PLACEHOLDER.equals(val)) {
VERSION = val;
}
} catch (Exception e) {

View File

@ -132,7 +132,7 @@ nacos.istio.mcp.server.enabled=false
### MemberLookup
### Addressing pattern category, If set, the priority is highest
# nacos.core.member.lookup.type=[file,address-server,discovery]
# nacos.core.member.lookup.type=[file,address-server]
## Set the cluster list with a configuration file or command-line argument
# nacos.member.list=192.168.16.101:8847?raft_port=8807,192.168.16.101?raft_port=8808,192.168.16.101:8849?raft_port=8809
## for AddressServerMemberLookup

View File

@ -25,19 +25,25 @@ public class MemberMetaDataConstants {
/**
* Raft portThis parameter is dropped when GRPC is used as a whole
*/
public static final String RAFT_PORT = "raft_port";
public static final String RAFT_PORT = "raftPort";
public static final String SITE_KEY = "site";
public static final String AD_WEIGHT = "adweight";
public static final String AD_WEIGHT = "adWeight";
public static final String WEIGHT = "weight";
public static final String LAST_REFRESH_TIME = "lastRefreshTime";
public static final String VERSION = "version";
public static final String[] META_KEY_LIST = new String[]{
RAFT_PORT,
SITE_KEY,
AD_WEIGHT,
WEIGHT,
LAST_REFRESH_TIME,
VERSION,
};
}

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.core.cluster;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
import java.util.concurrent.ThreadLocalRandom;
@ -74,6 +75,7 @@ public class MemberUtils {
Map<String, Object> extendInfo = new HashMap<>(4);
// The Raft Port information needs to be set by default
extendInfo.put(MemberMetaDataConstants.RAFT_PORT, String.valueOf(calculateRaftPort(target)));
extendInfo.put(MemberMetaDataConstants.VERSION, VersionUtils.VERSION);
target.setExtendInfo(extendInfo);
return target;
}
@ -134,9 +136,7 @@ public class MemberUtils {
@SuppressWarnings("PMD.UndefineMagicConstantRule")
public static Collection<Member> kRandom(Collection<Member> members,
Predicate<Member> filter) {
int k = ApplicationUtils
.getProperty("nacos.core.member.report.random-num", Integer.class, 3);
Predicate<Member> filter, int k) {
Set<Member> kMembers = new HashSet<>();

View File

@ -27,12 +27,14 @@ import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.core.cluster.lookup.LookupFactory;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.notify.listener.Subscribe;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Commons;
import com.alibaba.nacos.core.utils.Constants;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.InetUtils;
@ -40,6 +42,7 @@ import com.alibaba.nacos.core.utils.Loggers;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@ -123,14 +126,15 @@ public class ServerMemberManager
*/
private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
public ServerMemberManager(ServletContext servletContext) {
public ServerMemberManager(ServletContext servletContext) throws Exception {
this.serverList = new ConcurrentSkipListMap();
ApplicationUtils.setContextPath(servletContext.getContextPath());
MemberUtils.setManager(this);
init();
}
@PostConstruct
public void init() throws NacosException {
protected void init() throws NacosException {
Loggers.CORE.info("Nacos-related cluster resource initialization");
this.port = ApplicationUtils.getProperty("server.port", Integer.class, 8848);
this.localAddress = InetUtils.getSelfIp() + ":" + port;
@ -198,11 +202,7 @@ public class ServerMemberManager
Loggers.CLUSTER.debug("Node information update : {}", newMember);
String address = newMember.getAddress();
if (Objects.equals(newMember, self)) {
serverList.put(newMember.getAddress(), newMember);
return true;
}
newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
if (!serverList.containsKey(address)) {
return false;
@ -238,6 +238,10 @@ public class ServerMemberManager
return this.self;
}
public Member find(String address) {
return serverList.get(address);
}
public Collection<Member> allMembers() {
// We need to do a copy to avoid affecting the real data
HashSet<Member> set = new HashSet<>(serverList.values());
@ -408,10 +412,15 @@ public class ServerMemberManager
"/cluster/report");
try {
asyncHttpClient.post(url, Header.EMPTY, Query.EMPTY, getSelf(),
asyncHttpClient.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER,
VersionUtils.VERSION), Query.EMPTY, getSelf(),
reference.getType(), new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value() || result.getCode() == HttpStatus.NOT_FOUND.value()) {
Loggers.CLUSTER.warn("{} version is too low, it is recommended to upgrade the version : {}", target, VersionUtils.VERSION);
return;
}
if (result.ok()) {
MemberUtils.onSuccess(target);
}

View File

@ -204,7 +204,7 @@ public class JRaftProtocol
private void injectProtocolMetaData(ProtocolMetaData metaData) {
Member member = memberManager.getSelf();
member.setExtendVal("raft_meta_data", metaData);
member.setExtendVal("raftMetaData", metaData);
memberManager.update(member);
}

View File

@ -67,11 +67,11 @@ public class NotifyCenter {
static {
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
String ringBufferSizeProperty = "com.alibaba.nacos.core.notify.ringBufferSize";
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
RING_BUFFER_SIZE = Integer.getInteger(ringBufferSizeProperty, 16384);
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "com.alibaba.nacos.core.notify.shareBufferSize";
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
SHATE_BUFFER_SIZE = Integer.getInteger(shareBufferSizeProperty, 1024);
ServiceLoader<EventPublisher> loader = ServiceLoader.load(EventPublisher.class);

View File

@ -1,2 +0,0 @@
/Users/liaochuntao/.sdkman/candidates/java/current/bin/java -DembeddedStorage=true -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/Volumes/resources/github/nacos/distribution/logs/java_heapdump.hprof -XX:-UseLargePages -Dnacos.member.list=127.0.0.1:8080 -Djava.ext.dirs=/Users/liaochuntao/.sdkman/candidates/java/current/jre/lib/ext:/Users/liaochuntao/.sdkman/candidates/java/current/lib/ext -Xloggc:/Volumes/resources/github/nacos/distribution/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dloader.path=/Volumes/resources/github/nacos/distribution/plugins/health,/Volumes/resources/github/nacos/distribution/plugins/cmdb,/Volumes/resources/github/nacos/distribution/plugins/mysql -Dnacos.home=/Volumes/resources/github/nacos/distribution -jar /Volumes/resources/github/nacos/distribution/target/nacos-server.jar --spring.config.location=classpath:/,classpath:/config/,file:./,file:./config/,file:/Volumes/resources/github/nacos/distribution/conf/ --logging.config=/Volumes/resources/github/nacos/distribution/conf/nacos-logback.xml --server.max-http-header-size=524288
Error: Unable to access jarfile /Volumes/resources/github/nacos/distribution/target/nacos-server.jar

View File

@ -0,0 +1,231 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.Message;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.ServerStatusSynchronizer;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.Synchronizer;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
/**
* The manager to globally refresh and operate server list.
*
* @author nkorange
* @since 1.0.0
* @deprecated 1.3.0 This object will be deleted sometime after version 1.3.0
*/
@Component("serverListManager")
public class ServerListManager implements MemberChangeListener {
private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE;
private final SwitchDomain switchDomain;
private final ServerMemberManager memberManager;
private final Synchronizer synchronizer = new ServerStatusSynchronizer();
private volatile List<Member> servers;
public ServerListManager(final SwitchDomain switchDomain,
final ServerMemberManager memberManager) {
this.switchDomain = switchDomain;
this.memberManager = memberManager;
NotifyCenter.registerSubscribe(this);
this.servers = new ArrayList<>(memberManager.allMembers());
}
@PostConstruct
public void init() {
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
}
public boolean contains(String s) {
for (Member server : getServers()) {
if (Objects.equals(s, server.getAddress())) {
return true;
}
}
return false;
}
public List<Member> getServers() {
return servers;
}
@Override
public void onEvent(MemberChangeEvent event) {
this.servers = new ArrayList<>(event.getMembers());
}
/**
* Compatible with older version logic, In version 1.2.1 and before
*
* @param configInfo site:ip:lastReportTime:weight
*/
public synchronized void onReceiveServerStatus(String configInfo) {
Loggers.SRV_LOG.info("receive config info: {}", configInfo);
String[] configs = configInfo.split("\r\n");
if (configs.length == 0) {
return;
}
for (String config : configs) {
// site:ip:lastReportTime:weight
String[] params = config.split("#");
if (params.length <= 3) {
Loggers.SRV_LOG.warn("received malformed distro map data: {}", config);
continue;
}
Member server = Optional.ofNullable(memberManager.find(params[1])).orElse(Member.builder()
.ip(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[0])
.state(NodeState.UP)
.port(Integer.parseInt(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[1]))
.build());
server.setExtendVal(MemberMetaDataConstants.SITE_KEY, params[0]);
server.setExtendVal(MemberMetaDataConstants.WEIGHT, params.length == 4 ? Integer.parseInt(params[3]) : 1);
memberManager.update(server);
if (!contains(server.getAddress())) {
throw new IllegalArgumentException("server: " + server.getAddress() + " is not in serverlist");
}
}
}
private class ServerInfoUpdater implements Runnable {
private int cursor = 0;
@Override
public void run() {
List<Member> members = servers;
if (members.isEmpty()) {
return;
}
this.cursor = (this.cursor + 1) % members.size();
Member target = members.get(cursor);
if (Objects.equals(target.getAddress(), ApplicationUtils.getLocalAddress())) {
return;
}
// This metadata information exists from 1.3.0 onwards "version"
if (target.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
return;
}
final String path = UtilsAndCommons.NACOS_NAMING_OPERATOR_CONTEXT + UtilsAndCommons.NACOS_NAMING_CLUSTER_CONTEXT + "/state";
final Map<String, String> params = Maps.newHashMapWithExpectedSize(2);
final String server = target.getAddress();
try {
String content = NamingProxy.reqCommon(path, params, server, false);
if (!StringUtils.EMPTY.equals(content)) {
RaftPeer raftPeer = JacksonUtils.toObj(content, RaftPeer.class);
if (null != raftPeer) {
String json = JacksonUtils.toJson(raftPeer);
Map map = JacksonUtils.toObj(json, HashMap.class);
target.setExtendVal("naming", map);
memberManager.update(target);
}
}
} catch (Exception ignore) {
//
}
}
}
private class ServerStatusReporter implements Runnable {
@Override
public void run() {
try {
if (ApplicationUtils.getPort() <= 0) {
return;
}
int weight = Runtime.getRuntime().availableProcessors() / 2;
if (weight <= 0) {
weight = 1;
}
long curTime = System.currentTimeMillis();
String status = LOCALHOST_SITE + "#" + ApplicationUtils.getLocalAddress() + "#" + curTime + "#" + weight + "\r\n";
List<Member> allServers = getServers();
if (!contains(ApplicationUtils.getLocalAddress())) {
Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", ApplicationUtils.getLocalAddress(), allServers);
return;
}
if (allServers.size() > 0 && !ApplicationUtils.getLocalAddress().contains(UtilsAndCommons.LOCAL_HOST_IP)) {
for (Member server : allServers) {
if (Objects.equals(server.getAddress(), ApplicationUtils.getLocalAddress())) {
continue;
}
// This metadata information exists from 1.3.0 onwards "version"
if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
return;
}
Message msg = new Message();
msg.setData(status);
synchronizer.send(server.getAddress(), msg);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
} finally {
GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
}
}
}
}

View File

@ -19,7 +19,6 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.pojo.Record;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
@ -33,11 +32,15 @@ import org.springframework.stereotype.Service;
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Autowired
private PersistentConsistencyService persistentConsistencyService;
private final PersistentConsistencyService persistentConsistencyService;
private final EphemeralConsistencyService ephemeralConsistencyService;
@Autowired
private EphemeralConsistencyService ephemeralConsistencyService;
public DelegateConsistencyServiceImpl(
PersistentConsistencyService persistentConsistencyService,
EphemeralConsistencyService ephemeralConsistencyService) {
this.persistentConsistencyService = persistentConsistencyService;
this.ephemeralConsistencyService = ephemeralConsistencyService;
}
@Override
public void put(String key, Record value) throws NacosException {

View File

@ -45,22 +45,23 @@ import java.util.concurrent.ConcurrentHashMap;
@DependsOn("ProtocolManager")
public class DataSyncer {
@Autowired
private DataStore dataStore;
private final DataStore dataStore;
private final GlobalConfig partitionConfig;
private final Serializer serializer;
private final DistroMapper distroMapper;
private final ServerMemberManager memberManager;
@Autowired
private GlobalConfig partitionConfig;
private Map<String, String> taskMap = new ConcurrentHashMap<>(16);
@Autowired
private Serializer serializer;
@Autowired
private DistroMapper distroMapper;
@Autowired
private ServerMemberManager memberManager;
private Map<String, String> taskMap = new ConcurrentHashMap<>();
public DataSyncer(DataStore dataStore, GlobalConfig partitionConfig,
Serializer serializer, DistroMapper distroMapper,
ServerMemberManager memberManager) {
this.dataStore = dataStore;
this.partitionConfig = partitionConfig;
this.serializer = serializer;
this.distroMapper = distroMapper;
this.memberManager = memberManager;
}
@PostConstruct
public void init() {

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.Objects;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@ -34,12 +35,10 @@ import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import org.apache.commons.lang3.StringUtils;
import org.javatuples.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import javax.annotation.PostConstruct;
@ -49,7 +48,7 @@ import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* A consistency protocol algorithm called <b>Distro</b>
@ -69,26 +68,19 @@ import java.util.concurrent.CopyOnWriteArrayList;
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
@Autowired
private DistroMapper distroMapper;
private final DistroMapper distroMapper;
@Autowired
private DataStore dataStore;
private final DataStore dataStore;
@Autowired
private TaskDispatcher taskDispatcher;
private final TaskDispatcher taskDispatcher;
@Autowired
private Serializer serializer;
private final Serializer serializer;
@Autowired
private ServerMemberManager memberManager;
private final ServerMemberManager memberManager;
@Autowired
private SwitchDomain switchDomain;
private final SwitchDomain switchDomain;
@Autowired
private GlobalConfig globalConfig;
private final GlobalConfig globalConfig;
private boolean initialized = false;
@ -96,10 +88,23 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
private LoadDataTask loadDataTask = new LoadDataTask();
private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataStore,
TaskDispatcher taskDispatcher, Serializer serializer,
ServerMemberManager memberManager, SwitchDomain switchDomain,
GlobalConfig globalConfig) {
this.distroMapper = distroMapper;
this.dataStore = dataStore;
this.taskDispatcher = taskDispatcher;
this.serializer = serializer;
this.memberManager = memberManager;
this.switchDomain = switchDomain;
this.globalConfig = globalConfig;
}
@PostConstruct
public void init() {
GlobalExecutor.submit(loadDataTask);
@ -115,6 +120,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
if (!initialized) {
GlobalExecutor
.submit(this, globalConfig.getLoadDataRetryDelayMillis());
} else {
Loggers.DISTRO.info("load data success");
}
}
catch (Exception e) {
@ -136,7 +143,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
for (Map.Entry<String, Member> entry : memberManager.getServerList().entrySet()) {
final String address = entry.getValue().getAddress();
if (NetUtils.localServer().equals(address)) {
if (ApplicationUtils.getLocalAddress().equals(address)) {
continue;
}
if (Loggers.DISTRO.isDebugEnabled()) {
@ -237,10 +244,8 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}",
Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}",
toRemoveKeys, toUpdateKeys, server);
}
for (String key : toRemoveKeys) {
onRemove(key);
@ -269,8 +274,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
try {
byte[] data = NamingProxy.getAllData(server);
processData(data);
return true;
return processData(data);
}
catch (Exception e) {
Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
@ -278,7 +282,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
}
public void processData(byte[] data) throws Exception {
public boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap = serializer
.deserializeMap(data, Instances.class);
@ -300,7 +304,13 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0).onChange(
// The Listener corresponding to the key value must not be empty
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
listener.onChange(
KeyBuilder.buildServiceMetaKey(namespaceId, serviceName),
service);
}
@ -331,12 +341,13 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
listeners.put(key, new CopyOnWriteArrayList<>());
listeners.put(key, new ConcurrentLinkedQueue<>());
}
if (listeners.get(key).contains(listener)) {

View File

@ -23,9 +23,9 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.ServerStatusManager;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
@ -36,7 +36,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
@ -53,29 +52,30 @@ import java.util.List;
@RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator", UtilsAndCommons.NACOS_NAMING_CONTEXT + "/ops"})
public class OperatorController {
@Autowired
private PushService pushService;
private final PushService pushService;
private final SwitchManager switchManager;
private final ServerListManager serverListManager;
private final ServiceManager serviceManager;
private final ServerMemberManager memberManager;
private final ServerStatusManager serverStatusManager;
private final SwitchDomain switchDomain;
private final DistroMapper distroMapper;
private final RaftCore raftCore;
@Autowired
private SwitchManager switchManager;
@Autowired
private ServiceManager serviceManager;
@Autowired
private ServerMemberManager memberManager;
@Autowired
private ServerStatusManager serverStatusManager;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private DistroMapper distroMapper;
@Autowired
private RaftCore raftCore;
public OperatorController(PushService pushService, SwitchManager switchManager,
ServerListManager serverListManager, ServiceManager serviceManager, ServerMemberManager memberManager,
ServerStatusManager serverStatusManager, SwitchDomain switchDomain,
DistroMapper distroMapper, RaftCore raftCore) {
this.pushService = pushService;
this.switchManager = switchManager;
this.serverListManager = serverListManager;
this.serviceManager = serviceManager;
this.memberManager = memberManager;
this.serverStatusManager = serverStatusManager;
this.switchDomain = switchDomain;
this.distroMapper = distroMapper;
this.raftCore = raftCore;
}
@RequestMapping("/push/state")
public ObjectNode pushState(@RequestParam(required = false) boolean detail, @RequestParam(required = false) boolean reset) {
@ -200,12 +200,33 @@ public class OperatorController {
return result;
}
/**
* This interface will be removed in a future release
*
* @deprecated 1.3.0 This function will be deleted sometime after version 1.3.0
* @param serverStatus server status
* @return "ok"
*/
@Deprecated
@RequestMapping("/server/status")
public String serverStatus(@RequestParam String serverStatus) {
serverListManager.onReceiveServerStatus(serverStatus);
return "ok";
}
@PutMapping("/log")
public String setLogLevel(@RequestParam String logName, @RequestParam String logLevel) {
Loggers.setLogLevel(logName, logLevel);
return "ok";
}
/**
* This interface will be removed in a future release
*
* @deprecated 1.3.0 This function will be deleted sometime after version 1.3.0
* @return {@link JsonNode}
*/
@Deprecated
@RequestMapping(value = "/cluster/state", method = RequestMethod.GET)
public JsonNode getClusterStates() {
return JacksonUtils.transferToJsonNode(serviceManager.getMySelfClusterState());

View File

@ -22,7 +22,6 @@ import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
@ -31,7 +30,6 @@ import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* @author nkorange
@ -85,8 +83,8 @@ public class DistroMapper implements MemberChangeListener {
return false;
}
int index = servers.indexOf(NetUtils.localServer());
int lastIndex = servers.lastIndexOf(NetUtils.localServer());
int index = servers.indexOf(ApplicationUtils.getLocalAddress());
int lastIndex = servers.lastIndexOf(ApplicationUtils.getLocalAddress());
if (lastIndex < 0 || index < 0) {
return true;
}
@ -99,24 +97,30 @@ public class DistroMapper implements MemberChangeListener {
final List<String> servers = healthyList;
if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
return NetUtils.localServer();
return ApplicationUtils.getLocalAddress();
}
try {
return servers.get(distroHash(serviceName) % servers.size());
int index = distroHash(serviceName) % servers.size();
return servers.get(index);
} catch (Throwable e) {
Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);
return NetUtils.localServer();
Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + ApplicationUtils.getLocalAddress(), e);
return ApplicationUtils.getLocalAddress();
}
}
public int distroHash(String serviceName) {
return Math.abs(Objects.hash(serviceName) % Integer.MAX_VALUE);
return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
}
@Override
public void onEvent(MemberChangeEvent event) {
healthyList = Collections.unmodifiableList(MemberUtils.simpleMembers(event.getMembers()));
// Here, the node list must be sorted to ensure that all nacos-server's
// node list is in the same order
List<String> list = MemberUtils.simpleMembers(event.getMembers());
Collections.sort(list);
healthyList = Collections.unmodifiableList(list);
System.out.println(healthyList);
}
@Override

View File

@ -58,7 +58,6 @@ import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@ -85,34 +84,39 @@ public class ServiceManager implements RecordListener<Service> {
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
@Autowired
private SwitchDomain switchDomain;
private final SwitchDomain switchDomain;
@Autowired
private DistroMapper distroMapper;
private final DistroMapper distroMapper;
@Autowired
private ServerMemberManager memberManager;
private final ServerMemberManager memberManager;
@Autowired
private PushService pushService;
private final PushService pushService;
@Autowired
private RaftPeerSet raftPeerSet;
@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;
private final RaftPeerSet raftPeerSet;
private int maxFinalizeCount = 3;
private final Object putServiceLock = new Object();
@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;
@Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}")
private int cleanEmptyServiceDelay;
@Value("${nacos.naming.empty-service.clean.period-time-ms:20000}")
private int cleanEmptyServicePeriod;
public ServiceManager(SwitchDomain switchDomain, DistroMapper distroMapper,
ServerMemberManager memberManager, PushService pushService,
RaftPeerSet raftPeerSet) {
this.switchDomain = switchDomain;
this.distroMapper = distroMapper;
this.memberManager = memberManager;
this.pushService = pushService;
this.raftPeerSet = raftPeerSet;
}
@PostConstruct
public void init() {

View File

@ -30,8 +30,6 @@ public class GlobalExecutor {
public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);
private static final long NACOS_SERVER_LIST_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(5);
private static final long PARTITION_DATA_TIMED_SYNC_INTERVAL = TimeUnit.SECONDS.toMillis(5);
private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5);
@ -150,8 +148,8 @@ public class GlobalExecutor {
executorService.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}
public static void registerServerListUpdater(Runnable runnable) {
executorService.scheduleAtFixedRate(runnable, 0, NACOS_SERVER_LIST_REFRESH_INTERVAL, TimeUnit.MILLISECONDS);
public static void registerServerInfoUpdater(Runnable runnable) {
executorService.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS);
}
public static void registerServerStatusReporter(Runnable runnable, long delay) {

View File

@ -27,6 +27,7 @@ import java.util.Map;
/**
* Report local server status to other server
*
* @deprecated 1.3.0 This object will be deleted sometime after version 1.3.0
* @author nacos
*/
public class ServerStatusSynchronizer implements Synchronizer {
@ -55,7 +56,6 @@ public class ServerStatusSynchronizer implements Synchronizer {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",
serverIP);
return 1;
}
return 0;

View File

@ -187,6 +187,23 @@
</encoder>
</appender>
<appender name="naming-distro"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/naming-distro.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/naming-distro.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
<maxFileSize>1GB</maxFileSize>
<MaxHistory>7</MaxHistory>
<totalSizeCap>3GB</totalSizeCap>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<Pattern>%date %level %msg%n%n</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="com.alibaba.nacos.naming.main" additivity="false">
<level value="INFO"/>
<appender-ref ref="naming-server"/>
@ -195,6 +212,10 @@
<level value="DEBUG"/>
<appender-ref ref="naming-raft"/>
</logger>
<logger name="com.alibaba.nacos.naming.distro" additivity="false">
<level value="INFO"/>
<appender-ref ref="naming-distro"/>
</logger>
<logger name="com.alibaba.nacos.naming.event" additivity="false">
<level value="INFO"/>
<appender-ref ref="naming-event"/>

View File

@ -55,19 +55,13 @@ public class MemberLookup_ITCase extends BaseTest {
static final String name = "cluster.conf";
static final ServerMemberManager memberManager = new ServerMemberManager(
new MockServletContext());
ServerMemberManager memberManager;
@Before
public void before() throws Exception {
System.setProperty("nacos.home", path);
ApplicationUtils.injectEnvironment(new StandardEnvironment());
ApplicationUtils.setIsStandalone(false);
try {
memberManager.init();
}
catch (Throwable ignore) {
}
System.out.println(ApplicationUtils.getStandaloneMode());
System.out.println(Arrays.toString(LookupFactory.LookupType.values()));
@ -78,6 +72,14 @@ public class MemberLookup_ITCase extends BaseTest {
String ip = InetUtils.getSelfIp();
DiskUtils.writeFile(file, (ip + ":8848," + ip + ":8847," + ip + ":8849").getBytes(
StandardCharsets.UTF_8), false);
try {
memberManager = new ServerMemberManager(
new MockServletContext());
}
catch (Exception e) {
e.printStackTrace();
}
}
@After

View File

@ -48,13 +48,21 @@ import java.util.concurrent.atomic.AtomicInteger;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ServerMemberManager_ITCase {
private ServerMemberManager memberManager = new ServerMemberManager(new MockServletContext());
private ServerMemberManager memberManager;
{
try {
memberManager = new ServerMemberManager(new MockServletContext());
}
catch (Exception e) {
e.printStackTrace();
}
}
@Before
public void init() throws Exception {
ApplicationUtils.setIsStandalone(true);
ApplicationUtils.injectEnvironment(new StandardEnvironment());
memberManager.init();
}
@After

View File

@ -47,7 +47,7 @@ public class NotifyCenter_ITCase {
}
static {
System.setProperty("com.alibaba.nacos.core.notify.shareBufferSize", "8");
System.setProperty("nacos.core.notify.share-buffer-size", "8");
}
@Test