fix: fix naming request redirect

This commit is contained in:
chuntaojun 2020-05-03 20:26:35 +08:00
parent a82bcc1d6e
commit 2526c5d8f2
33 changed files with 1064 additions and 846 deletions

1
.gitignore vendored
View File

@ -8,6 +8,7 @@ target
.DS_Store
.factorypath
/logs
/lib
*.iml
node_modules
test/derby.log

View File

@ -187,8 +187,7 @@ public class Instance {
}
Instance host = (Instance) obj;
return strEquals(toString(), host.toString());
return strEquals(host.toString(), toString());
}
@Override

View File

@ -23,9 +23,6 @@ import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Use the same HttpClient object in the same space
*
@ -41,131 +38,35 @@ public class HttpClientManager {
private static final RequestConfig DEFAULT_CONFIG = RequestConfig.custom()
.setConnectTimeout(TIMEOUT).setSocketTimeout(TIMEOUT << 1).build();
private static final Object SYNC_MONITOR = new Object();
private static final Object ASYNC_MONITOR = new Object();
private static final Map<String, NSyncHttpClient> HTTP_SYNC_CLIENT_MAP = new HashMap<String, NSyncHttpClient>(
8);
private static final Map<String, NAsyncHttpClient> HTTP_ASYNC_CLIENT_MAP = new HashMap<String, NAsyncHttpClient>(
8);
private static final NSyncHttpClient SHARE_SYNC_HTTP_CLIENT = new NacosSyncHttpClient(
private static final NSyncHttpClient SYNC_HTTP_CLIENT = new NacosSyncHttpClient(
HttpClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG).build());
private static final NAsyncHttpClient SHARE_ASYNC_HTTP_CLIENT = new NacosAsyncHttpClient(
private static final NAsyncHttpClient ASYNC_HTTP_CLIENT = new NacosAsyncHttpClient(
HttpAsyncClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG).build());
static {
ShutdownUtils.addShutdownHook(new Runnable() {
@Override
public void run() {
logger.warn("[NSyncHttpClient] Start destroying HttpClient");
logger.warn("[HttpClientManager] Start destroying HttpClient");
try {
for (Map.Entry<String, NSyncHttpClient> entry : HTTP_SYNC_CLIENT_MAP
.entrySet()) {
entry.getValue().close();
}
SHARE_SYNC_HTTP_CLIENT.close();
HTTP_SYNC_CLIENT_MAP.clear();
SYNC_HTTP_CLIENT.close();
ASYNC_HTTP_CLIENT.close();
}
catch (Exception ignore) {
}
logger.warn("[NSyncHttpClient] Destruction of the end");
}
});
ShutdownUtils.addShutdownHook(new Runnable() {
@Override
public void run() {
logger.warn("[NAsyncHttpClient] Start destroying HttpClient");
try {
for (Map.Entry<String, NAsyncHttpClient> entry : HTTP_ASYNC_CLIENT_MAP
.entrySet()) {
entry.getValue().close();
}
SHARE_ASYNC_HTTP_CLIENT.close();
HTTP_ASYNC_CLIENT_MAP.clear();
}
catch (Exception ignore) {
}
logger.warn("[NAsyncHttpClient] Destruction of the end");
logger.warn("[HttpClientManager] Destruction of the end");
}
});
}
public static NSyncHttpClient getShareSyncHttpClient() {
return SHARE_SYNC_HTTP_CLIENT;
public static NSyncHttpClient getSyncHttpClient() {
return SYNC_HTTP_CLIENT;
}
public static NAsyncHttpClient getShareAsyncHttpClient() {
return SHARE_ASYNC_HTTP_CLIENT;
}
public static NSyncHttpClient newSyncHttpClient(String namespace) {
synchronized (SYNC_MONITOR) {
NSyncHttpClient nSyncHttpClient = HTTP_SYNC_CLIENT_MAP.get(namespace);
if (nSyncHttpClient != null) {
return nSyncHttpClient;
}
nSyncHttpClient = new NacosSyncHttpClient(
HttpClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG).build());
HTTP_SYNC_CLIENT_MAP.put(namespace, nSyncHttpClient);
return nSyncHttpClient;
}
}
public static NAsyncHttpClient newAsyncHttpClient(String namespace) {
synchronized (ASYNC_MONITOR) {
NAsyncHttpClient nAsyncHttpClient = HTTP_ASYNC_CLIENT_MAP.get(namespace);
if (nAsyncHttpClient != null) {
return nAsyncHttpClient;
}
nAsyncHttpClient = new NacosAsyncHttpClient(
HttpAsyncClients.custom().setDefaultRequestConfig(DEFAULT_CONFIG)
.build());
HTTP_ASYNC_CLIENT_MAP.put(namespace, nAsyncHttpClient);
return nAsyncHttpClient;
}
}
public static NSyncHttpClient newSyncHttpClient(String namespace,
RequestConfig requestConfig) {
synchronized (SYNC_MONITOR) {
NSyncHttpClient nSyncHttpClient = HTTP_SYNC_CLIENT_MAP.get(namespace);
if (nSyncHttpClient != null) {
return nSyncHttpClient;
}
nSyncHttpClient = new NacosSyncHttpClient(
HttpClients.custom().setDefaultRequestConfig(requestConfig).build());
HTTP_SYNC_CLIENT_MAP.put(namespace, nSyncHttpClient);
return nSyncHttpClient;
}
}
public static NAsyncHttpClient newAsyncHttpClient(String namespace,
RequestConfig requestConfig) {
synchronized (ASYNC_MONITOR) {
NAsyncHttpClient nAsyncHttpClient = HTTP_ASYNC_CLIENT_MAP.get(namespace);
if (nAsyncHttpClient != null) {
return nAsyncHttpClient;
}
nAsyncHttpClient = new NacosAsyncHttpClient(
HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig)
.build());
HTTP_ASYNC_CLIENT_MAP.put(namespace, nAsyncHttpClient);
return nAsyncHttpClient;
}
public static NAsyncHttpClient getAsyncHttpClient() {
return ASYNC_HTTP_CLIENT;
}
}

View File

@ -41,6 +41,12 @@ public class RestResultUtils {
.build();
}
public static <T> RestResult<T> failed() {
return RestResult.<T>builder()
.withCode(500)
.build();
}
public static <T> RestResult<T> failed(String errMsg) {
return RestResult.<T>builder()
.withCode(500)
@ -48,12 +54,6 @@ public class RestResultUtils {
.build();
}
public static <T> RestResult<T> failed() {
return RestResult.<T>builder()
.withCode(500)
.build();
}
public static <T> RestResult<T> failed(int code, T data) {
return RestResult.<T>builder()
.withCode(code)
@ -69,6 +69,13 @@ public class RestResultUtils {
.build();
}
public static <T> RestResult<T> failedWithMsg(int code, String errMsg) {
return RestResult.<T>builder()
.withCode(code)
.withMsg(errMsg)
.build();
}
public static <T> RestResult<T> failedWithData(T data) {
return RestResult.<T>builder()
.withCode(500)

View File

@ -16,10 +16,14 @@
package com.alibaba.nacos.config.server.filter;
import com.alibaba.nacos.config.server.constant.Constants;
import org.springframework.core.annotation.Order;
import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import java.io.IOException;
import static com.alibaba.nacos.config.server.utils.LogUtil.defaultLog;

View File

@ -102,6 +102,8 @@ public class TransferToLeaderFilter implements Filter {
ReuseHttpRequest req = null;
HttpServletResponse resp = (HttpServletResponse) response;
String urlString = ((HttpServletRequest) request).getRequestURI();
if (StringUtils.containsIgnoreCase(request.getContentType(),
MediaType.MULTIPART_FORM_DATA)) {
req = new ReuseUploadFileHttpServletRequest((HttpServletRequest) request);
@ -110,12 +112,9 @@ public class TransferToLeaderFilter implements Filter {
req = new ReuseHttpServletRequest((HttpServletRequest) request);
}
String urlString = req.getRequestURI();
if (StringUtils.isNotBlank(req.getQueryString())) {
urlString += "?" + req.getQueryString();
}
try {
String path = new URI(req.getRequestURI()).getPath();
Method method = controllerMethodsCache.getMethod(req.getMethod(), path);

View File

@ -226,13 +226,7 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
// if use raft+derby, Reduce leader read pressure
public static boolean isDirectRead() {
boolean isDirectRead = ApplicationUtils.getStandaloneMode() && isEmbeddedStorage();
if (isDirectRead) {
LogUtil.defaultLog.info("Read config from storage");
} else {
LogUtil.defaultLog.info("Read config from disk file cache");
}
return isDirectRead;
return ApplicationUtils.getStandaloneMode() && isEmbeddedStorage();
}
public static void setEmbeddedStorage(boolean embeddedStorage) {
@ -262,6 +256,8 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
setDefaultMaxAggrSize(getInt("defaultMaxAggrSize", defaultMaxAggrSize));
setCorrectUsageDelay(getInt("correctUsageDelay", correctUsageDelay));
setInitialExpansionPercent(getInt("initialExpansionPercent", initialExpansionPercent));
// External data sources are used by default in cluster mode
setUseExternalDB("mysql".equalsIgnoreCase(getString("spring.datasource.platform", "")));
// must initialize after setUseExternalDB
@ -274,12 +270,13 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
} else {
boolean embeddedStorage = PropertyUtil.embeddedStorage || Boolean.getBoolean("embeddedStorage");
setEmbeddedStorage(embeddedStorage);
}
if (!isEmbeddedStorage() && !isUseExternalDB()) {
throw new IllegalArgumentException("please choose one storage type");
// If the embedded data source storage is not turned on, it is automatically
// upgraded to the external data source storage, as before
if (!embeddedStorage) {
setUseExternalDB(true);
}
}
} catch (Exception e) {
logger.error("read application.properties failed", e);
throw e;

View File

@ -14,7 +14,7 @@ server.port=8848
#*************** Config Module Related Configurations ***************#
### If user MySQL as datasource:
#spring.datasource.platform=mysql
# spring.datasource.platform=mysql
### Count of DB:
# db.num=1

View File

@ -27,7 +27,7 @@ import java.util.TreeMap;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class Member implements Cloneable {
public class Member implements Comparable<Member>, Cloneable {
private String ip;
@ -144,6 +144,11 @@ public class Member implements Cloneable {
return Objects.hash(ip, port);
}
@Override
public int compareTo(Member o) {
return getAddress().compareTo(o.getAddress());
}
public static final class MemberBuilder {
private String ip;
private int port;

View File

@ -38,7 +38,7 @@ public class MemberChangeEvent implements Event {
private static final long serialVersionUID = 7308126651076668976L;
private Collection<Member> allMembers;
private Collection<Member> members;
private long no = SEQUENCE.getAndIncrement();
@ -46,12 +46,12 @@ public class MemberChangeEvent implements Event {
return new MemberChangeEventBuilder();
}
public Collection<Member> getAllMembers() {
return allMembers;
public Collection<Member> getMembers() {
return members;
}
public void setAllMembers(Collection<Member> allMembers) {
this.allMembers = allMembers;
public void setMembers(Collection<Member> members) {
this.members = members;
}
@Override
@ -65,14 +65,14 @@ public class MemberChangeEvent implements Event {
private MemberChangeEventBuilder() {
}
public MemberChangeEventBuilder allNodes(Collection<Member> allMembers) {
public MemberChangeEventBuilder members(Collection<Member> allMembers) {
this.allMembers = allMembers;
return this;
}
public MemberChangeEvent build() {
MemberChangeEvent memberChangeEvent = new MemberChangeEvent();
memberChangeEvent.setAllMembers(allMembers);
memberChangeEvent.setMembers(allMembers);
return memberChangeEvent;
}
}

View File

@ -80,13 +80,12 @@ import java.util.function.BiFunction;
public class ServerMemberManager
implements ApplicationListener<WebServerInitializedEvent> {
private final NAsyncHttpClient asyncHttpClient = HttpClientManager
.newAsyncHttpClient(ServerMemberManager.class.getCanonicalName());
private final NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient();
/**
* Cluster node list
*/
private volatile ConcurrentSkipListMap<String, Member> serverList = new ConcurrentSkipListMap<>();
private volatile ConcurrentSkipListMap<String, Member> serverList;
/**
* Is this node in the cluster list
@ -108,6 +107,9 @@ public class ServerMemberManager
*/
private MemberLookup lookup;
/**
* self member obj
*/
private Member self;
/**
@ -121,6 +123,7 @@ public class ServerMemberManager
private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
public ServerMemberManager(ServletContext servletContext) {
this.serverList = new ConcurrentSkipListMap();
ApplicationUtils.setContextPath(servletContext.getContextPath());
MemberUtils.setManager(this);
}
@ -226,7 +229,9 @@ public class ServerMemberManager
public Collection<Member> allMembers() {
// We need to do a copy to avoid affecting the real data
return new ArrayList<>(serverList.values());
List<Member> list = new ArrayList<>(serverList.values());
Collections.sort(list);
return list;
}
public List<Member> allMembersWithoutSelf() {
@ -255,7 +260,7 @@ public class ServerMemberManager
}
boolean hasChange = false;
ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap();
Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
for (Member member : members) {
final String address = member.getAddress();
@ -284,7 +289,7 @@ public class ServerMemberManager
if (hasChange) {
MemberUtils.syncToFile(members);
Loggers.CLUSTER.warn("member has changed : {}", members);
Event event = MemberChangeEvent.builder().allNodes(members).build();
Event event = MemberChangeEvent.builder().members(allMembers()).build();
NotifyCenter.publishEvent(event);
}
@ -292,15 +297,13 @@ public class ServerMemberManager
}
public synchronized boolean memberJoin(Collection<Member> members) {
Set<Member> set = new HashSet<>();
set.addAll(members);
Set<Member> set = new HashSet<>(members);
set.addAll(allMembers());
return memberChange(set);
}
public synchronized boolean memberLeave(Collection<Member> members) {
Set<Member> set = new HashSet<>();
set.addAll(allMembers());
Set<Member> set = new HashSet<>(allMembers());
set.removeAll(members);
return memberChange(set);
}
@ -385,7 +388,7 @@ public class ServerMemberManager
this.cursor = (this.cursor + 1) % members.size();
Member target = members.get(cursor);
Loggers.CLUSTER.debug("report the metadata to the node : {}", target);
Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
final String url = HttpUtils.buildUrl(false, target.getAddress(),
ApplicationUtils.getContextPath(), Commons.NACOS_CORE_CONTEXT,
@ -402,7 +405,7 @@ public class ServerMemberManager
else {
Loggers.CLUSTER
.warn("failed to report new info to target node : {}, result : {}",
target, result);
target.getAddress(), result);
MemberUtils.onFail(target);
}
}
@ -411,7 +414,7 @@ public class ServerMemberManager
public void onError(Throwable throwable) {
Loggers.CLUSTER
.error("failed to report new info to target node : {}, error : {}",
target, throwable);
target.getAddress(), throwable);
MemberUtils.onFail(target);
}
});
@ -419,7 +422,7 @@ public class ServerMemberManager
catch (Throwable ex) {
Loggers.CLUSTER
.error("failed to report new info to target node : {}, error : {}",
target, ex);
target.getAddress(), ex);
}
}

View File

@ -54,7 +54,7 @@ public class AddressServerMemberLookup extends AbstractMemberLookup {
private volatile boolean isAddressServerHealth = true;
private int addressServerFailCount = 0;
private int maxFailCount = 12;
private NSyncHttpClient syncHttpClient = HttpClientManager.getShareSyncHttpClient();
private NSyncHttpClient syncHttpClient = HttpClientManager.getSyncHttpClient();
private volatile boolean shutdown = false;
@Override

View File

@ -23,13 +23,11 @@ import com.alibaba.nacos.common.http.NAsyncHttpClient;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.TimerContext;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.cluster.Task;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Commons;
@ -40,7 +38,6 @@ import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@ -83,8 +80,7 @@ import java.util.Set;
*/
public class DiscoveryMemberLookup extends AbstractMemberLookup {
NAsyncHttpClient asyncHttpClient = HttpClientManager
.newAsyncHttpClient(ServerMemberManager.class.getCanonicalName());
NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient();
MemberListSyncTask syncTask;
@ -186,7 +182,7 @@ public class DiscoveryMemberLookup extends AbstractMemberLookup {
Loggers.CLUSTER.error("node state report task has error : {}", e);
}
finally {
TimerContext.end(Loggers.CLUSTER);
TimerContext.end();
}
}

View File

@ -28,7 +28,6 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.cluster.lookup.LookupFactory;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Commons;
import com.alibaba.nacos.core.utils.GenericType;
@ -36,12 +35,10 @@ import com.alibaba.nacos.core.utils.Loggers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@ -49,8 +46,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@ -58,8 +53,11 @@ import javax.annotation.PostConstruct;
@RequestMapping(Commons.NACOS_CORE_CONTEXT + "/cluster")
public class NacosClusterController {
@Autowired
private ServerMemberManager memberManager;
private final ServerMemberManager memberManager;
public NacosClusterController(ServerMemberManager memberManager) {
this.memberManager = memberManager;
}
@GetMapping(value = "/self")
public RestResult<Member> self() {
@ -67,7 +65,7 @@ public class NacosClusterController {
}
@GetMapping(value = "/nodes")
public RestResult<Collection<Member>> listAllNode(@RequestParam(value = "keyword", required = false) String ipKeyWord) {
public RestResult<Collection<Member>> listNodes(@RequestParam(value = "keyword", required = false) String ipKeyWord) {
Collection<Member> members = memberManager.allMembers();
Collection<Member> result = new ArrayList<>();
@ -90,7 +88,7 @@ public class NacosClusterController {
@GetMapping(value = "/simple/nodes")
public RestResult<Collection<String>> listSimpleNodes() {
return RestResultUtils.success(MemberUtils.simpleMembers(memberManager.allMembers()));
return RestResultUtils.success(memberManager.getMemberAddressInfos());
}
@GetMapping("/health")
@ -101,7 +99,7 @@ public class NacosClusterController {
@PostMapping(value = {"/report"})
public RestResult<String> report(@RequestBody Member node) {
if (!node.check()) {
return RestResultUtils.failedWithData("Node information is illegal");
return RestResultUtils.failedWithMsg(400, "Node information is illegal");
}
Loggers.CLUSTER.debug("node state report, receive info : {}", node);
node.setState(NodeState.UP);
@ -125,8 +123,7 @@ public class NacosClusterController {
public RestResult<String> leave(@RequestBody Collection<String> params) throws Exception {
Collection<Member> memberList = MemberUtils.multiParse(params);
memberManager.memberLeave(memberList);
final NAsyncHttpClient asyncHttpClient = HttpClientManager
.newAsyncHttpClient(ServerMemberManager.class.getCanonicalName());
final NAsyncHttpClient asyncHttpClient = HttpClientManager.getAsyncHttpClient();
final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {};
final Collection<Member> notifyList = memberManager.allMembersWithoutSelf();
notifyList.removeAll(memberList);

View File

@ -171,7 +171,7 @@ public class ProtocolManager
// time T2 after a period of time.
// (T1 < T2)
Set<Member> copy = new HashSet<>(event.getAllMembers());
Set<Member> copy = new HashSet<>(event.getMembers());
// Node change events between different protocols should not block each other
if (Objects.nonNull(apProtocol)) {

View File

@ -280,33 +280,32 @@ public class NotifyCenter {
public static class Publisher extends Thread {
private final Class<? extends Event> eventType;
private final CopyOnWriteArraySet<Subscribe> subscribes = new CopyOnWriteArraySet<>();
private int queueMaxSize = -1;
private volatile boolean initialized = false;
private volatile boolean canOpen = false;
private volatile boolean shutdown = false;
private final Class<? extends Event> eventType;
private final CopyOnWriteArraySet<Subscribe> subscribes = new CopyOnWriteArraySet<>();
private int queueMaxSize = -1;
private BlockingQueue<Event> queue;
private Supplier<? extends Event> supplier;
private long lastEventSequence = -1L;
// judge the subscribe can deal Event
private BiPredicate<Event, Subscribe> filter = new BiPredicate<Event, Subscribe>() {
@Override
public boolean test(Event event, Subscribe subscribe) {
return true;
}
};
private BiPredicate<Event, Subscribe> filter;
Publisher(final Class<? extends Event> eventType) {
this(eventType, RING_BUFFER_SIZE);
}
Publisher(final Class<? extends Event> eventType, final int queueMaxSize) {
this.eventType = eventType;
this.queueMaxSize = queueMaxSize;
this.queue = new ArrayBlockingQueue<>(queueMaxSize);
this(eventType, new BiPredicate<Event, Subscribe>() {
@Override
public boolean test(Event event, Subscribe subscribe) {
return true;
}
}, queueMaxSize);
}
Publisher(final Class<? extends Event> eventType,
@ -399,9 +398,6 @@ public class NotifyCenter {
if (!initialized) {
throw new IllegalStateException("Publisher does not start");
}
if (shutdown) {
throw new IllegalStateException("Publisher already shutdown");
}
}
void shutdown() {

View File

@ -33,4 +33,6 @@ public class Loggers {
public static final Logger RAFT = LoggerFactory.getLogger("com.alibaba.nacos.core.protocol.raft");
public static final Logger CLUSTER = LoggerFactory.getLogger("com.alibaba.nacos.core.cluster");
public static final Logger JOB_TIMER = LoggerFactory.getLogger("com.alibaba.nacos.core.job.timer");
}

View File

@ -14,9 +14,11 @@
* limitations under the License.
*/
package com.alibaba.nacos.common.utils;
package com.alibaba.nacos.core.utils;
import org.slf4j.Logger;
import com.alibaba.nacos.common.utils.Pair;
import java.util.concurrent.Callable;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
@ -30,11 +32,29 @@ public class TimerContext {
TIME_RECORD.set(Pair.with(name, startTime));
}
public static void end(Logger logger) {
public static void end() {
long endTime = System.currentTimeMillis();
Pair<String, Long> record = TIME_RECORD.get();
logger.debug("{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond()));
Loggers.JOB_TIMER.info("{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond()));
TIME_RECORD.remove();
}
public static void run(Runnable job, String name) {
start(name);
try {
job.run();
} finally {
end();
}
}
public static <V> V run(Callable<V> job, String name) throws Exception {
start(name);
try {
return job.call();
} finally {
end();
}
}
}

View File

@ -137,6 +137,23 @@
</encoder>
</appender>
<appender name="nacos-job-timer"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/nacos-job-timer.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/nacos-job-timer.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
<maxFileSize>2GB</maxFileSize>
<MaxHistory>7</MaxHistory>
<totalSizeCap>7GB</totalSizeCap>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<Pattern>%date %level %msg%n%n</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root>
<level value="INFO"/>
<appender-ref ref="rootFile"/>
@ -172,6 +189,11 @@
<appender-ref ref="nacos-cluster"/>
</logger>
<logger name="com.alibaba.nacos.core.job.timer" additivity="false">
<level value="INFO"/>
<appender-ref ref="nacos-job-timer"/>
</logger>
<springProfile name="standalone">
<logger name="org.springframework">

View File

@ -99,7 +99,7 @@ nacos.security.ignore.urls=/,/error,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/
nacos.core.auth.system.type=nacos
### If turn on auth system:
nacos.core.auth.enabled=true
nacos.core.auth.enabled=false
### The token expiration in seconds:
nacos.core.auth.default.token.expire.seconds=18000

View File

@ -478,6 +478,23 @@
</encoder>
</appender>
<appender name="nacos-job-timer"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/nacos-job-timer.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/nacos-job-timer.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
<maxFileSize>2GB</maxFileSize>
<MaxHistory>7</MaxHistory>
<totalSizeCap>7GB</totalSizeCap>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<Pattern>%date %level %msg%n%n</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="com.alibaba.nacos.address.main" additivity="false">
<level value="INFO"/>
@ -579,6 +596,11 @@
<appender-ref ref="alipay-jraft"/>
</logger>
<logger name="com.alibaba.nacos.core.job.timer" additivity="false">
<level value="INFO"/>
<appender-ref ref="nacos-job-timer"/>
</logger>
<logger name="com.alibaba.nacos.core.protocol.distro" additivity="false">
<level value="INFO"/>
<appender-ref ref="protocol-distro"/>

View File

@ -30,7 +30,12 @@ import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyServic
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import org.apache.commons.lang3.StringUtils;
import org.javatuples.Pair;
@ -41,10 +46,11 @@ import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* A consistency protocol algorithm called <b>Distro</b>
@ -64,366 +70,391 @@ import java.util.concurrent.LinkedBlockingQueue;
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
@Autowired
private DistroMapper distroMapper;
@Autowired
private DataStore dataStore;
@Autowired
private TaskDispatcher taskDispatcher;
@Autowired
private Serializer serializer;
@Autowired
private ServerMemberManager memberManager;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private GlobalConfig globalConfig;
private boolean initialized = false;
private volatile Notifier notifier = new Notifier();
private LoadDataTask loadDataTask = new LoadDataTask();
private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
@PostConstruct
public void init() {
GlobalExecutor.submit(loadDataTask);
GlobalExecutor.submitDistroNotifyTask(notifier);
}
private class LoadDataTask implements Runnable {
@Override
public void run() {
try {
load();
if (!initialized) {
GlobalExecutor.submit(this, globalConfig.getLoadDataRetryDelayMillis());
}
} catch (Exception e) {
Loggers.DISTRO.error("load data failed.", e);
}
}
}
public void load() throws Exception {
if (ApplicationUtils.getStandaloneMode()) {
initialized = true;
return;
}
// size = 1 means only myself in the list, we need at least one another server alive:
while (memberManager.getServerList().size() <= 1) {
Thread.sleep(1000L);
Loggers.DISTRO.info("waiting server list init...");
}
for (Map.Entry<String, Member> entry : memberManager.getServerList().entrySet()) {
final String address = entry.getValue().getAddress();
if (NetUtils.localServer().equals(address)) {
continue;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync from " + address);
}
// try sync data from remote server:
if (syncAllDataFromRemote(address)) {
initialized = true;
return;
}
}
}
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
}
@Override
public void remove(String key) throws NacosException {
onRemove(key);
listeners.remove(key);
}
@Override
public Datum get(String key) throws NacosException {
return dataStore.get(key);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, ApplyAction.CHANGE);
}
public void onRemove(String key) {
dataStore.remove(key);
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, ApplyAction.DELETE);
}
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
if (syncChecksumTasks.containsKey(server)) {
// Already in process of this server:
Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
return;
}
syncChecksumTasks.put(server, "1");
try {
List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
// abort the procedure:
return;
}
if (!dataStore.contains(entry.getKey()) ||
dataStore.get(entry.getKey()).value == null ||
!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey());
}
}
for (String key : dataStore.keys()) {
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
}
for (String key : toRemoveKeys) {
onRemove(key);
}
if (toUpdateKeys.isEmpty()) {
return;
}
try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
processData(result);
} catch (Exception e) {
Loggers.DISTRO.error("get data from " + server + " failed!", e);
}
} finally {
// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}
}
public boolean syncAllDataFromRemote(String server) {
try {
byte[] data = NamingProxy.getAllData(server);
processData(data);
return true;
} catch (Exception e) {
Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
return false;
}
}
public void processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap =
serializer.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
listeners.put(key, new CopyOnWriteArrayList<>());
}
if (listeners.get(key).contains(listener)) {
return;
}
listeners.get(key).add(listener);
}
@Override
public void unlisten(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
return;
}
for (RecordListener recordListener : listeners.get(key)) {
if (recordListener.equals(listener)) {
listeners.get(key).remove(listener);
break;
}
}
}
@Override
public boolean isAvailable() {
return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
}
public boolean isInitialized() {
return initialized || !globalConfig.isDataWarmup();
}
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
while (true) {
try {
Pair pair = tasks.take();
if (pair == null) {
continue;
}
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
continue;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
@Autowired
private DistroMapper distroMapper;
@Autowired
private DataStore dataStore;
@Autowired
private TaskDispatcher taskDispatcher;
@Autowired
private Serializer serializer;
@Autowired
private ServerMemberManager memberManager;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private GlobalConfig globalConfig;
private boolean initialized = false;
private volatile Notifier notifier = new Notifier();
private LoadDataTask loadDataTask = new LoadDataTask();
private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
@PostConstruct
public void init() {
GlobalExecutor.submit(loadDataTask);
GlobalExecutor.submitDistroNotifyTask(notifier);
}
private class LoadDataTask implements Runnable {
@Override
public void run() {
try {
load();
if (!initialized) {
GlobalExecutor
.submit(this, globalConfig.getLoadDataRetryDelayMillis());
}
}
catch (Exception e) {
Loggers.DISTRO.error("load data failed.", e);
}
}
}
public void load() throws Exception {
if (ApplicationUtils.getStandaloneMode()) {
initialized = true;
return;
}
// size = 1 means only myself in the list, we need at least one another server alive:
while (memberManager.getServerList().size() <= 1) {
Thread.sleep(1000L);
Loggers.DISTRO.info("waiting server list init...");
}
for (Map.Entry<String, Member> entry : memberManager.getServerList().entrySet()) {
final String address = entry.getValue().getAddress();
if (NetUtils.localServer().equals(address)) {
continue;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync from " + address);
}
// try sync data from remote server:
if (syncAllDataFromRemote(address)) {
initialized = true;
return;
}
}
}
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
}
@Override
public void remove(String key) throws NacosException {
onRemove(key);
listeners.remove(key);
}
@Override
public Datum get(String key) throws NacosException {
return dataStore.get(key);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, ApplyAction.CHANGE);
}
public void onRemove(String key) {
dataStore.remove(key);
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, ApplyAction.DELETE);
}
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
if (syncChecksumTasks.containsKey(server)) {
// Already in process of this server:
Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
return;
}
syncChecksumTasks.put(server, "1");
try {
List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.DISTRO.error("receive responsible key timestamp of " + entry
.getKey() + " from " + server);
// abort the procedure:
return;
}
if (!dataStore.contains(entry.getKey())
|| dataStore.get(entry.getKey()).value == null || !dataStore
.get(entry.getKey()).value.getChecksum()
.equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey());
}
}
for (String key : dataStore.keys()) {
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}",
toRemoveKeys, toUpdateKeys, server);
}
for (String key : toRemoveKeys) {
onRemove(key);
}
if (toUpdateKeys.isEmpty()) {
return;
}
try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
processData(result);
}
catch (Exception e) {
Loggers.DISTRO.error("get data from " + server + " failed!", e);
}
}
finally {
// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}
}
public boolean syncAllDataFromRemote(String server) {
try {
byte[] data = NamingProxy.getAllData(server);
processData(data);
return true;
}
catch (Exception e) {
Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
return false;
}
}
public void processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap = serializer
.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0).onChange(
KeyBuilder.buildServiceMetaKey(namespaceId, serviceName),
service);
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
}
catch (Exception e) {
Loggers.DISTRO
.error("[NACOS-DISTRO] error while execute listener of key: {}",
entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
listeners.put(key, new CopyOnWriteArrayList<>());
}
if (listeners.get(key).contains(listener)) {
return;
}
listeners.get(key).add(listener);
}
@Override
public void unlisten(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
return;
}
for (RecordListener recordListener : listeners.get(key)) {
if (recordListener.equals(listener)) {
listeners.get(key).remove(listener);
break;
}
}
}
@Override
public boolean isAvailable() {
return isInitialized() || ServerStatus.UP.name()
.equals(switchDomain.getOverriddenServerStatus());
}
public boolean isInitialized() {
return initialized || !globalConfig.isDataWarmup();
}
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(
10 * 1024);
private BlockingQueue<Pair<String, ApplyAction>> tasks = new ArrayBlockingQueue<>(
1024 * 1024);
public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for ( ; ; ) {
try {
Pair<String, ApplyAction> pair = tasks.take();
handle(pair);
}
catch (Throwable e) {
Loggers.DISTRO
.error("[NACOS-DISTRO] Error while handling notifying task",
e);
}
}
}
private void handle(Pair<String, ApplyAction> pair) {
try {
String datumKey = pair.getValue0();
ApplyAction action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
}
catch (Throwable e) {
Loggers.DISTRO
.error("[NACOS-DISTRO] error while notifying listener of key: {}",
datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
}
catch (Throwable e) {
Loggers.DISTRO
.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}

View File

@ -30,7 +30,6 @@ import com.ning.http.client.Response;
import org.apache.commons.collections.SortedBag;
import org.apache.commons.collections.bag.TreeBag;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
@ -44,7 +43,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -54,20 +52,20 @@ import java.util.concurrent.atomic.AtomicLong;
@DependsOn("ProtocolManager")
public class RaftPeerSet implements MemberChangeListener {
@Autowired
private ServerMemberManager memberManager;
private final ServerMemberManager memberManager;
private AtomicLong localTerm = new AtomicLong(0L);
private RaftPeer leader = null;
private volatile Map<String, RaftPeer> peers = new ConcurrentHashMap<>();
private volatile Map<String, RaftPeer> peers = new HashMap<>(8);
private Set<String> sites = new HashSet<>();
private volatile boolean ready = false;
public RaftPeerSet() {
public RaftPeerSet(ServerMemberManager memberManager) {
this.memberManager = memberManager;
}
@PostConstruct
@ -254,15 +252,18 @@ public class RaftPeerSet implements MemberChangeListener {
@Override
public void onEvent(MemberChangeEvent event) {
changePeers(event.getAllMembers());
changePeers(event.getMembers());
}
private void changePeers(Collection<Member> members) {
Map<String, RaftPeer> tmpPeers = new HashMap<>(members.size());
for (Member member : members) {
final String address = member.getAddress();
if (peers.containsKey(address)) {
peers.put(address, peers.get(address));
tmpPeers.put(address, peers.get(address));
continue;
}
@ -274,10 +275,12 @@ public class RaftPeerSet implements MemberChangeListener {
raftPeer.term.set(localTerm.get());
}
peers.put(address, raftPeer);
tmpPeers.put(address, raftPeer);
}
// replace raft peer set:
peers = tmpPeers;
ready = true;
Loggers.RAFT.info("raft peers changed: " + members);
}

View File

@ -23,6 +23,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.api.naming.NamingResponseCode;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alibaba.nacos.core.auth.ActionTypes;
import com.alibaba.nacos.core.auth.Secured;
import com.alibaba.nacos.core.utils.WebUtils;
@ -95,10 +96,15 @@ public class InstanceController {
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
final Instance instance = parseInstance(request);
TimerContext.run(() -> {
serviceManager.registerInstance(namespaceId, serviceName, instance);
return true;
}, instance.generateInstanceId() + " register, ephemeral : " + instance.isEphemeral());
return "ok";
}
@ -117,7 +123,10 @@ public class InstanceController {
return "ok";
}
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
TimerContext.run(() -> {
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
return true;
}, instance.getInstanceId() + " deregister, ephemeral : " + instance.isEphemeral());
return "ok";
}
@ -126,19 +135,24 @@ public class InstanceController {
@PutMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String update(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final Instance instance = parseInstance(request);
String agent = WebUtils.getUserAgent(request);
ClientInfo clientInfo = new ClientInfo(agent);
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
serviceManager.updateInstance(namespaceId, serviceName, parseInstance(request));
} else {
serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
}
TimerContext.run(() -> {
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
serviceManager.updateInstance(namespaceId, serviceName, instance);
} else {
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
return true;
}, instance.generateInstanceId() + " update, ephemeral : " + instance.isEphemeral());
return "ok";
}
@ -182,8 +196,10 @@ public class InstanceController {
}
instance.setLastBeat(System.currentTimeMillis());
instance.validate();
serviceManager.updateInstance(namespaceId, serviceName, instance);
TimerContext.run(() -> {
serviceManager.updateInstance(namespaceId, serviceName, instance);
return true;
}, instance.generateInstanceId() + " patch, ephemeral : " + instance.isEphemeral());
return "ok";
}
@ -288,9 +304,7 @@ public class InstanceController {
port = clientBeat.getPort();
}
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
}
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
@ -299,17 +313,26 @@ public class InstanceController {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
TimerContext.start("[CLIENT-BEAT] : " + serviceName + "@@" + ip + ":" + port + ", ephemeral : " + clientBeat.isEphemeral());
try {
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, perform data compensation operations, beat: {}, serviceName: {}",
clientBeat, serviceName);
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
} finally {
TimerContext.end();
}
}
Service service = serviceManager.getService(namespaceId, serviceName);

View File

@ -15,9 +15,9 @@
*/
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@ -25,12 +25,13 @@ import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* @author nkorange
@ -38,24 +39,28 @@ import java.util.List;
@Component("distroMapper")
public class DistroMapper implements MemberChangeListener {
private List<String> healthyList = new ArrayList<>();
private volatile List<String> healthyList = new ArrayList<>();
private final SwitchDomain switchDomain;
private final ServerMemberManager memberManager;
public DistroMapper(ServerMemberManager memberManager, SwitchDomain switchDomain) {
this.memberManager = memberManager;
this.switchDomain = switchDomain;
}
public List<String> getHealthyList() {
return healthyList;
}
@Autowired
private SwitchDomain switchDomain;
@Autowired
private ServerMemberManager memberManager;
/**
* init server list
*/
@PostConstruct
public void init() {
NotifyCenter.registerSubscribe(this);
this.healthyList = MemberUtils.simpleMembers(memberManager.allMembers());
}
public boolean responsible(Cluster cluster, Instance instance) {
@ -66,50 +71,53 @@ public class DistroMapper implements MemberChangeListener {
}
public boolean responsible(String serviceName) {
final List<String> servers = healthyList;
if (!switchDomain.isDistroEnabled() || ApplicationUtils.getStandaloneMode()) {
return true;
}
if (CollectionUtils.isEmpty(healthyList)) {
if (CollectionUtils.isEmpty(servers)) {
// means distro config is not ready yet
return false;
}
int index = healthyList.indexOf(NetUtils.localServer());
int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
int index = servers.indexOf(NetUtils.localServer());
int lastIndex = servers.lastIndexOf(NetUtils.localServer());
if (lastIndex < 0 || index < 0) {
return true;
}
int target = distroHash(serviceName) % healthyList.size();
int target = distroHash(serviceName) % servers.size();
return target >= index && target <= lastIndex;
}
public String mapSrv(String serviceName) {
if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
final List<String> servers = healthyList;
if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
return NetUtils.localServer();
}
try {
return healthyList.get(distroHash(serviceName) % healthyList.size());
} catch (Exception e) {
return servers.get(distroHash(serviceName) % servers.size());
} catch (Throwable e) {
Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);
return NetUtils.localServer();
}
}
public int distroHash(String serviceName) {
return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
return Math.abs(Objects.hash(serviceName) % Integer.MAX_VALUE);
}
@Override
public void onEvent(MemberChangeEvent event) {
List<String> newHealthyList = new ArrayList<>();
for (Member server : event.getAllMembers()) {
newHealthyList.add(server.getAddress());
}
healthyList = newHealthyList;
healthyList = Collections.unmodifiableList(MemberUtils.simpleMembers(event.getMembers()));
}
@Override
public boolean ignoreExpireEvent() {
return true;
}
}

View File

@ -25,19 +25,8 @@ import org.apache.commons.lang3.StringUtils;
*/
public class NetUtils {
private static String serverAddress = null;
public static String localServer() {
return getLocalAddress() + UtilsAndCommons.IP_PORT_SPLITER + ApplicationUtils.getPort();
}
public static String getLocalAddress() {
if (StringUtils.isNotBlank(serverAddress)) {
return serverAddress;
}
serverAddress = InetUtils.getSelfIp();
return serverAddress;
return InetUtils.getSelfIp() + UtilsAndCommons.IP_PORT_SPLITER + ApplicationUtils.getPort();
}
public static String num2ip(int ip) {

View File

@ -19,9 +19,12 @@ import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.core.code.ControllerMethodsCache;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.utils.OverrideParameterRequestWrapper;
import com.alibaba.nacos.core.utils.ReuseHttpRequest;
import com.alibaba.nacos.core.utils.ReuseHttpServletRequest;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
@ -29,6 +32,11 @@ import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
@ -39,15 +47,18 @@ import java.net.URI;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* @author nacos
*/
public class DistroFilter implements Filter {
private static final int PROXY_CONNECT_TIMEOUT = 2000;
private static final int PROXY_READ_TIMEOUT = 2000;
private static final String SLASH= "/";
private static final String DOUBLE_SLASH = "://";
@Autowired
private DistroMapper distroMapper;
@ -55,6 +66,8 @@ public class DistroFilter implements Filter {
@Autowired
private ControllerMethodsCache controllerMethodsCache;
private final RestTemplate restTemplate = new RestTemplate();
@Override
public void init(FilterConfig filterConfig) throws ServletException {
@ -62,7 +75,7 @@ public class DistroFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) servletRequest;
ReuseHttpRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
HttpServletResponse resp = (HttpServletResponse) servletResponse;
String urlString = req.getRequestURI();
@ -112,25 +125,37 @@ public class DistroFilter implements Filter {
return;
}
List<String> headerList = new ArrayList<>(16);
Enumeration<String> headers = req.getHeaderNames();
while (headers.hasMoreElements()) {
String headerName = headers.nextElement();
headerList.add(headerName);
headerList.add(req.getHeader(headerName));
if (urlString.startsWith(SLASH)) {
urlString = urlString.substring(1);
}
String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
final String targetServer = distroMapper.mapSrv(groupedServiceName);
HttpClient.HttpResult result =
HttpClient.request("http://" + distroMapper.mapSrv(groupedServiceName) + req.getRequestURI(), headerList,
HttpClient.translateParameterMap(req.getParameterMap()),
body, PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
final String reqUrl =
req.getScheme() + DOUBLE_SLASH + targetServer + SLASH + urlString;
HttpHeaders headers = new HttpHeaders();
Enumeration<String> headerNames = req.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.set(headerName, req.getHeader(headerName));
}
headers.set("Content-Type", "application/x-www-form-urlencoded;charset="
+ Charsets.UTF_8.name());
headers.set("Accept-Charset", Charsets.UTF_8.name());
headers.set(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.VERSION);
headers.set(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
try {
HttpEntity<Object> httpEntity = new HttpEntity<>(req.getBody(), headers);
ResponseEntity<String> result = restTemplate
.exchange(reqUrl, Objects.requireNonNull(
HttpMethod.resolve(req.getMethod()), "req.getMethod() is null"), httpEntity,
String.class);
resp.setCharacterEncoding("UTF-8");
resp.getWriter().write(result.content);
resp.setStatus(result.code);
resp.getWriter().write(Objects.requireNonNull(result.getBody(), "result.getBody() is null"));
resp.setStatus(result.getStatusCodeValue());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(groupedServiceName) + urlString);
}
@ -147,7 +172,7 @@ public class DistroFilter implements Filter {
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
"no such api:" + req.getMethod() + ":" + req.getRequestURI());
return;
} catch (Exception e) {
} catch (Throwable e) {
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Server failed," + ExceptionUtil.getAllExceptionMsg(e));
return;

25
pom.xml
View File

@ -417,6 +417,31 @@
</plugins>
</build>
</profile>
<profile>
<id>cluster-test</id>
<build>
<plugins>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<argLine>@{failsafeArgLine}</argLine>
<includes>
<include>**/*DITCase.java</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>sonar-apache</id>
<properties>

View File

@ -16,12 +16,7 @@
package com.alibaba.nacos.test.config;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.common.http.HttpClientManager;
import com.alibaba.nacos.common.http.NSyncHttpClient;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
@ -29,46 +24,27 @@ import com.alibaba.nacos.config.server.model.event.RaftDBErrorEvent;
import com.alibaba.nacos.config.server.model.event.RaftDBErrorRecoverEvent;
import com.alibaba.nacos.config.server.service.repository.EmbeddedStoragePersistServiceImpl;
import com.alibaba.nacos.config.server.service.repository.PersistService;
import com.alibaba.nacos.config.server.service.repository.DistributedDatabaseOperateImpl;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.Constants;
import com.alibaba.nacos.core.distributed.id.IdGeneratorManager;
import com.alibaba.nacos.core.distributed.raft.utils.JRaftConstants;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.notify.listener.Subscribe;
import com.alibaba.nacos.common.utils.DiskUtils;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.core.utils.InetUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.test.base.HttpClient4Test;
import org.junit.AfterClass;
import com.alibaba.nacos.test.core.BaseClusterTest;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.web.context.support.StandardServletEnvironment;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -76,111 +52,9 @@ import java.util.concurrent.atomic.AtomicReference;
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("all")
@Ignore
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class ConfigDerbyRaft_ITCase
extends HttpClient4Test {
private static final String CONFIG_INFO_ID = "config-info-id";
private static ConfigService iconfig7;
private static ConfigService iconfig8;
private static ConfigService iconfig9;
private static final NSyncHttpClient httpClient = HttpClientManager.getShareSyncHttpClient();
private static final AtomicBoolean[] finished = new AtomicBoolean[]{new AtomicBoolean(false), new AtomicBoolean(false), new AtomicBoolean(false)};
private static Map<String, ConfigurableApplicationContext> applications = new HashMap<>();
private static String clusterInfo;
static {
System.getProperties().setProperty("nacos.core.auth.enabled", "false");
System.getProperties().setProperty("embeddedStorage", "true");
String ip = InetUtils.getSelfIp();
clusterInfo = "nacos.member.list=" + ip + ":8847," + ip
+ ":8848," + ip + ":8849";
NotifyCenter.registerSubscribe(new Subscribe<RaftDBErrorEvent>() {
@Override
public void onEvent(RaftDBErrorEvent event) {
System.out.print(event.getEx());
}
@Override
public Class<? extends Event> subscribeType() {
return RaftDBErrorEvent.class;
}
});
}
@BeforeClass
public static void before() throws Exception {
CountDownLatch latch = new CountDownLatch(3);
Runnable runnable = () -> {
for (int i = 0; i < 3; i++) {
try {
URL runnerUrl = new File("../console/target/classes").toURI().toURL();
URL[] urls = new URL[] { runnerUrl };
URLClassLoader cl = new URLClassLoader(urls);
Class<?> runnerClass = cl.loadClass("com.alibaba.nacos.Nacos");
run(i, latch, runnerClass);
} catch (Exception e) {
latch.countDown();
}
}
};
new Thread(runnable).start();
latch.await();
System.out.println("The cluster node initialization is complete");
Properties setting7 = new Properties();
String serverIp7 = "127.0.0.1:8847";
setting7.put(PropertyKeyConst.SERVER_ADDR, serverIp7);
setting7.put(PropertyKeyConst.USERNAME, "nacos");
setting7.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig7 = NacosFactory.createConfigService(setting7);
Properties setting8 = new Properties();
String serverIp8 = "127.0.0.1:8848";
setting8.put(PropertyKeyConst.SERVER_ADDR, serverIp8);
setting8.put(PropertyKeyConst.USERNAME, "nacos");
setting8.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig8 = NacosFactory.createConfigService(setting8);
Properties setting9 = new Properties();
String serverIp9 = "127.0.0.1:8849";
setting9.put(PropertyKeyConst.SERVER_ADDR, serverIp9);
setting9.put(PropertyKeyConst.USERNAME, "nacos");
setting9.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig9 = NacosFactory.createConfigService(setting9);
TimeUnit.SECONDS.sleep(20L);
}
@AfterClass
public static void after() throws Exception {
CountDownLatch latch = new CountDownLatch(applications.size());
for (ConfigurableApplicationContext context : applications.values()) {
new Thread(() -> {
try {
System.out.println("start close : " + context);
context.close();
} catch (Exception ignore) {
} finally {
System.out.println("finished close : " + context);
latch.countDown();
}
}).start();
}
latch.await();
}
public class ConfigDerbyRaft_DITCase
extends BaseClusterTest {
@Test
public void test_a_publish_config() throws Exception {
@ -535,69 +409,4 @@ public class ConfigDerbyRaft_ITCase
}
private static void run(final int index, final CountDownLatch latch, final Class<?> cls) {
Runnable runnable = () -> {
try {
ApplicationUtils.setIsStandalone(false);
final String path = Paths.get(System.getProperty("user.home"), "/nacos-" + index + "/").toString();
DiskUtils.deleteDirectory(path);
System.setProperty("nacos.home", path);
System.out.println("nacos.home is : [" + path + "]");
Map<String, Object> properties = new HashMap<>();
properties.put("server.port", "884" + (7 + index));
properties.put("nacos.home", path);
properties.put("nacos.logs.path",
Paths.get(System.getProperty("user.home"), "nacos-" + index, "/logs/").toString());
properties.put("spring.jmx.enabled", false);
properties.put("nacos.core.snowflake.worker-id", index + 1);
MapPropertySource propertySource = new MapPropertySource(
"nacos_cluster_test", properties);
ConfigurableEnvironment environment = new StandardServletEnvironment();
environment.getPropertySources().addFirst(propertySource);
SpringApplication cluster = new SpringApplicationBuilder(cls).web(
WebApplicationType.SERVLET).environment(environment)
.properties(clusterInfo).properties("embeddedStorage=true").build();
ConfigurableApplicationContext context = cluster.run();
context.stop();
DistributedDatabaseOperateImpl operate = context.getBean(DistributedDatabaseOperateImpl.class);
CPProtocol protocol = context.getBean(CPProtocol.class);
protocol.protocolMetaData()
.subscribe(operate.group(), Constants.LEADER_META_DATA,
(o, arg) -> {
System.out.println("node : 884" + (7 + index) + "-> select leader is : " + arg);
if (finished[index].compareAndSet(false, true)) {
latch.countDown();
}
});
new Thread(() -> {
try {
Thread.sleep(5000L);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (finished[index].compareAndSet(false, true)) {
latch.countDown();
}
}
});
applications.put(String.valueOf(properties.get("server.port")), context);
}
catch (Exception e) {
e.printStackTrace();
latch.countDown();
}
};
runnable.run();
}
}

View File

@ -0,0 +1,236 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.core;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.common.http.HttpClientManager;
import com.alibaba.nacos.common.http.NSyncHttpClient;
import com.alibaba.nacos.common.utils.DiskUtils;
import com.alibaba.nacos.config.server.model.event.RaftDBErrorEvent;
import com.alibaba.nacos.config.server.service.repository.DistributedDatabaseOperateImpl;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.Constants;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.notify.listener.Subscribe;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.InetUtils;
import com.alibaba.nacos.test.base.HttpClient4Test;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.web.context.support.StandardServletEnvironment;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class BaseClusterTest extends HttpClient4Test {
protected static final String CONFIG_INFO_ID = "config-info-id";
protected static ConfigService iconfig7;
protected static ConfigService iconfig8;
protected static ConfigService iconfig9;
protected static NamingService inaming7;
protected static NamingService inaming8;
protected static NamingService inaming9;
protected static final NSyncHttpClient httpClient = HttpClientManager.getSyncHttpClient();
protected static final AtomicBoolean[] finished = new AtomicBoolean[]{new AtomicBoolean(false), new AtomicBoolean(false), new AtomicBoolean(false)};
protected static Map<String, ConfigurableApplicationContext> applications = new HashMap<>();
protected static String clusterInfo;
static {
System.getProperties().setProperty("nacos.core.auth.enabled", "false");
System.getProperties().setProperty("embeddedStorage", "true");
String ip = InetUtils.getSelfIp();
clusterInfo = "nacos.member.list=" + ip + ":8847," + ip
+ ":8848," + ip + ":8849";
NotifyCenter.registerSubscribe(new Subscribe<RaftDBErrorEvent>() {
@Override
public void onEvent(RaftDBErrorEvent event) {
System.out.print(event.getEx());
}
@Override
public Class<? extends Event> subscribeType() {
return RaftDBErrorEvent.class;
}
});
}
@BeforeClass
public static void before() throws Exception {
CountDownLatch latch = new CountDownLatch(3);
Runnable runnable = () -> {
for (int i = 0; i < 3; i++) {
try {
URL runnerUrl = new File("../console/target/classes").toURI().toURL();
URL[] urls = new URL[] { runnerUrl };
URLClassLoader cl = new URLClassLoader(urls);
Class<?> runnerClass = cl.loadClass("com.alibaba.nacos.Nacos");
run(i, latch, runnerClass);
} catch (Exception e) {
latch.countDown();
}
}
};
new Thread(runnable).start();
latch.await();
System.out.println("The cluster node initialization is complete");
Properties setting7 = new Properties();
String serverIp7 = "127.0.0.1:8847";
setting7.put(PropertyKeyConst.SERVER_ADDR, serverIp7);
setting7.put(PropertyKeyConst.USERNAME, "nacos");
setting7.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig7 = NacosFactory.createConfigService(setting7);
inaming7 = NacosFactory.createNamingService(setting7);
Properties setting8 = new Properties();
String serverIp8 = "127.0.0.1:8848";
setting8.put(PropertyKeyConst.SERVER_ADDR, serverIp8);
setting8.put(PropertyKeyConst.USERNAME, "nacos");
setting8.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig8 = NacosFactory.createConfigService(setting8);
inaming8 = NacosFactory.createNamingService(setting7);
Properties setting9 = new Properties();
String serverIp9 = "127.0.0.1:8849";
setting9.put(PropertyKeyConst.SERVER_ADDR, serverIp9);
setting9.put(PropertyKeyConst.USERNAME, "nacos");
setting9.put(PropertyKeyConst.PASSWORD, "nacos");
iconfig9 = NacosFactory.createConfigService(setting9);
inaming9 = NacosFactory.createNamingService(setting7);
TimeUnit.SECONDS.sleep(20L);
}
@AfterClass
public static void after() throws Exception {
CountDownLatch latch = new CountDownLatch(applications.size());
for (ConfigurableApplicationContext context : applications.values()) {
new Thread(() -> {
try {
System.out.println("start close : " + context);
context.close();
} catch (Exception ignore) {
} finally {
System.out.println("finished close : " + context);
latch.countDown();
}
}).start();
}
latch.await();
}
private static void run(final int index, final CountDownLatch latch, final Class<?> cls) {
Runnable runnable = () -> {
try {
ApplicationUtils.setIsStandalone(false);
final String path = Paths
.get(System.getProperty("user.home"), "/nacos-" + index + "/").toString();
DiskUtils.deleteDirectory(path);
System.setProperty("nacos.home", path);
System.out.println("nacos.home is : [" + path + "]");
Map<String, Object> properties = new HashMap<>();
properties.put("server.port", "884" + (7 + index));
properties.put("nacos.home", path);
properties.put("nacos.logs.path",
Paths.get(System.getProperty("user.home"), "nacos-" + index, "/logs/").toString());
properties.put("spring.jmx.enabled", false);
properties.put("nacos.core.snowflake.worker-id", index + 1);
MapPropertySource propertySource = new MapPropertySource(
"nacos_cluster_test", properties);
ConfigurableEnvironment environment = new StandardServletEnvironment();
environment.getPropertySources().addFirst(propertySource);
SpringApplication cluster = new SpringApplicationBuilder(cls).web(
WebApplicationType.SERVLET).environment(environment)
.properties(clusterInfo).properties("embeddedStorage=true")
.build();
ConfigurableApplicationContext context = cluster.run();
DistributedDatabaseOperateImpl operate = context.getBean(DistributedDatabaseOperateImpl.class);
CPProtocol protocol = context.getBean(CPProtocol.class);
protocol.protocolMetaData()
.subscribe(operate.group(), Constants.LEADER_META_DATA,
(o, arg) -> {
System.out.println("node : 884" + (7 + index) + "-> select leader is : " + arg);
if (finished[index].compareAndSet(false, true)) {
latch.countDown();
}
});
new Thread(() -> {
try {
Thread.sleep(5000L);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (finished[index].compareAndSet(false, true)) {
latch.countDown();
}
}
});
applications.put(String.valueOf(properties.get("server.port")), context);
}
catch (Throwable e) {
e.printStackTrace();
} finally {
latch.countDown();
}
};
runnable.run();
}
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.test.core.cluster;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
@ -32,7 +33,11 @@ import org.junit.runners.MethodSorters;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.mock.web.MockServletContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@ -56,6 +61,32 @@ public class ServerMemberManager_ITCase {
memberManager.shutdown();
}
@Test
public void test_k_isFirst() {
String firstIp = "127.0.0.1:8847";
String secondIp = "127.0.0.1:8847";
String thirdIp = "127.0.0.1:8847";
ConcurrentSkipListMap<String, Member> map = new ConcurrentSkipListMap<>();
map.put(secondIp, Member.builder()
.ip("127.0.0.1")
.port(8847)
.build());
map.put(firstIp, Member.builder()
.ip("127.0.0.1")
.port(8848)
.build());
map.put(thirdIp, Member.builder()
.ip("127.0.0.1")
.port(8849)
.build());
List<Member> members = new ArrayList<Member>(map.values());
Collections.sort(members);
List<String> ss = MemberUtils.simpleMembers(members);
Assert.assertEquals(ss.get(0), members.get(0).getAddress());
}
@Test
public void test_a_member_change() throws Exception {

View File

@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@Ignore
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class NotifyCenter_ITCase {
@ -103,7 +102,7 @@ public class NotifyCenter_ITCase {
System.out.println("TestEvent event num : " + NotifyCenter.getPublisher(TestEvent.class).currentEventSize());
System.out.println("TestSlowEvent event num : " + NotifyCenter.getPublisher(TestSlowEvent.class).currentEventSize());
latch.await();
latch.await(5_000L, TimeUnit.MILLISECONDS);
Assert.assertEquals(2, count.get());
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.naming;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.test.core.BaseClusterTest;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class NamingRaft_DITCase extends BaseClusterTest {
@Test
public void test_register_instance() throws Exception {
String serviceName = NamingBase.randomDomainName();
Instance instance = new Instance();
instance.setEphemeral(true); //是否临时实例
instance.setServiceName(serviceName);
instance.setClusterName("c1");
instance.setIp("11.11.11.11");
instance.setPort(80);
try {
inaming7.registerInstance(serviceName, instance);
} catch (Throwable ex) {
ex.printStackTrace();
CountDownLatch latch = new CountDownLatch(1);
latch.await(60_000L, TimeUnit.MILLISECONDS);
}
ThreadUtils.sleep(5_000L);
List<Instance> list = inaming8.getAllInstances(serviceName);
Assert.assertEquals(1, list.size());
Instance host = list.get(0);
Assert.assertEquals(host.getIp(), instance.getIp());
Assert.assertEquals(host.getPort(), instance.getPort());
Assert.assertEquals(host.getServiceName(), NamingUtils.getGroupedName(instance.getServiceName(), "DEFAULT_GROUP"));
Assert.assertEquals(host.getClusterName(), instance.getClusterName());
}
}