Resolve the issues of codestyle of nacos-config module for phase5. (#3272)

* [#3249]Fix chinese unit method name in config test module.

* [#3249]resolve the code style issue in service package.

* [ISSUE#3249]fix typo.
This commit is contained in:
Hu Zongtang 2020-07-08 09:21:13 +08:00 committed by GitHub
parent 144c1819b0
commit 0086dfe787
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 628 additions and 311 deletions

View File

@ -16,21 +16,16 @@
package com.alibaba.nacos.config.server.monitor;
import com.alibaba.nacos.config.server.service.ClientTrackService;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.notify.AsyncNotifyService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;
/**
* Memory monitor
* Memory monitor.
*
* @author Nacos
*/
@ -57,41 +52,4 @@ public class MemoryMonitor {
MetricsMonitor.getConfigMonitor().set(0);
MetricsMonitor.getPublishMonitor().set(0);
}
}
class PrintGetConfigResponeTask implements Runnable {
@Override
public void run() {
memoryLog.info(ResponseMonitor.getStringForPrint());
}
}
class PrintMemoryTask implements Runnable {
@Override
public void run() {
int groupCount = ConfigCacheService.groupCount();
int subClientCount = ClientTrackService.subscribeClientCount();
long subCount = ClientTrackService.subscriberCount();
memoryLog.info("groupCount={}, subscriberClientCount={}, subscriberCount={}", groupCount, subClientCount,
subCount);
MetricsMonitor.getConfigCountMonitor().set(groupCount);
}
}
class NotifyTaskQueueMonitorTask implements Runnable {
final private AsyncNotifyService notifySingleService;
NotifyTaskQueueMonitorTask(AsyncNotifyService notifySingleService) {
this.notifySingleService = notifySingleService;
}
@Override
public void run() {
int size = ((ScheduledThreadPoolExecutor) notifySingleService.getExecutor()).getQueue().size();
memoryLog.info("toNotifyTaskSize={}", size);
MetricsMonitor.getNotifyTaskMonitor().set(size);
}
}
}

View File

@ -16,14 +16,18 @@
package com.alibaba.nacos.config.server.monitor;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Counter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Metrics Monitor
* Metrics Monitor.
*
* @author Nacos
*/

View File

@ -0,0 +1,44 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.monitor;
import com.alibaba.nacos.config.server.service.notify.AsyncNotifyService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;
/**
* NotifyTaskQueueMonitorTask.
*
* @author zongtanghu
*/
public class NotifyTaskQueueMonitorTask implements Runnable {
private final AsyncNotifyService notifySingleService;
NotifyTaskQueueMonitorTask(AsyncNotifyService notifySingleService) {
this.notifySingleService = notifySingleService;
}
@Override
public void run() {
int size = ((ScheduledThreadPoolExecutor) notifySingleService.getExecutor()).getQueue().size();
memoryLog.info("toNotifyTaskSize = {}", size);
MetricsMonitor.getNotifyTaskMonitor().set(size);
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.monitor;
import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;
/**
* PrintGetConfigResponeTask.
*
* @author zongtanghu
*/
public class PrintGetConfigResponeTask implements Runnable {
@Override
public void run() {
memoryLog.info(ResponseMonitor.getStringForPrint());
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.monitor;
import com.alibaba.nacos.config.server.service.ClientTrackService;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;
/**
* @author zongtanghu
*/
public class PrintMemoryTask implements Runnable {
@Override
public void run() {
int groupCount = ConfigCacheService.groupCount();
int subClientCount = ClientTrackService.subscribeClientCount();
long subCount = ClientTrackService.subscriberCount();
memoryLog.info("groupCount = {}, subscriberClientCount = {}, subscriberCount = {}", groupCount, subClientCount,
subCount);
MetricsMonitor.getConfigCountMonitor().set(groupCount);
}
}

View File

@ -20,7 +20,7 @@ import java.text.DecimalFormat;
import java.util.concurrent.atomic.AtomicLong;
/**
* Response Monitory
* Response Monitory.
*
* @author Nacos
*/
@ -48,12 +48,20 @@ public class ResponseMonitor {
refresh();
}
/**
* Refresh for getting configCountDetail.
*/
public static void refresh() {
for (int i = 0; i < getConfigCountDetail.length; i++) {
getConfigCountDetail[i] = new AtomicLong();
}
}
/**
* AddConfigTime.
*
* @param time config time which is added.
*/
public static void addConfigTime(long time) {
getConfigCount.incrementAndGet();
if (time < MS_50) {

View File

@ -22,6 +22,8 @@ import com.alibaba.nacos.config.server.result.core.IResultCode;
import org.springframework.util.Assert;
/**
* ResultBuilder.
*
* @author klw
* @ClassName: ResultBuilder
* @Description: util for generating {@link RestResult}
@ -29,6 +31,14 @@ import org.springframework.util.Assert;
*/
public class ResultBuilder {
/**
* BuildResult.
*
* @param resultCode resultCode.
* @param resultData resultData.
* @param <T> T.
* @return RestResult.
*/
public static <T extends Object> RestResult<T> buildResult(IResultCode resultCode, T resultData) {
Assert.notNull(resultCode, "the resultCode can not be null");
RestResult<T> rr = new RestResult<>(resultCode.getCode(), resultCode.getCodeMsg(), resultData);
@ -39,6 +49,14 @@ public class ResultBuilder {
return buildResult(ResultCodeEnum.SUCCESS, resultData);
}
/**
* BuildSuccessResult.
*
* @param successMsg successMsg string value.
* @param resultData resultData.
* @param <T> T.
* @return RestResult.
*/
public static <T extends Object> RestResult<T> buildSuccessResult(String successMsg, T resultData) {
RestResult<T> rr = buildResult(ResultCodeEnum.SUCCESS, resultData);
rr.setMessage(successMsg);
@ -49,6 +67,13 @@ public class ResultBuilder {
return buildResult(ResultCodeEnum.SUCCESS, null);
}
/**
* BuildSuccessResult.
*
* @param successMsg successMsg string value.
* @param <T> T.
* @return RestResult.
*/
public static <T extends Object> RestResult<T> buildSuccessResult(String successMsg) {
RestResult<T> rr = buildResult(ResultCodeEnum.SUCCESS, null);
rr.setMessage(successMsg);

View File

@ -19,6 +19,8 @@ package com.alibaba.nacos.config.server.result.code;
import com.alibaba.nacos.config.server.result.core.IResultCode;
/**
* ResultCodeEnum.
*
* @author klw
* @ClassName: ResultCodeEnum
* @Description: result code enum
@ -27,13 +29,13 @@ import com.alibaba.nacos.config.server.result.core.IResultCode;
public enum ResultCodeEnum implements IResultCode {
/**
* common code
* Common code.
**/
SUCCESS(200, "处理成功"),
ERROR(500, "服务器内部错误"),
/**
* config use 100001 ~ 100999
* Config use 100001 ~ 100999.
**/
NAMESPACE_NOT_EXIST(100001, "目标 namespace 不存在"),
@ -45,9 +47,7 @@ public enum ResultCodeEnum implements IResultCode {
DATA_EMPTY(100005, "导入的文件数据为空"),
NO_SELECTED_CONFIG(100006, "没有选择任何配制"),
;
NO_SELECTED_CONFIG(100006, "没有选择任何配制");
private int code;

View File

@ -17,6 +17,8 @@
package com.alibaba.nacos.config.server.result.core;
/**
* IResultCode.
*
* @author klw
* @ClassName: IResultCode
* @Description: result code enum needs to be implemented this interface
@ -25,22 +27,16 @@ package com.alibaba.nacos.config.server.result.core;
public interface IResultCode {
/**
* get the result code
* Get the result code.
*
* @return java.lang.String
* @author klw
* @Date 2019/6/28 14:56
* @Param []
* @return code value.
*/
int getCode();
/**
* get the result code's message
* Get the result code's message.
*
* @return java.lang.String
* @author klw
* @Date 2019/6/28 14:56
* @Param []
* @return code's message.
*/
String getCodeMsg();
}

View File

@ -32,7 +32,7 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.defaultLog;
import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog;
/**
* 聚合数据白名单
* AggrWhitelist.
*
* @author Nacos
*/
@ -42,7 +42,10 @@ public class AggrWhitelist {
public static final String AGGRIDS_METADATA = "com.alibaba.nacos.metadata.aggrIDs";
/**
* 判断指定的dataId是否在聚合dataId白名单
* Judge whether specified dataId includes aggregation white list.
*
* @param dataId dataId string value.
* @return Whether to match aggregation rules.
*/
public static boolean isAggrDataId(String dataId) {
if (null == dataId) {
@ -58,7 +61,9 @@ public class AggrWhitelist {
}
/**
* 传入内容重新加载聚合白名单
* Load aggregation white lists based content parameter value.
*
* @param content content string value.
*/
public static void load(String content) {
if (StringUtils.isBlank(content)) {
@ -91,8 +96,6 @@ public class AggrWhitelist {
return AGGR_DATAID_WHITELIST.get();
}
// =======================
static final AtomicReference<List<Pattern>> AGGR_DATAID_WHITELIST = new AtomicReference<List<Pattern>>(
new ArrayList<Pattern>());
}

View File

@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.alibaba.nacos.config.server.utils.LogUtil.defaultLog;
/**
* Client ip whitelist
* Client ip whitelist.
*
* @author Nacos
*/
@ -36,13 +36,17 @@ import static com.alibaba.nacos.config.server.utils.LogUtil.defaultLog;
public class ClientIpWhiteList {
/**
* 判断指定的ip在白名单中
* Judge whether specified client ip includes in the whitelist.
*
* @param clientIp clientIp string value.
* @return Judge result.
*/
static public boolean isLegalClient(String clientIp) {
public static boolean isLegalClient(String clientIp) {
if (StringUtils.isBlank(clientIp)) {
throw new IllegalArgumentException("clientIp is empty");
}
clientIp = clientIp.trim();
if (CLIENT_IP_WHITELIST.get().contains(clientIp)) {
return true;
}
@ -50,18 +54,20 @@ public class ClientIpWhiteList {
}
/**
* whether start client ip whitelist
* Whether start client ip whitelist.
*
* @return true: enable ; false disable
*/
static public boolean isEnableWhitelist() {
public static boolean isEnableWhitelist() {
return isOpen;
}
/**
* 传入内容重新加载客户端ip白名单
* Load white lists based content parameter value.
*
* @param content content string value.
*/
static public void load(String content) {
public static void load(String content) {
if (StringUtils.isBlank(content)) {
defaultLog.warn("clientIpWhiteList is blank.close whitelist.");
isOpen = false;
@ -78,12 +84,10 @@ public class ClientIpWhiteList {
}
}
// =======================
public static final String CLIENT_IP_WHITELIST_METADATA = "com.alibaba.nacos.metadata.clientIpWhitelist";
static public final String CLIENT_IP_WHITELIST_METADATA = "com.alibaba.nacos.metadata.clientIpWhitelist";
static final AtomicReference<List<String>> CLIENT_IP_WHITELIST = new AtomicReference<List<String>>(
private static final AtomicReference<List<String>> CLIENT_IP_WHITELIST = new AtomicReference<List<String>>(
new ArrayList<String>());
static Boolean isOpen = false;
private static Boolean isOpen = false;
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* ClientRecord saves records which fetch from client-side.
*
* @author zongtanghu
*/
public class ClientRecord {
private final String ip;
private volatile long lastTime;
private final ConcurrentMap<String, String> groupKey2md5Map;
private final ConcurrentMap<String, Long> groupKey2pollingTsMap;
public ClientRecord(final String clientIp) {
this.ip = clientIp;
this.groupKey2md5Map = new ConcurrentHashMap<String, String>(20, 0.75f, 1);
this.groupKey2pollingTsMap = new ConcurrentHashMap<String, Long>(20, 0.75f, 1);
}
public String getIp() {
return ip;
}
public long getLastTime() {
return lastTime;
}
public void setLastTime(long lastTime) {
this.lastTime = lastTime;
}
public ConcurrentMap<String, String> getGroupKey2md5Map() {
return groupKey2md5Map;
}
public ConcurrentMap<String, Long> getGroupKey2pollingTsMap() {
return groupKey2pollingTsMap;
}
}

View File

@ -24,58 +24,77 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 跟踪客户端md5的服务 一段时间没有比较md5后就删除IP对应的记录
* ClientTrackService which tracks client's md5 service and delete expired ip's records.
*
* @author Nacos
*/
public class ClientTrackService {
/**
* 跟踪客户端md5.
* Track client's md5 value.
*/
static public void trackClientMd5(String ip, Map<String, String> clientMd5Map) {
public static void trackClientMd5(String ip, Map<String, String> clientMd5Map) {
ClientRecord record = getClientRecord(ip);
record.lastTime = System.currentTimeMillis();
record.groupKey2md5Map.putAll(clientMd5Map);
}
static public void trackClientMd5(String ip, Map<String, String> clientMd5Map,
Map<String, Long> clientlastPollingTSMap) {
ClientRecord record = getClientRecord(ip);
record.lastTime = System.currentTimeMillis();
record.groupKey2md5Map.putAll(clientMd5Map);
record.groupKey2pollingTsMap.putAll(clientlastPollingTSMap);
}
static public void trackClientMd5(String ip, String groupKey, String clientMd5) {
ClientRecord record = getClientRecord(ip);
record.lastTime = System.currentTimeMillis();
record.groupKey2md5Map.put(groupKey, clientMd5);
record.groupKey2pollingTsMap.put(groupKey, record.lastTime);
record.setLastTime(System.currentTimeMillis());
record.getGroupKey2md5Map().putAll(clientMd5Map);
}
/**
* 返回订阅者客户端个数
* TrackClientMd5.
*
* @param ip ip string value.
* @param clientMd5Map clientMd5Map.
* @param clientLastPollingTsMap clientLastPollingTsMap.
*/
static public int subscribeClientCount() {
public static void trackClientMd5(String ip, Map<String, String> clientMd5Map,
Map<String, Long> clientLastPollingTsMap) {
ClientRecord record = getClientRecord(ip);
record.setLastTime(System.currentTimeMillis());
record.getGroupKey2md5Map().putAll(clientMd5Map);
record.getGroupKey2pollingTsMap().putAll(clientLastPollingTsMap);
}
/**
* Put the specified value(ip/groupKey/clientMd5) into clientRecords Map.
*
* @param ip ip string value.
* @param groupKey groupKey string value.
* @param clientMd5 clientMd5 string value.
*/
public static void trackClientMd5(String ip, String groupKey, String clientMd5) {
ClientRecord record = getClientRecord(ip);
record.setLastTime(System.currentTimeMillis());
record.getGroupKey2md5Map().put(groupKey, clientMd5);
record.getGroupKey2pollingTsMap().put(groupKey, record.getLastTime());
}
/**
* Get subscribe client count.
*
* @return subscribe client count.
*/
public static int subscribeClientCount() {
return clientRecords.size();
}
/**
* 返回所有订阅者个数
* Get all of subsciber count.
*
* @return all of subsciber count.
*/
static public long subscriberCount() {
public static long subscriberCount() {
long count = 0;
for (ClientRecord record : clientRecords.values()) {
count += record.groupKey2md5Map.size();
count += record.getGroupKey2md5Map().size();
}
return count;
}
/**
* groupkey -> SubscriberStatus
* Groupkey -> SubscriberStatus.
*
*/
static public Map<String, SubscriberStatus> listSubStatus(String ip) {
public static Map<String, SubscriberStatus> listSubStatus(String ip) {
Map<String, SubscriberStatus> status = new HashMap<String, SubscriberStatus>(100);
ClientRecord record = getClientRecord(ip);
@ -83,10 +102,10 @@ public class ClientTrackService {
return status;
}
for (Map.Entry<String, String> entry : record.groupKey2md5Map.entrySet()) {
for (Map.Entry<String, String> entry : record.getGroupKey2md5Map().entrySet()) {
String groupKey = entry.getKey();
String clientMd5 = entry.getValue();
long lastPollingTs = record.groupKey2pollingTsMap.get(groupKey);
long lastPollingTs = record.getGroupKey2pollingTsMap().get(groupKey);
boolean isUpdate = ConfigCacheService.isUptodate(groupKey, clientMd5);
status.put(groupKey, new SubscriberStatus(groupKey, isUpdate, clientMd5, lastPollingTs));
@ -96,18 +115,18 @@ public class ClientTrackService {
}
/**
* ip -> SubscriberStatus
* Ip -> SubscriberStatus.
*/
static public Map<String, SubscriberStatus> listSubsByGroup(String groupKey) {
public static Map<String, SubscriberStatus> listSubsByGroup(String groupKey) {
Map<String, SubscriberStatus> subs = new HashMap<String, SubscriberStatus>(100);
for (ClientRecord clientRec : clientRecords.values()) {
String clientMd5 = clientRec.groupKey2md5Map.get(groupKey);
Long lastPollingTs = clientRec.groupKey2pollingTsMap.get(groupKey);
String clientMd5 = clientRec.getGroupKey2md5Map().get(groupKey);
Long lastPollingTs = clientRec.getGroupKey2pollingTsMap().get(groupKey);
if (null != clientMd5 && lastPollingTs != null) {
if (null != clientMd5 && null != lastPollingTs) {
Boolean isUpdate = ConfigCacheService.isUptodate(groupKey, clientMd5);
subs.put(clientRec.ip, new SubscriberStatus(groupKey, isUpdate, clientMd5, lastPollingTs));
subs.put(clientRec.getIp(), new SubscriberStatus(groupKey, isUpdate, clientMd5, lastPollingTs));
}
}
@ -115,11 +134,13 @@ public class ClientTrackService {
}
/**
* 指定订阅者IP查找数据是否最新 groupKey -> isUptodate
* Specify subscriber's ip and look up whether data is lastest.
* groupKey -> isUptodate.
*
*/
static public Map<String, Boolean> isClientUptodate(String ip) {
public static Map<String, Boolean> isClientUptodate(String ip) {
Map<String, Boolean> result = new HashMap<String, Boolean>(100);
for (Map.Entry<String, String> entry : getClientRecord(ip).groupKey2md5Map.entrySet()) {
for (Map.Entry<String, String> entry : getClientRecord(ip).getGroupKey2md5Map().entrySet()) {
String groupKey = entry.getKey();
String clientMd5 = entry.getValue();
Boolean isuptodate = ConfigCacheService.isUptodate(groupKey, clientMd5);
@ -129,61 +150,46 @@ public class ClientTrackService {
}
/**
* 指定groupKey查找所有订阅者以及数据是否最新 IP -> isUptodate
* Specify groupKey and look up whether subsciber and data is lastest.
* IP -> isUptodate.
*/
static public Map<String, Boolean> listSubscriberByGroup(String groupKey) {
public static Map<String, Boolean> listSubscriberByGroup(String groupKey) {
Map<String, Boolean> subs = new HashMap<String, Boolean>(100);
for (ClientRecord clientRec : clientRecords.values()) {
String clientMd5 = clientRec.groupKey2md5Map.get(groupKey);
String clientMd5 = clientRec.getGroupKey2md5Map().get(groupKey);
if (null != clientMd5) {
Boolean isuptodate = ConfigCacheService.isUptodate(groupKey, clientMd5);
subs.put(clientRec.ip, isuptodate);
subs.put(clientRec.getIp(), isuptodate);
}
}
return subs;
}
/**
* 找到指定clientIp对应的记录
* Get and return the record of specified client ip.
*
* @param clientIp clientIp string value.
* @return the record of specified client ip.
*/
static private ClientRecord getClientRecord(String clientIp) {
private static ClientRecord getClientRecord(String clientIp) {
ClientRecord record = clientRecords.get(clientIp);
if (null != record) {
return record;
}
clientRecords.putIfAbsent(clientIp, new ClientRecord(clientIp));
return clientRecords.get(clientIp);
ClientRecord clientRecord = new ClientRecord(clientIp);
clientRecords.putIfAbsent(clientIp, clientRecord);
return clientRecord;
}
static public void refreshClientRecord() {
public static void refreshClientRecord() {
clientRecords = new ConcurrentHashMap<String, ClientRecord>(50);
}
/**
* 所有客户端记录遍历 >> 新增/删除
* All of client records, adding or deleting.
*/
static volatile ConcurrentMap<String, ClientRecord> clientRecords = new ConcurrentHashMap<String, ClientRecord>();
}
/**
* 保存客户端拉数据的记录
*/
class ClientRecord {
final String ip;
volatile long lastTime;
final ConcurrentMap<String, String> groupKey2md5Map;
final ConcurrentMap<String, Long> groupKey2pollingTsMap;
ClientRecord(String clientIp) {
ip = clientIp;
groupKey2md5Map = new ConcurrentHashMap<String, String>(20, 0.75f, 1);
groupKey2pollingTsMap = new ConcurrentHashMap<String, Long>(20, 0.75f, 1);
}
}

View File

@ -33,14 +33,21 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import static com.alibaba.nacos.config.server.utils.LogUtil.*;
import static com.alibaba.nacos.config.server.utils.LogUtil.dumpLog;
import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog;
import static com.alibaba.nacos.config.server.utils.LogUtil.defaultLog;
/**
* config service
* Config service.
*
* @author Nacos
*/
@ -49,18 +56,26 @@ public class ConfigCacheService {
@Autowired
private static PersistService persistService;
static public int groupCount() {
public static int groupCount() {
return CACHE.size();
}
static public boolean hasGroupKey(String groupKey) {
public static boolean hasGroupKey(String groupKey) {
return CACHE.containsKey(groupKey);
}
/**
* 保存配置文件并缓存md5.
* Save config file and update md5 value in cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @param content content string value.
* @param lastModifiedTs lastModifiedTs.
* @param type file type.
* @return dumpChange success or not.
*/
static public boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey);
@ -91,7 +106,7 @@ public class ConfigCacheService {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
.contains(DISK_QUATA_EN)) {
// 磁盘写满保护代码
// Protect from disk full.
fatalLog.error("磁盘满自杀退出", ioe);
System.exit(0);
}
@ -103,9 +118,17 @@ public class ConfigCacheService {
}
/**
* 保存配置文件并缓存md5.
* Save config file and update md5 value in cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @param content content string value.
* @param lastModifiedTs lastModifiedTs.
* @param betaIps betaIps string value.
* @return dumpChange success or not.
*/
static public boolean dumpBeta(String dataId, String group, String tenant, String content, long lastModifiedTs,
public static boolean dumpBeta(String dataId, String group, String tenant, String content, long lastModifiedTs,
String betaIps) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
@ -140,9 +163,17 @@ public class ConfigCacheService {
}
/**
* 保存配置文件并缓存md5.
* Save config file and update md5 value in cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @param content content string value.
* @param lastModifiedTs lastModifiedTs.
* @param tag tag string value.
* @return dumpChange success or not.
*/
static public boolean dumpTag(String dataId, String group, String tenant, String tag, String content,
public static boolean dumpTag(String dataId, String group, String tenant, String tag, String content,
long lastModifiedTs) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
@ -176,9 +207,16 @@ public class ConfigCacheService {
}
/**
* 保存配置文件并缓存md5.
* Save config file and update md5 value in cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @param content content string value.
* @param lastModifiedTs lastModifiedTs.
* @return dumpChange success or not.
*/
static public boolean dumpChange(String dataId, String group, String tenant, String content, long lastModifiedTs) {
public static boolean dumpChange(String dataId, String group, String tenant, String content, long lastModifiedTs) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
makeSure(groupKey);
@ -212,7 +250,10 @@ public class ConfigCacheService {
}
}
static public void reloadConfig() {
/**
* Reload config.
*/
public static void reloadConfig() {
String aggreds = null;
try {
if (PropertyUtil.isEmbeddedStorage()) {
@ -268,10 +309,13 @@ public class ConfigCacheService {
} catch (IOException e) {
dumpLog.error("reload fail:" + SwitchService.SWITCH_META_DATAID, e);
}
}
static public List<String> checkMd5() {
/**
* Check md5.
* @return return diff result list.
*/
public static List<String> checkMd5() {
List<String> diffList = new ArrayList<String>();
long startTime = System.currentTimeMillis();
for (Entry<String/* groupKey */, CacheItem> entry : CACHE.entrySet()) {
@ -296,21 +340,24 @@ public class ConfigCacheService {
}
/**
* 删除配置文件删除缓存
* Delete config file, and delete cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @return remove success or not.
*/
static public boolean remove(String dataId, String group, String tenant) {
public static boolean remove(String dataId, String group, String tenant) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final int lockResult = tryWriteLock(groupKey);
/**
* 数据不存在
*/
// If data is non-existent.
if (0 == lockResult) {
dumpLog.info("[remove-ok] {} not exist.", groupKey);
return true;
}
/**
* 加锁失败
*/
// try to lock failed
if (lockResult < 0) {
dumpLog.warn("[remove-error] write lock failed. {}", groupKey);
return false;
@ -330,21 +377,24 @@ public class ConfigCacheService {
}
/**
* 删除配置文件删除缓存
* Delete beta config file, and delete cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @return remove success or not.
*/
static public boolean removeBeta(String dataId, String group, String tenant) {
public static boolean removeBeta(String dataId, String group, String tenant) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final int lockResult = tryWriteLock(groupKey);
/**
* 数据不存在
*/
// If data is non-existent.
if (0 == lockResult) {
dumpLog.info("[remove-ok] {} not exist.", groupKey);
return true;
}
/**
* 加锁失败
*/
// try to lock failed
if (lockResult < 0) {
dumpLog.warn("[remove-error] write lock failed. {}", groupKey);
return false;
@ -365,21 +415,25 @@ public class ConfigCacheService {
}
/**
* 删除配置文件删除缓存
* Delete tag config file, and delete cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @param tag tag string value.
* @return remove success or not.
*/
static public boolean removeTag(String dataId, String group, String tenant, String tag) {
public static boolean removeTag(String dataId, String group, String tenant, String tag) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
final int lockResult = tryWriteLock(groupKey);
/**
* 数据不存在
*/
// If data is non-existent.
if (0 == lockResult) {
dumpLog.info("[remove-ok] {} not exist.", groupKey);
return true;
}
/**
* 加锁失败
*/
// try to lock failed
if (lockResult < 0) {
dumpLog.warn("[remove-error] write lock failed. {}", groupKey);
return false;
@ -400,6 +454,13 @@ public class ConfigCacheService {
}
}
/**
* Update md5 value.
*
* @param groupKey groupKey string value.
* @param md5 md5 string value.
* @param lastModifiedTs lastModifiedTs long value.
*/
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
@ -409,6 +470,14 @@ public class ConfigCacheService {
}
}
/**
* Update Beta md5 value.
*
* @param groupKey groupKey string value.
* @param md5 md5 string value.
* @param ips4Beta ips4Beta List.
* @param lastModifiedTs lastModifiedTs long value.
*/
public static void updateBetaMd5(String groupKey, String md5, List<String> ips4Beta, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md54Beta == null || !cache.md54Beta.equals(md5)) {
@ -420,6 +489,14 @@ public class ConfigCacheService {
}
}
/**
* Update tag md5 value.
*
* @param groupKey groupKey string value.
* @param tag tag string value.
* @param md5 md5 string value.
* @param lastModifiedTs lastModifiedTs long value.
*/
public static void updateTagMd5(String groupKey, String tag, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.tagMd5 == null) {
@ -444,51 +521,14 @@ public class ConfigCacheService {
}
/**
* 返回cache的md5零长度字符串表示没有该数据
* Get and return content md5 value from cache. Empty string represents no data.
*/
static public String getContentMd5(String groupKey) {
public static String getContentMd5(String groupKey) {
CacheItem item = CACHE.get(groupKey);
return (null != item) ? item.md5 : Constants.NULL;
}
/**
* 返回cache的md5零长度字符串表示没有该数据
*/
static public String getContentBetaMd5(String groupKey) {
CacheItem item = CACHE.get(groupKey);
return (null != item) ? item.md54Beta : Constants.NULL;
}
/**
* 返回cache的md5零长度字符串表示没有该数据
*/
static public String getContentTagMd5(String groupKey, String tag) {
CacheItem item = CACHE.get(groupKey);
if (item == null) {
return Constants.NULL;
}
if (item.tagMd5 == null) {
return Constants.NULL;
}
return item.tagMd5.get(tag);
}
/**
* 返回beta Ip列表
*/
static public List<String> getBetaIps(String groupKey) {
CacheItem item = CACHE.get(groupKey);
return (null != item) ? item.getIps4Beta() : Collections.<String>emptyList();
}
/**
* 返回cache
*/
static public CacheItem getContentCache(String groupKey) {
return CACHE.get(groupKey);
}
static public String getContentMd5(String groupKey, String ip, String tag) {
public static String getContentMd5(String groupKey, String ip, String tag) {
CacheItem item = CACHE.get(groupKey);
if (item != null && item.isBeta) {
if (item.ips4Beta.contains(ip)) {
@ -503,28 +543,76 @@ public class ConfigCacheService {
return (null != item) ? item.md5 : Constants.NULL;
}
static public long getLastModifiedTs(String groupKey) {
/**
* Get and return beta md5 value from cache. Empty string represents no data.
*/
public static String getContentBetaMd5(String groupKey) {
CacheItem item = CACHE.get(groupKey);
return (null != item) ? item.md54Beta : Constants.NULL;
}
/**
* Get and return tag md5 value from cache. Empty string represents no data.
*
* @param groupKey groupKey string value.
* @param tag tag string value.
* @return Content Tag Md5 value.
*/
public static String getContentTagMd5(String groupKey, String tag) {
CacheItem item = CACHE.get(groupKey);
if (item == null) {
return Constants.NULL;
}
if (item.tagMd5 == null) {
return Constants.NULL;
}
return item.tagMd5.get(tag);
}
/**
* Get and return beta ip list.
*
* @param groupKey groupKey string value.
* @return list beta ips.
*/
public static List<String> getBetaIps(String groupKey) {
CacheItem item = CACHE.get(groupKey);
return (null != item) ? item.getIps4Beta() : Collections.<String>emptyList();
}
/**
* Get and return content cache.
*
* @param groupKey groupKey string value.
* @return CacheItem.
*/
public static CacheItem getContentCache(String groupKey) {
return CACHE.get(groupKey);
}
public static long getLastModifiedTs(String groupKey) {
CacheItem item = CACHE.get(groupKey);
return (null != item) ? item.lastModifiedTs : 0L;
}
static public boolean isUptodate(String groupKey, String md5) {
public static boolean isUptodate(String groupKey, String md5) {
String serverMd5 = ConfigCacheService.getContentMd5(groupKey);
return StringUtils.equals(md5, serverMd5);
}
static public boolean isUptodate(String groupKey, String md5, String ip, String tag) {
public static boolean isUptodate(String groupKey, String md5, String ip, String tag) {
String serverMd5 = ConfigCacheService.getContentMd5(groupKey, ip, tag);
return StringUtils.equals(md5, serverMd5);
}
/**
* 给数据加读锁如果成功后面必须调用{@link #releaseReadLock(String)}失败则不需要
* Try to add read lock. If it successed, then it can call {@link #releaseWriteLock(String)}.And it won't call if
* failed.
*
* @param groupKey
* @return 零表示没有数据失败正数表示成功负数表示有写锁导致加锁失败
* @param groupKey groupKey string value.
* @return 0 - No data and failed. Positive number 0 - Success. Negative number - lock failed
*/
static public int tryReadLock(String groupKey) {
public static int tryReadLock(String groupKey) {
CacheItem groupItem = CACHE.get(groupKey);
int result = (null == groupItem) ? 0 : (groupItem.rwLock.tryReadLock() ? 1 : -1);
if (result < 0) {
@ -533,7 +621,12 @@ public class ConfigCacheService {
return result;
}
static public void releaseReadLock(String groupKey) {
/**
* Release readLock.
*
* @param groupKey groupKey string value.
*/
public static void releaseReadLock(String groupKey) {
CacheItem item = CACHE.get(groupKey);
if (null != item) {
item.rwLock.releaseReadLock();
@ -541,10 +634,11 @@ public class ConfigCacheService {
}
/**
* 给数据加写锁如果成功后面必须调用{@link #releaseWriteLock(String)}失败则不需要
* Try to add write lock. If it successed, then it can call {@link #releaseWriteLock(String)}.And it won't call if
* failed.
*
* @param groupKey
* @return 零表示没有数据失败正数表示成功负数表示加锁失败
* @param groupKey groupKey string value.
* @return 0 - No data and failed. Positive number 0 - Success. Negative number - lock failed
*/
static int tryWriteLock(String groupKey) {
CacheItem groupItem = CACHE.get(groupKey);
@ -572,19 +666,19 @@ public class ConfigCacheService {
return (null == item) ? tmp : item;
}
private final static String NO_SPACE_CN = "设备上没有空间";
private static final String NO_SPACE_CN = "设备上没有空间";
private final static String NO_SPACE_EN = "No space left on device";
private static final String NO_SPACE_EN = "No space left on device";
private final static String DISK_QUATA_CN = "超出磁盘限额";
private static final String DISK_QUATA_CN = "超出磁盘限额";
private final static String DISK_QUATA_EN = "Disk quota exceeded";
private static final String DISK_QUATA_EN = "Disk quota exceeded";
static final Logger log = LoggerFactory.getLogger(ConfigCacheService.class);
static final Logger LOGGER = LoggerFactory.getLogger(ConfigCacheService.class);
/**
* groupKey -> cacheItem
* groupKey -> cacheItem.
*/
static private final ConcurrentHashMap<String, CacheItem> CACHE = new ConcurrentHashMap<String, CacheItem>();
private static final ConcurrentHashMap<String, CacheItem> CACHE = new ConcurrentHashMap<String, CacheItem>();
}

View File

@ -22,10 +22,16 @@ import com.alibaba.nacos.config.server.utils.event.EventDispatcher;
import com.alibaba.nacos.core.utils.ApplicationUtils;
/**
* ConfigChangePublisher.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class ConfigChangePublisher {
/**
* Notify ConfigChange.
* @param event ConfigDataChangeEvent instance.
*/
public static void notifyConfigChange(ConfigDataChangeEvent event) {
if (PropertyUtil.isEmbeddedStorage() && !ApplicationUtils.getStandaloneMode()) {
return;

View File

@ -37,10 +37,21 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ExecutorCompletionService;
/**
* config sub service
* Config sub service.
*
* @author Nacos
*/
@ -72,11 +83,11 @@ public class ConfigSubService {
}
/**
* 获得调用的URL
* Get and return called url string value.
*
* @param ip ip
* @param relativePath path
* @return all path
* @param ip ip.
* @param relativePath path.
* @return all path.
*/
private String getUrl(String ip, String relativePath) {
return "http://" + ip + ApplicationUtils.getContextPath() + relativePath;
@ -87,16 +98,16 @@ public class ConfigSubService {
Collection<Member> ipList = memberManager.allMembers();
List<SampleResult> collectionResult = new ArrayList<SampleResult>(ipList.size());
// 提交查询任务
// Submit query task.
for (Member ip : ipList) {
try {
completionService.submit(new Job(ip.getAddress(), url, params));
} catch (Exception e) { // 发送请求失败
} catch (Exception e) { // Send request failed.
LogUtil.defaultLog
.warn("Get client info from {} with exception: {} during submit job", ip, e.getMessage());
}
}
// 获取结果并合并
// Get and merge result.
SampleResult sampleResults = null;
for (Member member : ipList) {
try {
@ -125,6 +136,13 @@ public class ConfigSubService {
return collectionResult;
}
/**
* Merge SampleResult.
*
* @param sampleCollectResult sampleCollectResult.
* @param sampleResults sampleResults.
* @return SampleResult.
*/
public SampleResult mergeSampleResult(SampleResult sampleCollectResult, List<SampleResult> sampleResults) {
SampleResult mergeResult = new SampleResult();
Map<String, String> lisentersGroupkeyStatus = null;
@ -146,7 +164,7 @@ public class ConfigSubService {
}
/**
* 去每个Nacos Server节点查询订阅者的任务
* Query subsrciber's task from every nacos server nodes.
*
* @author Nacos
*/
@ -177,9 +195,8 @@ public class ConfigSubService {
String urlAll = getUrl(ip, url) + "?" + paramUrl;
com.alibaba.nacos.config.server.service.notify.NotifyService.HttpResult result = NotifyService
.invokeURL(urlAll, null, Constants.ENCODE);
/**
* http code 200
*/
// Http code 200
if (result.code == HttpURLConnection.HTTP_OK) {
String json = result.content;
SampleResult resultObj = JSONUtils.deserializeObject(json, new TypeReference<SampleResult>() {

View File

@ -33,14 +33,30 @@ import org.springframework.stereotype.Service;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;
import static com.alibaba.nacos.config.server.utils.LogUtil.pullLog;
/**
* 长轮询服务负责处理
* LongPollingService.
*
* @author Nacos
*/
@ -99,7 +115,7 @@ public class LongPollingService extends AbstractEventListener {
for (ClientLongPolling clientLongPolling : allSubs) {
if (clientLongPolling.ip.equals(clientIp)) {
// 一个ip可能有多个监听
// One ip can have multiple listener.
if (!lisentersGroupkeyStatus.equals(clientLongPolling.clientMd5Map)) {
lisentersGroupkeyStatus.putAll(clientLongPolling.clientMd5Map);
}
@ -110,10 +126,11 @@ public class LongPollingService extends AbstractEventListener {
}
/**
* 聚合采样结果中的采样ip和监听配置的信息合并策略用后面的覆盖前面的是没有问题的
* Aggregate the sampling IP and monitoring configuration information in the sampling results.
* There is no problem for the merging strategy to cover the previous one with the latter.
*
* @param sampleResults sample Results
* @return Results
* @param sampleResults sample Results.
* @return Results.
*/
public SampleResult mergeSampleResult(List<SampleResult> sampleResults) {
SampleResult mergeResult = new SampleResult();
@ -128,6 +145,10 @@ public class LongPollingService extends AbstractEventListener {
return mergeResult;
}
/**
* Collect application subscribe configinfos.
* @return configinfos results.
*/
public Map<String, Set<String>> collectApplicationSubscribeConfigInfos() {
if (allSubs == null || allSubs.isEmpty()) {
return null;
@ -208,6 +229,14 @@ public class LongPollingService extends AbstractEventListener {
return null;
}
/**
* Add LongPollingClient.
*
* @param req HttpServletRequest.
* @param rsp HttpServletResponse.
* @param clientMd5Map clientMd5Map.
* @param probeRequestSize probeRequestSize.
*/
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
@ -216,13 +245,12 @@ public class LongPollingService extends AbstractEventListener {
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回响应为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance
*/
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
// Do nothing but set fix polling timeout.
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
@ -240,9 +268,11 @@ public class LongPollingService extends AbstractEventListener {
}
}
String ip = RequestUtil.getRemoteIp(req);
// 一定要由HTTP线程调用否则离开后容器会立即发送响应
// Must be called by http thread, or send response.
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout()的超时时间不准所以只能自己控制
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
scheduler.execute(
@ -259,7 +289,7 @@ public class LongPollingService extends AbstractEventListener {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
@ -268,7 +298,7 @@ public class LongPollingService extends AbstractEventListener {
}
}
static public boolean isSupportLongPolling(HttpServletRequest req) {
public static boolean isSupportLongPolling(HttpServletRequest req) {
return null != req.getHeader(LONG_POLLING_HEADER);
}
@ -288,21 +318,17 @@ public class LongPollingService extends AbstractEventListener {
scheduler.scheduleWithFixedDelay(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
}
// =================
public static final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";
static public final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";
static public final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
public static final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
final ScheduledExecutorService scheduler;
/**
* 长轮询订阅关系
* ClientLongPolling subscibers.
*/
final Queue<ClientLongPolling> allSubs;
// =================
class DataChangeTask implements Runnable {
@Override
@ -312,18 +338,18 @@ public class LongPollingService extends AbstractEventListener {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta发布且不在beta列表直接跳过
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
// 如果tag发布且不在tag列表直接跳过
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // 删除订阅关系
iter.remove(); // Delete subscribers' relationships.
LogUtil.clientLog
.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
RequestUtil
@ -359,8 +385,6 @@ public class LongPollingService extends AbstractEventListener {
final String tag;
}
// =================
class StatTask implements Runnable {
@Override
@ -370,8 +394,6 @@ public class LongPollingService extends AbstractEventListener {
}
}
// =================
class ClientLongPolling implements Runnable {
@Override
@ -381,9 +403,8 @@ public class LongPollingService extends AbstractEventListener {
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 删除订阅关系
*/
// Delete subsciber's relations.
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
@ -418,9 +439,8 @@ public class LongPollingService extends AbstractEventListener {
}
void sendResponse(List<String> changedGroups) {
/**
* 取消超时任务
*/
// Cancel time out task.
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
@ -429,9 +449,8 @@ public class LongPollingService extends AbstractEventListener {
void generateResponse(List<String> changedGroups) {
if (null == changedGroups) {
/**
* 告诉容器发送HTTP响应
*/
// Tell web container to send http response.
asyncContext.complete();
return;
}
@ -439,17 +458,17 @@ public class LongPollingService extends AbstractEventListener {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
String respString = MD5Util.compareMd5ResultString(changedGroups);
final String respString = MD5Util.compareMd5ResultString(changedGroups);
// 禁用缓存
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(respString);
asyncContext.complete();
} catch (Exception se) {
pullLog.error(se.toString(), se);
} catch (Exception ex) {
pullLog.error(ex.toString(), ex);
asyncContext.complete();
}
}
@ -466,8 +485,6 @@ public class LongPollingService extends AbstractEventListener {
this.tag = tag;
}
// =================
final AsyncContext asyncContext;
final Map<String, String> clientMd5Map;
@ -500,15 +517,15 @@ public class LongPollingService extends AbstractEventListener {
}
try {
String respString = MD5Util.compareMd5ResultString(changedGroups);
// 禁用缓存
final String respString = MD5Util.compareMd5ResultString(changedGroups);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(respString);
} catch (Exception se) {
pullLog.error(se.toString(), se);
} catch (Exception ex) {
pullLog.error(ex.toString(), ex);
}
}
@ -519,5 +536,4 @@ public class LongPollingService extends AbstractEventListener {
public void setRetainIps(Map<String, Long> retainIps) {
this.retainIps = retainIps;
}
}

View File

@ -30,7 +30,7 @@ import java.util.Map;
import static com.alibaba.nacos.config.server.utils.LogUtil.fatalLog;
/**
* Switch
* SwitchService.
*
* @author Nacos
*/
@ -78,6 +78,11 @@ public class SwitchService {
return StringUtils.isBlank(value) ? defaultValue : value;
}
/**
* Load config.
*
* @param config config content string value.
*/
public static void load(String config) {
if (StringUtils.isBlank(config)) {
fatalLog.error("switch config is blank.");
@ -125,5 +130,4 @@ public class SwitchService {
return sb.toString();
}
}