encode bugfix (#4548)

* basic spell fix.import
 fail bugfix

* encode bugfix

* remote log bugfix;  create client concurrent bugfix;optimize log

* checkstyle  fix

* checkstyle  fix

* checkstyle  fix

* checkstyle  fix

* checkstyle  fix

* checkstyle pmd fix

* checkstyle pmd fix

* http poll sla bugfix; rpc config listen notify optimize
This commit is contained in:
nov.lzf 2020-12-26 15:53:10 +08:00 committed by GitHub
parent f486561bc9
commit 6e34f2886b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 577 additions and 501 deletions

View File

@ -93,7 +93,7 @@ public interface CmdbService {
*
* @param entityName name of entity
* @param entityType type of entity
* @return
* @return entity.
*/
Entity getEntity(String entityName, String entityType);
}

View File

@ -100,13 +100,13 @@ public class ConfigBatchListenRequest extends AbstractConfigRequest {
public ConfigListenContext() {
}
@Override
public String toString() {
return "ConfigListenContext{" + "group='" + group + '\'' + ", md5='" + md5 + '\'' + ", dataId='" + dataId
+ '\'' + ", tenant='" + tenant + '\'' + '}';
}
/**
* Getter method for property <tt>group</tt>.
*

View File

@ -41,7 +41,7 @@ public class ConfigChangeClusterSyncRequest extends AbstractConfigRequest {
/**
* is beta.
*
* @return
* @return is beta or not.
*/
public boolean isBeta() {
return "Y".equalsIgnoreCase(isBeta);

View File

@ -36,9 +36,10 @@ public class ConfigChangeBatchListenResponse extends Response {
}
/**
* add changed config.
* add changed config.
*
* @param dataId dataId.
* @param group group.
* @param group group.
* @param tenant tenant.
*/
public void addChangeConfig(String dataId, String group, String tenant) {
@ -71,7 +72,7 @@ public class ConfigChangeBatchListenResponse extends Response {
* build fail response.
*
* @param errorMessage errorMessage.
* @return
* @return response.
*/
public static ConfigChangeBatchListenResponse buildFailResponse(String errorMessage) {
ConfigChangeBatchListenResponse response = new ConfigChangeBatchListenResponse();
@ -145,7 +146,7 @@ public class ConfigChangeBatchListenResponse extends Response {
public void setTenant(String tenant) {
this.tenant = tenant;
}
@Override
public String toString() {
return "ConfigContext{" + "group='" + group + '\'' + ", dataId='" + dataId + '\'' + ", tenant='" + tenant

View File

@ -20,35 +20,35 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
/**
* ConfigPubishResponse.
* ConfigPublishResponse.
*
* @author liuzunfei
* @version $Id: ConfigPubishResponse.java, v 0.1 2020年07月16日 4:59 PM liuzunfei Exp $
*/
public class ConfigPubishResponse extends Response {
public class ConfigPublishResponse extends Response {
public ConfigPubishResponse() {
public ConfigPublishResponse() {
super();
}
/**
* Buidl success resposne.
* Build success response.
*
* @return
* @return response.
*/
public static ConfigPubishResponse buildSuccessResponse() {
return new ConfigPubishResponse();
public static ConfigPublishResponse buildSuccessResponse() {
return new ConfigPublishResponse();
}
/**
* Buidl fail resposne.
* Build fail response.
*
* @return
* @return response.
*/
public static ConfigPubishResponse buildFailResponse(String errorMsg) {
ConfigPubishResponse configPubishResponse = new ConfigPubishResponse();
configPubishResponse.setResultCode(ResponseCode.FAIL.getCode());
configPubishResponse.setMessage(errorMsg);
return configPubishResponse;
public static ConfigPublishResponse buildFailResponse(String errorMsg) {
ConfigPublishResponse configPublishResponse = new ConfigPublishResponse();
configPublishResponse.setResultCode(ResponseCode.FAIL.getCode());
configPublishResponse.setMessage(errorMsg);
return configPublishResponse;
}
}

View File

@ -57,7 +57,7 @@ public class ConfigQueryResponse extends Response {
*
* @param errorCode errorCode.
* @param message message.
* @return
* @return response.
*/
public static ConfigQueryResponse buildFailResponse(int errorCode, String message) {
ConfigQueryResponse response = new ConfigQueryResponse();
@ -66,10 +66,10 @@ public class ConfigQueryResponse extends Response {
}
/**
* Buidl success resposne.
* Build success response.
*
* @param content content.
* @return
* @return response.
*/
public static ConfigQueryResponse buildSuccessResponse(String content) {
ConfigQueryResponse response = new ConfigQueryResponse();

View File

@ -32,18 +32,18 @@ public class ConfigRemoveResponse extends Response {
}
/**
* Buidl success resposne.
* Build success response.
*
* @return
* @return response.
*/
public static ConfigRemoveResponse buildSuccessResponse() {
return new ConfigRemoveResponse();
}
/**
* Buidl fail resposne.
* Build fail response.
*
* @return
* @return response.
*/
public static ConfigRemoveResponse buildFailResponse(String errorMsg) {
ConfigRemoveResponse removeResponse = new ConfigRemoveResponse();

View File

@ -107,11 +107,16 @@ public class NacosException extends Exception {
*/
public static final int CLIENT_INVALID_PARAM = -400;
/**
* invalid param参数错误.
*/
public static final int CLIENT_DISCONNECT = -401;
/**
* over client threshold超过server端的限流阈值.
*/
public static final int CLIENT_OVER_THRESHOLD = -503;
/*
* server error code.
* 400 403 throw exception to user
@ -160,4 +165,6 @@ public class NacosException extends Exception {
* ome exceptions that occurred when the use the Nacos RestTemplate and Nacos AsyncRestTemplate.
*/
public static final int HTTP_CLIENT_ERROR_CODE = -500;
}

View File

@ -82,7 +82,7 @@ public class DefaultRequestFuture implements RequestFuture {
this.requestId = requestId;
this.connectionId = connectionId;
if (requestCallBack != null) {
this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SHEDULER
this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SCHEDULER
.schedule(new TimeoutHandler(), requestCallBack.getTimeout(), TimeUnit.MILLISECONDS);
}
this.timeoutInnerTrigger = timeoutInnerTrigger;

View File

@ -31,14 +31,14 @@ public interface RequestCallBack<T extends Response> {
/**
* get executor on callback.
*
* @return
* @return executor.
*/
public Executor getExecutor();
/**
* get timeout mills.
*
* @return
* @return timeouts.
*/
public long getTimeout();

View File

@ -31,7 +31,7 @@ public interface RequestFuture {
/**
* check that it is done or not..
* @return
* @return is done .
*/
boolean isDone();

View File

@ -87,7 +87,7 @@ public interface Requester {
/**
* check this requester is busy.
*
* @return
* @return busy or not.
*/
public boolean isBusy();
}

View File

@ -27,17 +27,11 @@ import java.util.concurrent.ThreadFactory;
*/
public class RpcScheduledExecutor extends ScheduledThreadPoolExecutor {
public static final RpcScheduledExecutor TIMEOUT_SHEDULER = new RpcScheduledExecutor(1,
public static final RpcScheduledExecutor TIMEOUT_SCHEDULER = new RpcScheduledExecutor(0,
"com.alibaba.nacos.remote.TimerScheduler");
/**
* executor to execute future request.
*/
public static final RpcScheduledExecutor AYNS_REQUEST_EXECUTOR = new RpcScheduledExecutor(
Runtime.getRuntime().availableProcessors(), "com.alibaba.nacos.remote.RpcRequestExecutor");
public static final RpcScheduledExecutor COMMON_SERVER_EXECUTOR = new RpcScheduledExecutor(
Runtime.getRuntime().availableProcessors(), "com.alibaba.nacos.remote.ServerCommonScheduler");
0, "com.alibaba.nacos.remote.ServerCommonScheduler");
public RpcScheduledExecutor(int corePoolSize, final String threadName) {
super(corePoolSize, new ThreadFactory() {

View File

@ -115,6 +115,6 @@ public abstract class Request {
@Override
public String toString() {
return "Request{" + "headers=" + headers + ", requestId='" + requestId + '\'' + '}';
return this.getClass().getSimpleName() + "{" + "headers=" + headers + ", requestId='" + requestId + '\'' + '}';
}
}

View File

@ -52,9 +52,9 @@ public abstract class Response {
}
/**
* Check Response is Successd.
* Check Response is Successed.
*
* @return
* @return success or not.
*/
public boolean isSuccess() {
return this.resultCode == ResponseCode.SUCCESS.getCode();

View File

@ -51,14 +51,6 @@ public class CacheData {
this.isInitializing = isInitializing;
}
public boolean isListenSuccess() {
return isListenSuccess;
}
public void setListenSuccess(boolean listenSuccess) {
isListenSuccess = listenSuccess;
}
public String getMd5() {
return md5;
}
@ -285,6 +277,21 @@ public class CacheData {
return content;
}
/**
* 1.first add listener.default is false;need to check.
* 2.receive config change notify,set false;need to check.
* 3.last listener is remove,set to false;need to check
*
* @return
*/
public boolean isSync() {
return isSync;
}
public void setSync(boolean sync) {
isSync = sync;
}
public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group) {
if (null == dataId || null == group) {
throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
@ -350,7 +357,10 @@ public class CacheData {
private volatile boolean isInitializing = true;
private volatile boolean isListenSuccess = false;
/**
* if is sync with the server.
*/
private volatile boolean isSync = false;
private String type;

View File

@ -27,13 +27,12 @@ import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
@ -58,7 +57,6 @@ import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
@ -116,29 +114,11 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) {
cache.addListener(listener);
}
if (!cache.isListenSuccess()) {
if (!cache.isSync()) {
agent.notifyListenConfig();
}
}
/**
* Remove listener.
*
* @param dataId dataId of data
* @param group group of data
* @param listener listener
*/
public void removeListener(String dataId, String group, Listener listener) {
group = null2defaultGroup(group);
CacheData cache = getCache(dataId, group);
if (null != cache) {
cache.removeListener(listener);
if (cache.getListeners().isEmpty()) {
agent.removeCache(dataId, group);
}
}
}
/**
* Add listeners for tenant.
*
@ -152,12 +132,15 @@ public class ClientWorker implements Closeable {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
cache.addListener(listener);
}
if (!cache.isListenSuccess()) {
agent.notifyListenConfig();
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
if (!cache.isSync()) {
agent.notifyListenConfig();
}
}
}
/**
@ -174,13 +157,38 @@ public class ClientWorker implements Closeable {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
cache.setContent(content);
for (Listener listener : listeners) {
cache.addListener(listener);
synchronized (cache) {
cache.setContent(content);
for (Listener listener : listeners) {
cache.addListener(listener);
}
// if current cache is already at listening status,do not notify.
if (!cache.isSync()) {
agent.notifyListenConfig();
}
}
// if current cache is already at listening status,do not notify.
if (!cache.isListenSuccess()) {
agent.notifyListenConfig();
}
/**
* Remove listener.
*
* @param dataId dataId of data
* @param group group of data
* @param listener listener
*/
public void removeListener(String dataId, String group, Listener listener) {
group = null2defaultGroup(group);
CacheData cache = getCache(dataId, group);
if (null != cache) {
synchronized (cache) {
cache.removeListener(listener);
if (cache.getListeners().isEmpty()) {
cache.setSync(false);
agent.removeCache(dataId, group);
}
}
}
}
@ -198,6 +206,7 @@ public class ClientWorker implements Closeable {
if (null != cache) {
cache.removeListener(listener);
if (cache.getListeners().isEmpty()) {
cache.setSync(false);
agent.removeCache(dataId, group);
}
}
@ -415,39 +424,29 @@ public class ClientWorker implements Closeable {
this.configFilterChainManager = configFilterChainManager;
init(properties);
ServerListManager serverListManager = new ServerListManager(properties);
serverListManager.start();
if (ParamUtils.useHttpSwitch()) {
agent = new ConfigHttpTransportClient(properties, serverListManager);
} else {
agent = new ConfigRpcTransportClient(properties, serverListManager);
}
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setName("com.alibaba.nacos.client.Worker_" + agent.getName());
t.setDaemon(true);
return t;
}
});
agent.setExecutor(executorService);
agent.start();
}
private void refreshContentAndCheck(String groupKey, boolean notify) {
@ -464,6 +463,11 @@ public class ClientWorker implements Closeable {
if (null != ct[1]) {
cacheData.setType(ct[1]);
}
if (notify) {
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
}
cacheData.checkListenerMd5();
} catch (Exception e) {
LOGGER.error("refresh content and check md5 fail ,dataid={},group={},tenant={} ", cacheData.dataId,
@ -487,8 +491,7 @@ public class ClientWorker implements Closeable {
public void shutdown() throws NacosException {
String className = this.getClass().getName();
LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(executorService, LOGGER);
ThreadUtils.shutdownThreadPool(executor, LOGGER);
ThreadUtils.shutdownThreadPool(agent.executor, LOGGER);
LOGGER.info("{} do shutdown stop", className);
}
@ -500,10 +503,6 @@ public class ClientWorker implements Closeable {
this.isHealthServer = isHealthServer;
}
final ScheduledExecutorService executor;
final ScheduledExecutorService executorService;
/**
* groupKey -> cacheData.
*/
@ -529,127 +528,124 @@ public class ClientWorker implements Closeable {
private BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<Object>(1);
private Object bellItem = new Object();
private Map<String, RpcClient> rpcClientMap = new HashMap<String, RpcClient>();
public ConfigRpcTransportClient(Properties properties, ServerListManager serverListManager) {
super(properties, serverListManager);
}
private ConnectionType getConectiontype() {
private ConnectionType getConnectionType() {
ConnectionType connectionType = ConnectionType.GRPC;
String connetionType = ParamUtils.configRemoteConnectionType();
if (StringUtils.isNotBlank(connetionType)) {
ConnectionType connectionType1 = ConnectionType.valueOf(connetionType);
if (connectionType1 != null) {
connectionType = connectionType1;
String connectionTypeString = ParamUtils.configRemoteConnectionType();
if (StringUtils.isNotBlank(connectionTypeString)) {
ConnectionType connectionTypeInner = ConnectionType.valueOf(connectionTypeString);
if (connectionTypeInner != null) {
connectionType = connectionTypeInner;
}
}
return connectionType;
}
private Map<String, String> getLabels() {
private Map<String, String> getLabels() {
Map<String, String> labels = new HashMap<String, String>(2, 1);
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_CONFIG);
return labels;
}
private void initHandlerRpcClient(final RpcClient rpcClientInner) {
/*
* Register Listen Change Handler
*/
rpcClientInner.registerServerPushResponseHandler(new ServerRequestHandler() {
@Override
public Response requestReply(Request request, RequestMeta requestMeta) {
if (request instanceof ConfigChangeNotifyRequest) {
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),
configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
CacheData cacheData = cacheMap.get().get(groupKey);
if (cacheData != null) {
if (configChangeNotifyRequest.isContentPush()
&& cacheData.getLastModifiedTs() < configChangeNotifyRequest.getLastModifiedTs()) {
cacheData.setContent(configChangeNotifyRequest.getContent());
cacheData.setType(configChangeNotifyRequest.getType());
cacheData.checkListenerMd5();
}
cacheData.setListenSuccess(false);
notifyListenConfig();
rpcClientInner.registerServerPushResponseHandler((request, requestMeta) -> {
if (request instanceof ConfigChangeNotifyRequest) {
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={}", getName(),
configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup());
String groupKey = GroupKey
.getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
configChangeNotifyRequest.getTenant());
CacheData cacheData = cacheMap.get().get(groupKey);
if (cacheData != null) {
if (configChangeNotifyRequest.isContentPush()
&& cacheData.getLastModifiedTs() < configChangeNotifyRequest.getLastModifiedTs()) {
cacheData.setContent(configChangeNotifyRequest.getContent());
cacheData.setType(configChangeNotifyRequest.getType());
cacheData.checkListenerMd5();
}
return new ConfigChangeNotifyResponse();
cacheData.setSync(false);
notifyListenConfig();
}
return null;
return new ConfigChangeNotifyResponse();
}
return null;
});
rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
@Override
public void onConnected() {
LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName());
notifyListenConfig();
}
@Override
public void onDisConnect() {
String taskId = rpcClientInner.getLabels().get("taskId");
LOGGER.info("[{}] clear listen context...", rpcClientInner.getName());
LOGGER.info("[{}] DisConnected,clear listen context...", rpcClientInner.getName());
Collection<CacheData> values = cacheMap.get().values();
for (CacheData cacheData : values) {
if (taskId != null && Integer.valueOf(taskId).equals(cacheData.getTaskId())) {
cacheData.setListenSuccess(false);
cacheData.setSync(false);
continue;
}
cacheData.setListenSuccess(false);
cacheData.setSync(false);
}
}
});
rpcClientInner.init(new ServerListFactory() {
@Override
public String genNextServer() {
return ConfigRpcTransportClient.super.serverListManager.getNextServerAddr();
}
@Override
public String getCurrentServer() {
return ConfigRpcTransportClient.super.serverListManager.getCurrentServerAddr();
}
@Override
public List<String> getServerList() {
return ConfigRpcTransportClient.super.serverListManager.serverUrls;
}
});
}
@Override
public void startIntenal() throws NacosException {
public void startInternal() throws NacosException {
executor.schedule(new Runnable() {
@Override
public void run() {
try {
while (true) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
executeConfigListen();
} catch (Exception e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
while (true) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
executeConfigListen();
} catch (Exception e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
} catch (Throwable e) {
LOGGER.error("rpc listen task exception", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
// register server change subscriber.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
@ -671,7 +667,7 @@ public class ClientWorker implements Closeable {
}
}
@Override
public Class<? extends Event> subscribeType() {
return ServerlistChangeEvent.class;
@ -692,13 +688,16 @@ public class ClientWorker implements Closeable {
@Override
public void executeConfigListen() {
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
for (CacheData cache : cacheMap.get().values()) {
//get listen config and remove listen config
if (!CollectionUtils.isEmpty(cache.getListeners()) && !cache.isListenSuccess()) {
if (cache.isSync()) {
continue;
}
if (!CollectionUtils.isEmpty(cache.getListeners())) {
//get listen config
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
@ -708,7 +707,7 @@ public class ClientWorker implements Closeable {
cacheDatas.add(cache);
}
} else if (CollectionUtils.isEmpty(cache.getListeners()) && cache.isListenSuccess()) {
} else if (CollectionUtils.isEmpty(cache.getListeners())) {
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
@ -721,21 +720,20 @@ public class ClientWorker implements Closeable {
}
}
}
if (!listenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> listenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
configChangeListenRequest.setListen(true);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
rpcClient, configChangeListenRequest);
if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>();
//handle changed keys,notify listener
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
@ -745,25 +743,41 @@ public class ClientWorker implements Closeable {
.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
changeConfig.getTenant());
changeKeys.add(changeKey);
refreshContentAndCheck(changeKey, true);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
this.executor.execute(() -> refreshContentAndCheck(changeKey, !isInitializing));
}
}
//handler constent configs
//handler content configs
for (CacheData cacheData : listenCaches) {
if (!changeKeys.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
cacheData.getTenant()))) {
cacheData.setListenSuccess(true);
//sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
cacheData.checkListenerMd5();
cacheData.setSync(true);
}
cacheData.setInitializing(false);
}
}
} catch (Exception e) {
LOGGER.error("async listen config change error ", e);
if (e instanceof NacosException
&& ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) {
LOGGER.warn("async listen config change fail ,client is not connected.");
} else {
LOGGER.error("async listen config change error ", e);
}
try {
Thread.sleep(10L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
}
if (!removeListenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
String taskId = entry.getKey();
@ -775,99 +789,50 @@ public class ClientWorker implements Closeable {
boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) {
ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
synchronized (cacheData) {
if (cacheData.getListeners().isEmpty()) {
ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
}
}
}
}
} catch (Exception e) {
LOGGER.error("async remove listen config change error ", e);
}
try {
Thread.sleep(10L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
}
private RpcClient ensureRpcClient(String taskId) throws NacosException {
private synchronized RpcClient ensureRpcClient(String taskId) throws NacosException {
Map<String, String> labels = getLabels();
Map<String, String> newlabels = new HashMap<String, String>(labels);
newlabels.put("taskId", taskId);
RpcClient rpcClient = RpcClientFactory
.createClient("config-" + taskId + "-" + uuid, getConectiontype(), newlabels);
.createClient("config-" + taskId + "-" + uuid, getConnectionType(), newlabels);
if (rpcClient.isWaitInitiated()) {
initHandlerRpcClient(rpcClient);
rpcClient.start();
}
return rpcClient;
}
/**
* build config strings.
*
* @param caches caches to build config string.
* @return
*/
private List<String> buildConfigStrs(List<CacheData> caches) {
StringBuilder listenConfigsBuilder = new StringBuilder();
List<String> configStrings = new ArrayList<String>();
int index = 0;
for (CacheData cache : caches) {
index++;
listenConfigsBuilder.append(cache.dataId).append(WORD_SEPARATOR);
listenConfigsBuilder.append(cache.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cache.tenant)) {
listenConfigsBuilder.append(cache.getMd5()).append(LINE_SEPARATOR);
} else {
listenConfigsBuilder.append(cache.getMd5()).append(WORD_SEPARATOR);
listenConfigsBuilder.append(cache.getTenant()).append(LINE_SEPARATOR);
}
if (index >= 3000) {
configStrings.add(listenConfigsBuilder.toString());
listenConfigsBuilder = new StringBuilder();
index = 0;
}
}
if (listenConfigsBuilder.length() > 0) {
configStrings.add(listenConfigsBuilder.toString());
}
return configStrings;
}
/**
* build config string.
*
* @param caches caches to build config string.
* @return
*/
private String buildConfigStr(List<CacheData> caches) {
StringBuilder listenConfigsBuilder = new StringBuilder();
List<String> configStrings = new ArrayList<String>();
for (CacheData cache : caches) {
listenConfigsBuilder.append(cache.dataId).append(WORD_SEPARATOR);
listenConfigsBuilder.append(cache.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cache.tenant)) {
listenConfigsBuilder.append(cache.getMd5()).append(LINE_SEPARATOR);
} else {
listenConfigsBuilder.append(cache.getMd5()).append(WORD_SEPARATOR);
listenConfigsBuilder.append(cache.getTenant()).append(LINE_SEPARATOR);
}
}
return listenConfigsBuilder.toString();
}
/**
* build config string.
*
* @param caches caches to build config string.
* @return
* @return request.
*/
private ConfigBatchListenRequest buildConfigRequest(List<CacheData> caches) {
ConfigBatchListenRequest configChangeListenRequest = new ConfigBatchListenRequest();
for (CacheData cacheData : caches) {
configChangeListenRequest.addConfigListenContext(cacheData.group, cacheData.dataId, cacheData.tenant,
@ -878,14 +843,14 @@ public class ClientWorker implements Closeable {
@Override
public void removeCache(String dataId, String group) {
// Notify to rpc unlisten ,and remove cache if success.
// Notify to rpc un listen ,and remove cache if success.
notifyListenConfig();
}
/**
* send cancel listen config change request .
*
* @param configListenString string of remove listen config string.
* @param configChangeListenRequest request of remove listen config string.
*/
private boolean unListenConfigChange(RpcClient rpcClient, ConfigBatchListenRequest configChangeListenRequest)
throws NacosException {
@ -896,11 +861,12 @@ public class ClientWorker implements Closeable {
}
@Override
public String[] queryConfig(String dataId, String group, String tenant, long readTimeous, boolean notify)
public String[] queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
throws NacosException {
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
request.putHeader("notify", String.valueOf(notify));
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(getOneRunningClient(), request);
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(getOneRunningClient(), request,
readTimeouts);
String[] ct = new String[2];
if (response.isSuccess()) {
@ -930,15 +896,20 @@ public class ClientWorker implements Closeable {
}
}
private Response requestProxy(RpcClient rpcClientInner, Request request) throws NacosException {
return requestProxy(rpcClientInner, request, 3000L);
}
private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills)
throws NacosException {
try {
request.putAllHeader(super.getSecurityHeaders());
request.putAllHeader(super.getSpasHeaders());
} catch (Exception e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
JsonObject asJsonObjectTemp = new Gson().toJsonTree(request).getAsJsonObject();
asJsonObjectTemp.remove("headers");
asJsonObjectTemp.remove("requestId");
@ -947,9 +918,9 @@ public class ClientWorker implements Closeable {
throw new NacosException(NacosException.CLIENT_OVER_THRESHOLD,
"More than client-side current limit threshold");
}
return rpcClientInner.request(request);
return rpcClientInner.request(request, timeoutMills);
}
RpcClient getOneRunningClient() throws NacosException {
return ensureRpcClient("0");
}
@ -962,7 +933,7 @@ public class ClientWorker implements Closeable {
request.putAdditonalParam("tag", tag);
request.putAdditonalParam("appName", appName);
request.putAdditonalParam("betaIps", betaIps);
ConfigPubishResponse response = (ConfigPubishResponse) requestProxy(getOneRunningClient(), request);
ConfigPublishResponse response = (ConfigPublishResponse) requestProxy(getOneRunningClient(), request);
return response.isSuccess();
} catch (Exception e) {
LOGGER.warn("[{}] [publish-single] error, dataId={}, group={}, tenant={}, code={}, msg={}",
@ -972,8 +943,8 @@ public class ClientWorker implements Closeable {
}
@Override
public boolean removeConfig(String dataid, String group, String tenat, String tag) throws NacosException {
ConfigRemoveRequest request = new ConfigRemoveRequest(dataid, group, tenat, tag);
public boolean removeConfig(String dataId, String group, String tenant, String tag) throws NacosException {
ConfigRemoveRequest request = new ConfigRemoveRequest(dataId, group, tenant, tag);
ConfigRemoveResponse response = (ConfigRemoveResponse) requestProxy(getOneRunningClient(), request);
return response.isSuccess();
}
@ -996,7 +967,7 @@ public class ClientWorker implements Closeable {
}
@Override
public void startIntenal() {
public void startInternal() {
executor.scheduleWithFixedDelay(new Runnable() {
@Override
@ -1022,14 +993,14 @@ public class ClientWorker implements Closeable {
@Override
public void executeConfigListen() {
// Dispatch taskes.
// Dispatch tasks.
int listenerSize = cacheMap.get().size();
// Round up the longingTaskCount.
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// The task list is no order.So it maybe has issues when changing.
executorService.execute(new LongPollingRunnable(agent, i, this));
executor.execute(new LongPollingRunnable(agent, i, this));
}
currentLongingTaskCount = longingTaskCount;
}
@ -1060,7 +1031,7 @@ public class ClientWorker implements Closeable {
params.put("group", group);
params.put("tenant", tenant);
}
Map<String, String> headers = new HashMap<String, String>(16);
headers.put("notify", String.valueOf(notify));
result = httpGet(Constants.CONFIG_CONTROLLER_PATH, headers, params, agent.getEncode(), readTimeout);
@ -1106,7 +1077,7 @@ public class ClientWorker implements Closeable {
}
}
}
private void assembleHttpParams(Map<String, String> params, Map<String, String> headers) throws Exception {
Map<String, String> securityHeaders = super.getSecurityHeaders();
if (securityHeaders != null) {
@ -1130,7 +1101,7 @@ public class ClientWorker implements Closeable {
if (signHeaders != null) {
headers.putAll(signHeaders);
}
}
@Override
@ -1168,7 +1139,7 @@ public class ClientWorker implements Closeable {
HttpRestResult<String> result = null;
try {
result = httpPost(url, headers, params, encode, POST_TIMEOUT);
} catch (Exception ex) {
LOGGER.warn("[{}] [publish-single] exception, dataId={}, group={}, msg={}", agent.getName(), dataId,
@ -1190,7 +1161,7 @@ public class ClientWorker implements Closeable {
return false;
}
}
private HttpRestResult<String> httpPost(String path, Map<String, String> headers,
Map<String, String> paramValues, String encoding, long readTimeoutMs) throws Exception {
if (headers == null) {
@ -1199,7 +1170,7 @@ public class ClientWorker implements Closeable {
assembleHttpParams(paramValues, headers);
return agent.httpPost(path, headers, paramValues, encoding, readTimeoutMs);
}
private HttpRestResult<String> httpGet(String path, Map<String, String> headers,
Map<String, String> paramValues, String encoding, long readTimeoutMs) throws Exception {
if (headers == null) {
@ -1208,7 +1179,7 @@ public class ClientWorker implements Closeable {
assembleHttpParams(paramValues, headers);
return agent.httpGet(path, headers, paramValues, encoding, readTimeoutMs);
}
private HttpRestResult<String> httpDelete(String path, Map<String, String> headers,
Map<String, String> paramValues, String encoding, long readTimeoutMs) throws Exception {
if (headers == null) {
@ -1256,7 +1227,7 @@ public class ClientWorker implements Closeable {
return false;
}
}
}
/**
@ -1267,9 +1238,9 @@ public class ClientWorker implements Closeable {
private final int taskId;
private HttpAgent httpAgent;
private ConfigTransportClient configTransportClient;
public LongPollingRunnable(HttpAgent httpAgent, int taskId, ConfigTransportClient configTransportClient) {
this.taskId = taskId;
this.httpAgent = httpAgent;
@ -1313,8 +1284,9 @@ public class ClientWorker implements Closeable {
tenant = key[2];
}
try {
String[] ct = getServerConfig(dataId, group, tenant, 3000L, true);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
String[] ct = getServerConfig(dataId, group, tenant, 3000L, !cache.isInitializing());
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
@ -1338,13 +1310,13 @@ public class ClientWorker implements Closeable {
}
inInitializingCacheList.clear();
executorService.execute(this);
configTransportClient.executor.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
configTransportClient.executor.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
@ -1397,18 +1369,18 @@ public class ClientWorker implements Closeable {
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap<String, String>(2);
headers.put("Long-Pulling-Timeout", "" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put("Long-Pulling-Timeout-No-Hangup", "true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
try {
//assemble headers.
Map<String, String> securityHeaders = configTransportClient.getSecurityHeaders();
if (securityHeaders != null) {
@ -1432,12 +1404,12 @@ public class ClientWorker implements Closeable {
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
HttpRestResult<String> result = httpAgent
.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, httpAgent.getEncode(),
readTimeoutMs);
if (result.ok()) {
setHealthServer(true);
return parseUpdateDataIdResponse(httpAgent, result.getData());
@ -1464,15 +1436,15 @@ public class ClientWorker implements Closeable {
if (StringUtils.isBlank(response)) {
return Collections.emptyList();
}
try {
response = URLDecoder.decode(response, "UTF-8");
} catch (Exception e) {
LOGGER.error("[" + httpAgent.getName() + "] [polling-resp] decode modifiedDataIdsString error", e);
}
List<String> updateList = new LinkedList<String>();
for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
if (!StringUtils.isBlank(dataIdAndGroup)) {
String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);
@ -1499,7 +1471,7 @@ public class ClientWorker implements Closeable {
/**
* get client worker agent.
*
* @return
* @return agent name.
*/
public String getAgentName() {
return this.agent.getName();

View File

@ -131,7 +131,7 @@ public abstract class ConfigTransportClient {
/**
* get common header.
*
* @return
* @return headers.
*/
protected Map<String, String> getCommonHeader() {
Map<String, String> headers = new HashMap<String, String>(16);
@ -244,7 +244,7 @@ public abstract class ConfigTransportClient {
}
startIntenal();
startInternal();
}
/**
@ -252,7 +252,7 @@ public abstract class ConfigTransportClient {
*
* @throws NacosException exception may throw.
*/
public abstract void startIntenal() throws NacosException;
public abstract void startInternal() throws NacosException;
/**
* get client name.
@ -264,7 +264,7 @@ public abstract class ConfigTransportClient {
/**
* get encode.
*
* @return
* @return encode.
*/
public String getEncode() {
return this.encode;
@ -273,7 +273,7 @@ public abstract class ConfigTransportClient {
/**
* get tenant.
*
* @return
* @return tenant.
*/
public String getTenant() {
return this.tenant;
@ -286,8 +286,6 @@ public abstract class ConfigTransportClient {
/**
* listen change .
*
* @return
*/
public abstract void executeConfigListen();

View File

@ -211,7 +211,7 @@ public class ParamUtils {
/**
* check whether still using http .
*
* @return
* @return use http transport .
*/
public static boolean useHttpSwitch() {
String useHttpSwitch = System.getProperty("clientworker.use.http.switch");
@ -221,7 +221,7 @@ public class ParamUtils {
/**
* get connection type for remote.
*
* @return
* @return connection type.
*/
public static String configRemoteConnectionType() {
String remoteConnectionType = System.getProperty("nacos.remote.config.connectiontype");

View File

@ -31,8 +31,8 @@
<DefaultRolloverStrategy max="${sys:JM.LOG.RETAIN.COUNT:-7}"/>
</RollingFile>
<RollingFile name="REMOTE_LOG_FILE" fileName="${sys:JM.LOG.PATH}/rpc.log"
filePattern="${sys:JM.LOG.PATH}/rpc.log.%d{yyyy-MM-dd}.%i">
<RollingFile name="REMOTE_LOG_FILE" fileName="${sys:JM.LOG.PATH}/nacos/remote.log"
filePattern="${sys:JM.LOG.PATH}/nacos/remote.log.%d{yyyy-MM-dd}.%i">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n</Pattern>
</PatternLayout>
@ -44,20 +44,6 @@
<DefaultRolloverStrategy max="${sys:JM.LOG.RETAIN.COUNT:-7}"/>
</RollingFile>
<RollingFile name="GRPC_LOG_FILE" fileName="${sys:JM.LOG.PATH}/grpc.log"
filePattern="${sys:JM.LOG.PATH}/grpc.log.%d{yyyy-MM-dd}.%i">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="${sys:JM.LOG.FILE.SIZE:-10MB}"/>
</Policies>
<DefaultRolloverStrategy max="${sys:JM.LOG.RETAIN.COUNT:-7}"/>
</RollingFile>
<RollingFile name="NAMING_LOG_FILE" fileName="${sys:JM.LOG.PATH}/nacos/naming.log"
filePattern="${sys:JM.LOG.PATH}/nacos/naming.log.%d{yyyy-MM-dd}.%i">
@ -84,13 +70,7 @@
additivity="false">
<AppenderRef ref="REMOTE_LOG_FILE"/>
</Logger>
<Logger name="com.alibaba.nacos.client.grpc" level="${sys:com.alibaba.nacos.grpc.log.level:-info}"
additivity="false">
<AppenderRef ref="GRPC_LOG_FILE"/>
</Logger>
<Logger name="com.alibaba.nacos.client.config" level="${sys:com.alibaba.nacos.config.log.level:-info}"
additivity="false">
<AppenderRef ref="CONFIG_LOG_FILE"/>

View File

@ -53,10 +53,10 @@
</appender>
<appender name="REMOTE_LOG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${JM.LOG.PATH}/remote.log</file>
<file>${JM.LOG.PATH}/nacos/remote.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${JM.LOG.PATH}/remote.log.%i</fileNamePattern>
<fileNamePattern>${JM.LOG.PATH}/nacos/remote.log.%i</fileNamePattern>
<maxIndex>${JM.LOG.RETAIN.COUNT:-7}</maxIndex>
</rollingPolicy>
@ -76,9 +76,9 @@
</logger>
<Logger name="com.alibaba.nacos.common.remote.client" level="${sys:com.alibaba.nacos.config.log.level:-info}"
<Logger name="com.alibaba.nacos.common.remote.client" level="${com.alibaba.nacos.log.level:-info}"
additivity="false">
<AppenderRef ref="REMOTE_LOG_FILE"/>
<appender-ref ref="REMOTE_LOG_FILE"/>
</Logger>

View File

@ -24,6 +24,7 @@ import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse;
import com.alibaba.nacos.api.remote.response.Response;
@ -60,7 +61,7 @@ import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR;
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RpcClient implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
private static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client");
private ServerListFactory serverListFactory;
@ -69,7 +70,7 @@ public abstract class RpcClient implements Closeable {
protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
RpcClientStatus.WAIT_INIT);
protected ScheduledExecutorService executorService;
protected ScheduledExecutorService executor;
protected volatile Connection currentConnection;
@ -77,6 +78,10 @@ public abstract class RpcClient implements Closeable {
private String name;
private static final int RETRY_TIMES = 3;
private static final long DEFAULT_TIMEOUT_MILLS = 3000L;
/**
* listener called where connect status changed.
*/
@ -127,7 +132,12 @@ public abstract class RpcClient implements Closeable {
}
LoggerUtils.printIfInfoEnabled(LOGGER, "Notify disconnected event to listeners");
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
connectionEventListener.onDisConnect();
try {
connectionEventListener.onDisConnect();
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "notify disconnect listener error,listener ={}",
connectionEventListener.getClass().getName());
}
}
}
@ -140,7 +150,12 @@ public abstract class RpcClient implements Closeable {
}
LoggerUtils.printIfInfoEnabled(LOGGER, "Notify connected event to listeners.");
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
connectionEventListener.onConnected();
try {
connectionEventListener.onConnected();
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "notify connect listener error,listener ={}",
connectionEventListener.getClass().getName());
}
}
}
@ -207,7 +222,7 @@ public abstract class RpcClient implements Closeable {
return;
}
executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
executor = new ScheduledThreadPoolExecutor(0, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
@ -218,7 +233,7 @@ public abstract class RpcClient implements Closeable {
});
// connection event consumer.
executorService.submit(new Runnable() {
executor.submit(new Runnable() {
@Override
public void run() {
while (true) {
@ -241,7 +256,7 @@ public abstract class RpcClient implements Closeable {
Connection connectToServer = null;
rpcClientStatus.set(RpcClientStatus.STARTING);
int startUpRetryTimes = 3;
int startUpRetryTimes = RETRY_TIMES;
while (startUpRetryTimes > 0 && connectToServer == null) {
try {
startUpRetryTimes--;
@ -268,38 +283,7 @@ public abstract class RpcClient implements Closeable {
switchServerAsync();
}
registerServerPushResponseHandler(new ServerRequestHandler() {
@Override
public Response requestReply(Request request, RequestMeta requestMeta) {
if (request instanceof ConnectResetRequest) {
try {
synchronized (this) {
if (isRunning()) {
ConnectResetRequest connectResetRequest = (ConnectResetRequest) request;
if (StringUtils.isNotBlank(connectResetRequest.getServerIp()) && NumberUtil
.isDigits(connectResetRequest.getServerPort())) {
ServerInfo serverInfo = new ServerInfo();
serverInfo.setServerIp(connectResetRequest.getServerIp());
serverInfo.setServerPort(
Integer.valueOf(connectResetRequest.getServerPort()) + rpcPortOffset());
switchServerAsync(serverInfo);
} else {
switchServerAsync();
}
}
}
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Switch server error ", e);
}
return new ConnectResetResponse();
}
return null;
}
});
registerServerPushResponseHandler(new ConnectResetRequestHandler());
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@ -314,9 +298,43 @@ public abstract class RpcClient implements Closeable {
}
class ConnectResetRequestHandler implements ServerRequestHandler {
@Override
public Response requestReply(Request request, RequestMeta requestMeta) {
if (request instanceof ConnectResetRequest) {
try {
synchronized (RpcClient.this) {
if (isRunning()) {
ConnectResetRequest connectResetRequest = (ConnectResetRequest) request;
if (StringUtils.isNotBlank(connectResetRequest.getServerIp()) && NumberUtil
.isDigits(connectResetRequest.getServerPort())) {
ServerInfo serverInfo = new ServerInfo();
serverInfo.setServerIp(connectResetRequest.getServerIp());
serverInfo.setServerPort(
Integer.valueOf(connectResetRequest.getServerPort()) + rpcPortOffset());
switchServerAsync(serverInfo, false);
} else {
switchServerAsync();
}
}
}
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Switch server error ", e);
}
return new ConnectResetResponse();
}
return null;
}
}
@Override
public void shutdown() throws NacosException {
executorService.shutdown();
executor.shutdown();
rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
closeConnection(currentConnection);
}
@ -325,20 +343,38 @@ public abstract class RpcClient implements Closeable {
private volatile AtomicBoolean switchingFlag = new AtomicBoolean(false);
private boolean serverCheck() {
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
try {
Response response = this.currentConnection.request(serverCheckRequest, buildMeta());
return response == null ? false : response.isSuccess();
} catch (NacosException e) {
//ignore
}
return false;
}
public void switchServerAsyncOnRequestFail() {
switchServerAsync(null, true);
}
public void switchServerAsync() {
switchServerAsync(null);
switchServerAsync(null, false);
}
/**
* switch server .
*/
protected void switchServerAsync(final ServerInfo recommendServerInfo) {
protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
//return if is in switching of other thread.
if (switchingFlag.get()) {
return;
}
executorService.submit(new Runnable() {
LoggerUtils.printIfInfoEnabled(LOGGER,
String.format("[%s] Submit server switch task : %s,onRequestFail=%s", name, recommendServerInfo,
onRequestFail));
executor.submit(new Runnable() {
@Override
public void run() {
@ -350,6 +386,17 @@ public abstract class RpcClient implements Closeable {
if (!innerLock) {
return;
}
if (onRequestFail && serverCheck()) {
LoggerUtils.printIfInfoEnabled(LOGGER,
String.format("[%s] Server check success : %s", name, recommendServer));
rpcClientStatus.set(RpcClientStatus.RUNNING);
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER,
String.format("[%s] Execute server switch task : %s", name, recommendServer));
switchingFlag.compareAndSet(false, true);
// loop until start client success.
boolean switchSuccess = false;
@ -361,20 +408,20 @@ public abstract class RpcClient implements Closeable {
//1.get a new server
ServerInfo serverInfo = null;
//2.create a new channel to new server
try {
serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
Connection connectNew = connectToServer(serverInfo);
if (connectNew != null) {
//2.create a new channel to new server
Connection connectionNew = connectToServer(serverInfo);
if (connectionNew != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
String.format("[%s] success to connect server : %s", name, serverInfo));
//successfully create a new connect.
if (currentConnection != null) {
//set current connection to enable connection event.
currentConnection.setAbandon(true);
closeConnection(currentConnection);
}
currentConnection = connectNew;
currentConnection = connectionNew;
rpcClientStatus.set(RpcClientStatus.RUNNING);
switchSuccess = true;
boolean s = eventLinkedBlockingQueue
@ -382,9 +429,9 @@ public abstract class RpcClient implements Closeable {
return;
}
//close connetion if client is already shutdown.
//close connection if client is already shutdown.
if (isShutdown()) {
closeConnection(connectNew);
closeConnection(currentConnection);
}
lastException = null;
@ -401,7 +448,7 @@ public abstract class RpcClient implements Closeable {
"[%s] fail to connect server,after trying %s times, last try server is %s", name,
reConnectTimes, serverInfo));
if (Integer.MAX_VALUE == retryTurns) {
retryTurns = 10;
retryTurns = 50;
} else {
retryTurns++;
}
@ -410,10 +457,10 @@ public abstract class RpcClient implements Closeable {
reConnectTimes++;
try {
//sleep 100 millsecond to switch next server.
//sleep x milliseconds to switch next server.
if (!isRunning()) {
// first round ,try servers at a delay 100ms;second round ,200ms; max delays 1s. to be reconsidered.基本上会快速收敛到几个可用的IP
Thread.sleep(Math.min(retryTurns + 1, 10) * 100L);
// first round ,try servers at a delay 100ms;second round ,200ms; max delays 5s. to be reconsidered.
Thread.sleep(Math.min(retryTurns + 1, 50) * 100L);
}
} catch (InterruptedException e) {
// Do nothing.
@ -475,7 +522,7 @@ public abstract class RpcClient implements Closeable {
* @return response from server.
*/
public Response request(Request request) throws NacosException {
return request(request, 3000L);
return request(request, DEFAULT_TIMEOUT_MILLS);
}
/**
@ -485,13 +532,14 @@ public abstract class RpcClient implements Closeable {
* @return response from server.
*/
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 3;
int retryTimes = 1;
Response response = null;
Exception exceptionToThrow = null;
while (retryTimes > 0) {
long start = System.currentTimeMillis();
while (retryTimes < RETRY_TIMES && (System.currentTimeMillis() - start) < timeoutMills) {
try {
if (this.currentConnection == null || !isRunning()) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
throw new NacosException(NacosException.CLIENT_DISCONNECT, "Client not connected.");
}
response = this.currentConnection.request(request, buildMeta());
@ -509,50 +557,69 @@ public abstract class RpcClient implements Closeable {
}
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request, request={}, errorMessage={}", request,
e.getMessage());
exceptionToThrow = e;
if (e instanceof NacosException
&& ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) {
// Do nothing.
} else {
LoggerUtils
.printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}",
request, retryTimes, e.getMessage());
}
}
retryTimes--;
retryTimes++;
}
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
switchServerAsyncOnRequestFail();
}
if (exceptionToThrow != null) {
throw new NacosException(SERVER_ERROR, exceptionToThrow);
throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow
: new NacosException(SERVER_ERROR, exceptionToThrow);
}
return null;
}
/**
* send aync request.
* send async request.
*
* @param request request.
*/
public void asyncRequest(Request request, RequestCallBack callback) throws NacosException {
int retryTimes = 3;
int retryTimes = 0;
Exception exceptionToThrow = null;
while (retryTimes > 0) {
long start = System.currentTimeMillis();
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() > start + callback.getTimeout()) {
try {
if (this.currentConnection == null) {
if (this.currentConnection == null || !isRunning()) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
}
this.currentConnection.asyncRequest(request, buildMeta(), callback);
return;
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request, request={}, error Message={}", request,
e.getMessage());
exceptionToThrow = e;
if (e instanceof NacosException
&& ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) {
// Do nothing.
} else {
LoggerUtils
.printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}",
request, retryTimes, e.getMessage());
}
}
retryTimes--;
retryTimes++;
}
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsyncOnRequestFail();
}
if (exceptionToThrow != null) {
throw new NacosException(SERVER_ERROR, exceptionToThrow);
throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow
: new NacosException(SERVER_ERROR, exceptionToThrow);
}
}
@ -563,11 +630,38 @@ public abstract class RpcClient implements Closeable {
* @return request future.
*/
public RequestFuture requestFuture(Request request) throws NacosException {
if (this.currentConnection == null) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
int retryTimes = 0;
long start = System.currentTimeMillis();
Exception exceptionToThrow = null;
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() > start + DEFAULT_TIMEOUT_MILLS) {
try {
if (this.currentConnection == null || !isRunning()) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
}
RequestFuture requestFuture = this.currentConnection.requestFuture(request, buildMeta());
return requestFuture;
} catch (Exception e) {
exceptionToThrow = e;
if (e instanceof NacosException
&& ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) {
// Do nothing.
} else {
LoggerUtils
.printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}",
request, retryTimes, e.getMessage());
}
}
}
RequestFuture requestFuture = this.currentConnection.requestFuture(request, buildMeta());
return requestFuture;
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsyncOnRequestFail();
}
if (exceptionToThrow != null) {
throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow
: new NacosException(SERVER_ERROR, exceptionToThrow);
}
return null;
}
@ -588,11 +682,19 @@ public abstract class RpcClient implements Closeable {
*/
protected Response handleServerRequest(final Request request, final RequestMeta meta) {
LoggerUtils.printIfInfoEnabled(LOGGER, "receive server push request,request={},requestId={}",
request.getClass().getSimpleName(), request.getRequestId());
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
Response response = serverRequestHandler.requestReply(request, meta);
if (response != null) {
return response;
try {
Response response = serverRequestHandler.requestReply(request, meta);
if (response != null) {
return response;
}
} catch (Exception e) {
LoggerUtils.printIfInfoEnabled(LOGGER, "handleServerRequest:{}, errorMessage={}",
serverRequestHandler.getClass().getName(), e.getMessage());
}
}
return null;
}

View File

@ -68,7 +68,7 @@ public class RpcClientFactory {
*
* @param clientName client name.
* @param connectionType client type.
* @return
* @return rpc client.
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
String clientNameInner = clientName;
@ -97,7 +97,7 @@ public class RpcClientFactory {
*
* @param clientName client name.
* @param connectionType client type.
* @return
* @return rpc client.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels) {

View File

@ -28,20 +28,20 @@ public interface ServerListFactory {
/**
* switch to a new server and get it.
*
* @return
* @return server " ip:port".
*/
String genNextServer();
/**
* get current server.
* @return
* @return server " ip:port".
*/
String getCurrentServer();
/**
* get current server.
*
* @return
* @return servers.
*/
List<String> getServerList();

View File

@ -34,6 +34,7 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.GrpcUtil;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,8 +71,9 @@ public abstract class GrpcClient extends RpcClient {
*/
private RequestGrpc.RequestFutureStub createNewChannelStub(String serverIp, int serverPort) {
ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext()
.build();
ManagedChannelBuilder<?> o = ManagedChannelBuilder.forAddress(serverIp, serverPort)
.executor(GrpcUtil.SHARED_CHANNEL_EXECUTOR.create()).usePlaintext();
ManagedChannel managedChannelTemp = o.build();
RequestGrpc.RequestFutureStub grpcServiceStubTemp = RequestGrpc.newFutureStub(managedChannelTemp);
@ -131,6 +133,7 @@ public abstract class GrpcClient extends RpcClient {
GrpcUtils.PlainRequest parse = GrpcUtils.parse(payload);
final Request request = (Request) parse.getBody();
if (request != null) {
try {
Response response = handleServerRequest(request, parse.metadata);
if (response != null) {
@ -146,6 +149,7 @@ public abstract class GrpcClient extends RpcClient {
payload.toString());
sendResponse(request.getRequestId(), false);
}
}
} catch (Exception e) {
@ -157,8 +161,10 @@ public abstract class GrpcClient extends RpcClient {
@Override
public void onError(Throwable throwable) {
if (isRunning() && !grpcConn.isAbandon()) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream error, switch server", throwable);
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream error, switch server,error={}", throwable);
if (throwable instanceof StatusRuntimeException) {
Status.Code code = ((StatusRuntimeException) throwable).getStatus().getCode();
if (Status.UNAVAILABLE.getCode().equals(code) || Status.CANCELLED.getCode().equals(code)) {
@ -168,20 +174,24 @@ public abstract class GrpcClient extends RpcClient {
}
}
} else {
LoggerUtils.printIfWarnEnabled(LOGGER, "Client is not running status, ignore error event");
LoggerUtils.printIfWarnEnabled(LOGGER, "ignore error event,isRunning:{},isAbandon={}", isRunning,
isAbandon);
}
}
@Override
public void onCompleted() {
if (isRunning() && !grpcConn.isAbandon()) {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream onCompleted, switch server");
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
} else {
LoggerUtils.printIfErrorEnabled(LOGGER, "Client is not running status, ignore complete event");
LoggerUtils.printIfInfoEnabled(LOGGER, "ignore complete event,isRunning:{},isAbandon={}", isRunning,
isAbandon);
}
}
@ -215,7 +225,7 @@ public abstract class GrpcClient extends RpcClient {
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo);
GrpcConnection grpcConn = new GrpcConnection(serverInfo, super.executor);
//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

View File

@ -37,6 +37,7 @@ import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -53,6 +54,8 @@ public class GrpcConnection extends Connection {
*/
protected ManagedChannel channel;
Executor executor;
/**
* stub to send request.
*/
@ -60,8 +63,9 @@ public class GrpcConnection extends Connection {
protected StreamObserver<Payload> payloadStreamObserver;
public GrpcConnection(RpcClient.ServerInfo serverInfo) {
public GrpcConnection(RpcClient.ServerInfo serverInfo, Executor executor) {
super(serverInfo);
this.executor = executor;
}
@Override
@ -72,7 +76,6 @@ public class GrpcConnection extends Connection {
@Override
public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request, requestMeta);
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
Payload grpcResponse = null;
try {
@ -155,16 +158,16 @@ public class GrpcConnection extends Connection {
public void onFailure(Throwable throwable) {
if (throwable instanceof CancellationException) {
requestCallBack.onException(
new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " millseconds."));
new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " milliseconds."));
} else {
requestCallBack.onException(throwable);
}
}
}, RpcScheduledExecutor.AYNS_REQUEST_EXECUTOR);
}, this.executor);
// set timeout future.
ListenableFuture<Payload> payloadListenableFuture = Futures
.withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS,
RpcScheduledExecutor.TIMEOUT_SHEDULER);
RpcScheduledExecutor.TIMEOUT_SCHEDULER);
}

View File

@ -16,6 +16,8 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosDeserializationException;
import com.alibaba.nacos.api.exception.runtime.NacosSerializationException;
import com.alibaba.nacos.api.grpc.auto.Metadata;
@ -24,6 +26,7 @@ import com.alibaba.nacos.api.remote.PayloadRegistry;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.exception.RemoteException;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
@ -33,6 +36,7 @@ import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.charset.Charset;
/**
* grpc utils, use to parse request and response.
@ -86,7 +90,7 @@ public class GrpcUtils {
*
* @param request request.
* @param meta request meta.
* @return
* @return payload.
*/
public static Payload convert(Request request, RequestMeta meta) {
//meta.
@ -99,12 +103,13 @@ public class GrpcUtils {
.setType(request.getClass().getName());
}
builder.setMetadata(metaBuilder.build());
// request body .
request.clearHeaders();
String jsonString = toJson(request);
Payload payload = builder.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(jsonString))).build();
Payload payload = builder
.setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE))))
.build();
return payload;
}
@ -114,18 +119,18 @@ public class GrpcUtils {
*
* @param request request.
* @param meta meta
* @return
* @return payload.
*/
public static Payload convert(Request request, Metadata meta) {
Metadata buildMeta = meta.toBuilder().putAllHeaders(request.getHeaders()).build();
request.clearHeaders();
String jsonString = toJson(request);
Payload.Builder builder = Payload.newBuilder();
Payload payload = builder.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(jsonString)))
.setMetadata(buildMeta)
.build();
Payload payload = builder
.setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE))))
.setMetadata(buildMeta).build();
return payload;
}
@ -134,15 +139,16 @@ public class GrpcUtils {
* convert response to payload.
*
* @param response response.
* @return
* @return payload.
*/
public static Payload convert(Response response) {
String jsonString = toJson(response);
Metadata.Builder metaBuilder = Metadata.newBuilder();
metaBuilder.setClientVersion(VersionUtils.getFullClientVersion()).setType(response.getClass().getName());
Payload payload = Payload.newBuilder().setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(jsonString)))
Payload payload = Payload.newBuilder()
.setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE))))
.setMetadata(metaBuilder.build()).build();
return payload;
}
@ -151,19 +157,21 @@ public class GrpcUtils {
* parse payload to request/response model.
*
* @param payload payload to be parsed.
* @return
* @return payload
*/
public static PlainRequest parse(Payload payload) {
PlainRequest plainRequest = new PlainRequest();
Class classbyType = PayloadRegistry.getClassByType(payload.getMetadata().getType());
if (classbyType != null) {
Object obj = toObj(payload.getBody().getValue().toStringUtf8(), classbyType);
Class classyType = PayloadRegistry.getClassByType(payload.getMetadata().getType());
if (classyType != null) {
Object obj = toObj(payload.getBody().getValue().toString(Charset.forName(Constants.ENCODE)), classyType);
if (obj instanceof Request) {
((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap());
}
plainRequest.body = obj;
} else {
throw new RemoteException(NacosException.SERVER_ERROR, "unknown payload type:" + classyType);
}
plainRequest.type = payload.getMetadata().getType();
plainRequest.metadata = convertMeta(payload.getMetadata());
return plainRequest;

View File

@ -122,7 +122,7 @@ public class RsocketConnection extends Connection {
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
final CompletableFuture<T> promise = new CompletableFuture<T>();
RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable<Object>() {
RpcScheduledExecutor.TIMEOUT_SCHEDULER.schedule(new Callable<Object>() {
@Override
public Object call() throws Exception {
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);

View File

@ -109,7 +109,7 @@ public class IoUtils {
*
* @param str strings to be compressed.
* @param encoding encoding.
* @return
* @return byte[]
*/
public static byte[] tryCompress(String str, String encoding) {
if (str == null || str.length() == 0) {

View File

@ -185,7 +185,7 @@ public class ConfigController {
}
/**
* Get configure board infomation fail.
* Get configure board information fail.
*
* @throws ServletException ServletException.
* @throws IOException IOException.

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.auth.annotation.Secured;
@ -49,7 +49,7 @@ import java.util.Map;
* @version $Id: ConfigPublishRequestHandler.java, v 0.1 2020年07月16日 4:41 PM liuzunfei Exp $
*/
@Component
public class ConfigPublishRequestHandler extends RequestHandler<ConfigPublishRequest, ConfigPubishResponse> {
public class ConfigPublishRequestHandler extends RequestHandler<ConfigPublishRequest, ConfigPublishResponse> {
private final PersistService persistService;
@ -59,7 +59,7 @@ public class ConfigPublishRequestHandler extends RequestHandler<ConfigPublishReq
@Override
@Secured(action = ActionTypes.WRITE, resource = "", parser = ConfigResourceParser.class)
public ConfigPubishResponse handle(ConfigPublishRequest request, RequestMeta meta) throws NacosException {
public ConfigPublishResponse handle(ConfigPublishRequest request, RequestMeta meta) throws NacosException {
try {
String dataId = request.getDataId();
@ -115,10 +115,10 @@ public class ConfigPublishRequestHandler extends RequestHandler<ConfigPublishReq
ConfigTraceService
.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return ConfigPubishResponse.buildSuccessResponse();
return ConfigPublishResponse.buildSuccessResponse();
} catch (Exception e) {
Loggers.REMOTE_DIGEST.error("[ConfigPublishRequestHandler] publish config error ,request ={}", request, e);
return ConfigPubishResponse.buildFailResponse(e.getMessage());
return ConfigPublishResponse.buildFailResponse(e.getMessage());
}
}

View File

@ -41,11 +41,13 @@ import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static com.alibaba.nacos.config.server.utils.LogUtil.PULL_LOG;
@ -93,7 +95,7 @@ public class ConfigQueryRequestHandler extends RequestHandler<ConfigQueryRequest
RequestMeta meta, boolean notify) throws UnsupportedEncodingException {
ConfigQueryResponse response = new ConfigQueryResponse();
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = meta.getLabels().get("Vipserver-Tag");
@ -166,7 +168,7 @@ public class ConfigQueryRequestHandler extends RequestHandler<ConfigQueryRequest
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");
return response;
}
@ -197,7 +199,7 @@ public class ConfigQueryRequestHandler extends RequestHandler<ConfigQueryRequest
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");
return response;
@ -231,7 +233,7 @@ public class ConfigQueryRequestHandler extends RequestHandler<ConfigQueryRequest
because the delayed value of active get requests is very large.
*/
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
ConfigTraceService.PULL_EVENT_OK, delayed, clientIp, notify);
ConfigTraceService.PULL_EVENT_OK, notify ? delayed : -1, clientIp, notify);
} finally {
releaseConfigReadLock(groupKey);
@ -256,13 +258,15 @@ public class ConfigQueryRequestHandler extends RequestHandler<ConfigQueryRequest
* read content.
*
* @param file file to read.
* @return
* @return content.
*/
public static String readFileContent(File file) {
BufferedReader reader = null;
StringBuffer sbf = new StringBuffer();
try {
reader = new BufferedReader(new FileReader(file));
InputStreamReader isr = new InputStreamReader(new FileInputStream(file), Charset.forName(Constants.ENCODE));
reader = new BufferedReader(isr);
String tempStr;
while ((tempStr = reader.readLine()) != null) {
sbf.append(tempStr);

View File

@ -72,7 +72,7 @@ public abstract class BaseRpcServer {
/**
* get connection type.
*
* @return
* @return connection type.
*/
public abstract ConnectionType getConnectionType();
@ -86,14 +86,14 @@ public abstract class BaseRpcServer {
/**
* the increase offset of nacos server port for rpc server port.
*
* @return
* @return delta port offset of main port.
*/
public abstract int rpcPortOffset();
/**
* get service port.
*
* @return
* @return service port.
*/
public int getServicePort() {
return EnvUtil.getPort() + rpcPortOffset();
@ -111,7 +111,6 @@ public abstract class BaseRpcServer {
/**
* the increase offset of nacos server port for rpc server port.
*
* @return
*/
public abstract void shutdownServer();

View File

@ -30,7 +30,7 @@ import javax.annotation.PostConstruct;
public abstract class ClientConnectionEventListener {
/**
* lisnter name.
* listener name.
*/
private String name;

View File

@ -21,10 +21,6 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* registry for client connection event listeners.
@ -37,30 +33,24 @@ public class ClientConnectionEventListenerRegistry {
final List<ClientConnectionEventListener> clientConnectionEventListeners = new ArrayList<ClientConnectionEventListener>();
protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.core.remote.client.connection.notifier");
t.setDaemon(true);
return t;
}
});
/**
* notify where a new client connected.
*
* @param connection connection that new created.
*/
public void notifyClientConnected(final Connection connection) {
executorService.schedule(new Runnable() {
@Override
public void run() {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
clientConnectionEventListener.clientConnected(connection);
}
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
clientConnectionEventListener.clientConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE
.info("[NotifyClientConnected] failed for listener {}", clientConnectionEventListener.getName(),
throwable);
}
}, 0L, TimeUnit.MILLISECONDS);
}
}
/**
@ -69,18 +59,16 @@ public class ClientConnectionEventListenerRegistry {
* @param connection connection that disconnected.
*/
public void notifyClientDisConnected(final Connection connection) {
executorService.schedule(new Runnable() {
@Override
public void run() {
for (ClientConnectionEventListener each : clientConnectionEventListeners) {
try {
each.clientDisConnected(connection);
} catch (Exception e) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", each.getName(), e);
}
}
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
clientConnectionEventListener.clientDisConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
clientConnectionEventListener.getName(), throwable);
}
}, 0L, TimeUnit.MILLISECONDS);
}
}
/**

View File

@ -162,7 +162,7 @@ public class ConnectionManager {
@PostConstruct
public void start() {
// Start UnHeathy Conection Expel Task.
// Start UnHealthy Connection Expel Task.
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
@ -215,7 +215,7 @@ public class ConnectionManager {
}
} catch (Throwable e) {
Loggers.REMOTE.error("error occurs when heathy check... ", e);
Loggers.REMOTE.error("error occurs when healthy check... ", e);
}
}
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
@ -268,7 +268,7 @@ public class ConnectionManager {
/**
* get all client count.
*
* @return
* @return client count.
*/
public int currentClientsCount() {
return connections.size();
@ -322,7 +322,7 @@ public class ConnectionManager {
/**
* check if over limit.
*
* @return
* @return over limit or not.
*/
public boolean isOverLimit() {
return maxClient > 0 && this.connections.size() >= maxClient;

View File

@ -41,10 +41,10 @@ public class RequestHandlerRegistry implements ApplicationContextAware {
Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
/**
* Get Reuquest Handler By request Type.
* Get Request Handler By request Type.
*
* @param requestType see definitions of sub constants classes of RequestTypeConstants
* @return
* @return request handler.
*/
public RequestHandler getByRequestType(String requestType) {
if (!registryHandlers.containsKey(requestType)) {

View File

@ -131,7 +131,7 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
.set(TRANS_KEY_CONN_ID, UuidUtils.generateUuid()).set(TRANS_KEY_CLIENT_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE.info("Connection transportReady, connectionId = {}", connectionId);
Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
return attrWrapper;
}
@ -139,7 +139,7 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
@Override
public void transportTerminated(Attributes transportAttrs) {
String connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE.info("Connection transportTerminated, connectionId = {}", connectionId);
Loggers.REMOTE_DIGEST.info("Connection transportTerminated,connectionId = {} ", connectionId);
connectionManager.unregister(connectionId);
}
}).build();

View File

@ -60,7 +60,7 @@ public class RsocketConnection extends Connection {
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
final CompletableFuture<T> promise = new CompletableFuture<T>();
RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable<Object>() {
RpcScheduledExecutor.TIMEOUT_SCHEDULER.schedule(new Callable<Object>() {
@Override
public Object call() throws Exception {
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);

View File

@ -36,7 +36,7 @@ public class StringPool {
* get singleton string value from the pool.
*
* @param key key string to be pooled.
* @return
* @return value after pooled.
*/
public static String get(String key) {
if (key == null) {