diff --git a/api/src/main/java/com/alibaba/nacos/api/cmdb/spi/CmdbService.java b/api/src/main/java/com/alibaba/nacos/api/cmdb/spi/CmdbService.java index f71fb78b6..59e4b0d36 100644 --- a/api/src/main/java/com/alibaba/nacos/api/cmdb/spi/CmdbService.java +++ b/api/src/main/java/com/alibaba/nacos/api/cmdb/spi/CmdbService.java @@ -93,7 +93,7 @@ public interface CmdbService { * * @param entityName name of entity * @param entityType type of entity - * @return + * @return entity. */ Entity getEntity(String entityName, String entityType); } diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java index dd7c615a6..5e4cc9bcb 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigBatchListenRequest.java @@ -100,13 +100,13 @@ public class ConfigBatchListenRequest extends AbstractConfigRequest { public ConfigListenContext() { } - + @Override public String toString() { return "ConfigListenContext{" + "group='" + group + '\'' + ", md5='" + md5 + '\'' + ", dataId='" + dataId + '\'' + ", tenant='" + tenant + '\'' + '}'; } - + /** * Getter method for property group. * diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java index 171d0615a..e3b51a95a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/cluster/ConfigChangeClusterSyncRequest.java @@ -41,7 +41,7 @@ public class ConfigChangeClusterSyncRequest extends AbstractConfigRequest { /** * is beta. * - * @return + * @return is beta or not. */ public boolean isBeta() { return "Y".equalsIgnoreCase(isBeta); diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java index 105a6d08c..72fc7ba5c 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeBatchListenResponse.java @@ -36,9 +36,10 @@ public class ConfigChangeBatchListenResponse extends Response { } /** - * add changed config. + * add changed config. + * * @param dataId dataId. - * @param group group. + * @param group group. * @param tenant tenant. */ public void addChangeConfig(String dataId, String group, String tenant) { @@ -71,7 +72,7 @@ public class ConfigChangeBatchListenResponse extends Response { * build fail response. * * @param errorMessage errorMessage. - * @return + * @return response. */ public static ConfigChangeBatchListenResponse buildFailResponse(String errorMessage) { ConfigChangeBatchListenResponse response = new ConfigChangeBatchListenResponse(); @@ -145,7 +146,7 @@ public class ConfigChangeBatchListenResponse extends Response { public void setTenant(String tenant) { this.tenant = tenant; } - + @Override public String toString() { return "ConfigContext{" + "group='" + group + '\'' + ", dataId='" + dataId + '\'' + ", tenant='" + tenant diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigPubishResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigPublishResponse.java similarity index 60% rename from api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigPubishResponse.java rename to api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigPublishResponse.java index 9376f463d..b43c1940a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigPubishResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigPublishResponse.java @@ -20,35 +20,35 @@ import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.ResponseCode; /** - * ConfigPubishResponse. + * ConfigPublishResponse. * * @author liuzunfei * @version $Id: ConfigPubishResponse.java, v 0.1 2020年07月16日 4:59 PM liuzunfei Exp $ */ -public class ConfigPubishResponse extends Response { +public class ConfigPublishResponse extends Response { - public ConfigPubishResponse() { + public ConfigPublishResponse() { super(); } /** - * Buidl success resposne. + * Build success response. * - * @return + * @return response. */ - public static ConfigPubishResponse buildSuccessResponse() { - return new ConfigPubishResponse(); + public static ConfigPublishResponse buildSuccessResponse() { + return new ConfigPublishResponse(); } /** - * Buidl fail resposne. + * Build fail response. * - * @return + * @return response. */ - public static ConfigPubishResponse buildFailResponse(String errorMsg) { - ConfigPubishResponse configPubishResponse = new ConfigPubishResponse(); - configPubishResponse.setResultCode(ResponseCode.FAIL.getCode()); - configPubishResponse.setMessage(errorMsg); - return configPubishResponse; + public static ConfigPublishResponse buildFailResponse(String errorMsg) { + ConfigPublishResponse configPublishResponse = new ConfigPublishResponse(); + configPublishResponse.setResultCode(ResponseCode.FAIL.getCode()); + configPublishResponse.setMessage(errorMsg); + return configPublishResponse; } } diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigQueryResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigQueryResponse.java index ec0094f3f..3b67269c5 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigQueryResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigQueryResponse.java @@ -57,7 +57,7 @@ public class ConfigQueryResponse extends Response { * * @param errorCode errorCode. * @param message message. - * @return + * @return response. */ public static ConfigQueryResponse buildFailResponse(int errorCode, String message) { ConfigQueryResponse response = new ConfigQueryResponse(); @@ -66,10 +66,10 @@ public class ConfigQueryResponse extends Response { } /** - * Buidl success resposne. + * Build success response. * * @param content content. - * @return + * @return response. */ public static ConfigQueryResponse buildSuccessResponse(String content) { ConfigQueryResponse response = new ConfigQueryResponse(); diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigRemoveResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigRemoveResponse.java index 9d10199dd..7edf4108d 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigRemoveResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigRemoveResponse.java @@ -32,18 +32,18 @@ public class ConfigRemoveResponse extends Response { } /** - * Buidl success resposne. + * Build success response. * - * @return + * @return response. */ public static ConfigRemoveResponse buildSuccessResponse() { return new ConfigRemoveResponse(); } /** - * Buidl fail resposne. + * Build fail response. * - * @return + * @return response. */ public static ConfigRemoveResponse buildFailResponse(String errorMsg) { ConfigRemoveResponse removeResponse = new ConfigRemoveResponse(); diff --git a/api/src/main/java/com/alibaba/nacos/api/exception/NacosException.java b/api/src/main/java/com/alibaba/nacos/api/exception/NacosException.java index 28f0276ef..d4059f873 100644 --- a/api/src/main/java/com/alibaba/nacos/api/exception/NacosException.java +++ b/api/src/main/java/com/alibaba/nacos/api/exception/NacosException.java @@ -107,11 +107,16 @@ public class NacosException extends Exception { */ public static final int CLIENT_INVALID_PARAM = -400; + /** + * invalid param(参数错误). + */ + public static final int CLIENT_DISCONNECT = -401; + /** * over client threshold(超过server端的限流阈值). */ public static final int CLIENT_OVER_THRESHOLD = -503; - + /* * server error code. * 400 403 throw exception to user @@ -160,4 +165,6 @@ public class NacosException extends Exception { * ome exceptions that occurred when the use the Nacos RestTemplate and Nacos AsyncRestTemplate. */ public static final int HTTP_CLIENT_ERROR_CODE = -500; + + } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java b/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java index cf757c53d..807a3c945 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/DefaultRequestFuture.java @@ -82,7 +82,7 @@ public class DefaultRequestFuture implements RequestFuture { this.requestId = requestId; this.connectionId = connectionId; if (requestCallBack != null) { - this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SHEDULER + this.timeoutFuture = RpcScheduledExecutor.TIMEOUT_SCHEDULER .schedule(new TimeoutHandler(), requestCallBack.getTimeout(), TimeUnit.MILLISECONDS); } this.timeoutInnerTrigger = timeoutInnerTrigger; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java index 880a53d21..192c5df36 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RequestCallBack.java @@ -31,14 +31,14 @@ public interface RequestCallBack { /** * get executor on callback. * - * @return + * @return executor. */ public Executor getExecutor(); /** * get timeout mills. * - * @return + * @return timeouts. */ public long getTimeout(); diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java b/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java index 31e5c5685..0f48bc348 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RequestFuture.java @@ -31,7 +31,7 @@ public interface RequestFuture { /** * check that it is done or not.. - * @return + * @return is done . */ boolean isDone(); diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/Requester.java b/api/src/main/java/com/alibaba/nacos/api/remote/Requester.java index b35abcfba..01dbe3421 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/Requester.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/Requester.java @@ -87,7 +87,7 @@ public interface Requester { /** * check this requester is busy. * - * @return + * @return busy or not. */ public boolean isBusy(); } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java b/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java index 2c66c30da..a1baa1c0d 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RpcScheduledExecutor.java @@ -27,17 +27,11 @@ import java.util.concurrent.ThreadFactory; */ public class RpcScheduledExecutor extends ScheduledThreadPoolExecutor { - public static final RpcScheduledExecutor TIMEOUT_SHEDULER = new RpcScheduledExecutor(1, + public static final RpcScheduledExecutor TIMEOUT_SCHEDULER = new RpcScheduledExecutor(0, "com.alibaba.nacos.remote.TimerScheduler"); - - /** - * executor to execute future request. - */ - public static final RpcScheduledExecutor AYNS_REQUEST_EXECUTOR = new RpcScheduledExecutor( - Runtime.getRuntime().availableProcessors(), "com.alibaba.nacos.remote.RpcRequestExecutor"); - + public static final RpcScheduledExecutor COMMON_SERVER_EXECUTOR = new RpcScheduledExecutor( - Runtime.getRuntime().availableProcessors(), "com.alibaba.nacos.remote.ServerCommonScheduler"); + 0, "com.alibaba.nacos.remote.ServerCommonScheduler"); public RpcScheduledExecutor(int corePoolSize, final String threadName) { super(corePoolSize, new ThreadFactory() { diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java index df4861da7..0d16817fd 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/Request.java @@ -115,6 +115,6 @@ public abstract class Request { @Override public String toString() { - return "Request{" + "headers=" + headers + ", requestId='" + requestId + '\'' + '}'; + return this.getClass().getSimpleName() + "{" + "headers=" + headers + ", requestId='" + requestId + '\'' + '}'; } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java index e337e366e..969b2abee 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/Response.java @@ -52,9 +52,9 @@ public abstract class Response { } /** - * Check Response is Successd. + * Check Response is Successed. * - * @return + * @return success or not. */ public boolean isSuccess() { return this.resultCode == ResponseCode.SUCCESS.getCode(); diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java index 13cba5ca9..4e2c5871b 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/CacheData.java @@ -51,14 +51,6 @@ public class CacheData { this.isInitializing = isInitializing; } - public boolean isListenSuccess() { - return isListenSuccess; - } - - public void setListenSuccess(boolean listenSuccess) { - isListenSuccess = listenSuccess; - } - public String getMd5() { return md5; } @@ -285,6 +277,21 @@ public class CacheData { return content; } + /** + * 1.first add listener.default is false;need to check. + * 2.receive config change notify,set false;need to check. + * 3.last listener is remove,set to false;need to check + * + * @return + */ + public boolean isSync() { + return isSync; + } + + public void setSync(boolean sync) { + isSync = sync; + } + public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group) { if (null == dataId || null == group) { throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group); @@ -350,7 +357,10 @@ public class CacheData { private volatile boolean isInitializing = true; - private volatile boolean isListenSuccess = false; + /** + * if is sync with the server. + */ + private volatile boolean isSync = false; private String type; diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 7e1fd9379..8c96b42b2 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -27,13 +27,12 @@ import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest; import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest; import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse; import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse; -import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse; +import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse; import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse; import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.request.Request; -import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.client.config.common.GroupKey; import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager; @@ -58,7 +57,6 @@ import com.alibaba.nacos.common.remote.client.ConnectionEventListener; import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.RpcClientFactory; import com.alibaba.nacos.common.remote.client.ServerListFactory; -import com.alibaba.nacos.common.remote.client.ServerRequestHandler; import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.StringUtils; @@ -116,29 +114,11 @@ public class ClientWorker implements Closeable { for (Listener listener : listeners) { cache.addListener(listener); } - if (!cache.isListenSuccess()) { + if (!cache.isSync()) { agent.notifyListenConfig(); } } - /** - * Remove listener. - * - * @param dataId dataId of data - * @param group group of data - * @param listener listener - */ - public void removeListener(String dataId, String group, Listener listener) { - group = null2defaultGroup(group); - CacheData cache = getCache(dataId, group); - if (null != cache) { - cache.removeListener(listener); - if (cache.getListeners().isEmpty()) { - agent.removeCache(dataId, group); - } - } - } - /** * Add listeners for tenant. * @@ -152,12 +132,15 @@ public class ClientWorker implements Closeable { group = null2defaultGroup(group); String tenant = agent.getTenant(); CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); - for (Listener listener : listeners) { - cache.addListener(listener); - } - if (!cache.isListenSuccess()) { - agent.notifyListenConfig(); + synchronized (cache) { + for (Listener listener : listeners) { + cache.addListener(listener); + } + if (!cache.isSync()) { + agent.notifyListenConfig(); + } } + } /** @@ -174,13 +157,38 @@ public class ClientWorker implements Closeable { group = null2defaultGroup(group); String tenant = agent.getTenant(); CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); - cache.setContent(content); - for (Listener listener : listeners) { - cache.addListener(listener); + synchronized (cache) { + cache.setContent(content); + for (Listener listener : listeners) { + cache.addListener(listener); + } + // if current cache is already at listening status,do not notify. + if (!cache.isSync()) { + agent.notifyListenConfig(); + } } - // if current cache is already at listening status,do not notify. - if (!cache.isListenSuccess()) { - agent.notifyListenConfig(); + + } + + /** + * Remove listener. + * + * @param dataId dataId of data + * @param group group of data + * @param listener listener + */ + public void removeListener(String dataId, String group, Listener listener) { + group = null2defaultGroup(group); + CacheData cache = getCache(dataId, group); + if (null != cache) { + synchronized (cache) { + cache.removeListener(listener); + if (cache.getListeners().isEmpty()) { + cache.setSync(false); + agent.removeCache(dataId, group); + } + } + } } @@ -198,6 +206,7 @@ public class ClientWorker implements Closeable { if (null != cache) { cache.removeListener(listener); if (cache.getListeners().isEmpty()) { + cache.setSync(false); agent.removeCache(dataId, group); } } @@ -415,39 +424,29 @@ public class ClientWorker implements Closeable { this.configFilterChainManager = configFilterChainManager; init(properties); - + ServerListManager serverListManager = new ServerListManager(properties); serverListManager.start(); - + if (ParamUtils.useHttpSwitch()) { agent = new ConfigHttpTransportClient(properties, serverListManager); } else { agent = new ConfigRpcTransportClient(properties, serverListManager); } - - this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); - t.setDaemon(true); - return t; - } - }); - this.executorService = Executors + ScheduledExecutorService executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); - t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); + t.setName("com.alibaba.nacos.client.Worker_" + agent.getName()); t.setDaemon(true); return t; } }); agent.setExecutor(executorService); agent.start(); - + } private void refreshContentAndCheck(String groupKey, boolean notify) { @@ -464,6 +463,11 @@ public class ClientWorker implements Closeable { if (null != ct[1]) { cacheData.setType(ct[1]); } + if (notify) { + LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", + agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(), + ContentUtils.truncateContent(ct[0]), ct[1]); + } cacheData.checkListenerMd5(); } catch (Exception e) { LOGGER.error("refresh content and check md5 fail ,dataid={},group={},tenant={} ", cacheData.dataId, @@ -487,8 +491,7 @@ public class ClientWorker implements Closeable { public void shutdown() throws NacosException { String className = this.getClass().getName(); LOGGER.info("{} do shutdown begin", className); - ThreadUtils.shutdownThreadPool(executorService, LOGGER); - ThreadUtils.shutdownThreadPool(executor, LOGGER); + ThreadUtils.shutdownThreadPool(agent.executor, LOGGER); LOGGER.info("{} do shutdown stop", className); } @@ -500,10 +503,6 @@ public class ClientWorker implements Closeable { this.isHealthServer = isHealthServer; } - final ScheduledExecutorService executor; - - final ScheduledExecutorService executorService; - /** * groupKey -> cacheData. */ @@ -529,127 +528,124 @@ public class ClientWorker implements Closeable { private BlockingQueue listenExecutebell = new ArrayBlockingQueue(1); private Object bellItem = new Object(); - + private Map rpcClientMap = new HashMap(); public ConfigRpcTransportClient(Properties properties, ServerListManager serverListManager) { super(properties, serverListManager); } - - private ConnectionType getConectiontype() { + + private ConnectionType getConnectionType() { ConnectionType connectionType = ConnectionType.GRPC; - String connetionType = ParamUtils.configRemoteConnectionType(); - if (StringUtils.isNotBlank(connetionType)) { - ConnectionType connectionType1 = ConnectionType.valueOf(connetionType); - if (connectionType1 != null) { - connectionType = connectionType1; + String connectionTypeString = ParamUtils.configRemoteConnectionType(); + if (StringUtils.isNotBlank(connectionTypeString)) { + ConnectionType connectionTypeInner = ConnectionType.valueOf(connectionTypeString); + if (connectionTypeInner != null) { + connectionType = connectionTypeInner; } } return connectionType; } - - private Map getLabels() { + private Map getLabels() { + Map labels = new HashMap(2, 1); labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_CONFIG); return labels; } - + private void initHandlerRpcClient(final RpcClient rpcClientInner) { /* * Register Listen Change Handler */ - rpcClientInner.registerServerPushResponseHandler(new ServerRequestHandler() { - @Override - public Response requestReply(Request request, RequestMeta requestMeta) { - if (request instanceof ConfigChangeNotifyRequest) { - ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request; - String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(), - configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant()); - CacheData cacheData = cacheMap.get().get(groupKey); - if (cacheData != null) { - if (configChangeNotifyRequest.isContentPush() - && cacheData.getLastModifiedTs() < configChangeNotifyRequest.getLastModifiedTs()) { - cacheData.setContent(configChangeNotifyRequest.getContent()); - cacheData.setType(configChangeNotifyRequest.getType()); - cacheData.checkListenerMd5(); - } - cacheData.setListenSuccess(false); - notifyListenConfig(); + rpcClientInner.registerServerPushResponseHandler((request, requestMeta) -> { + if (request instanceof ConfigChangeNotifyRequest) { + ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request; + LOGGER.info("[{}] [server-push] config changed. dataId={}, group={}", getName(), + configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup()); + String groupKey = GroupKey + .getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(), + configChangeNotifyRequest.getTenant()); + + CacheData cacheData = cacheMap.get().get(groupKey); + if (cacheData != null) { + if (configChangeNotifyRequest.isContentPush() + && cacheData.getLastModifiedTs() < configChangeNotifyRequest.getLastModifiedTs()) { + cacheData.setContent(configChangeNotifyRequest.getContent()); + cacheData.setType(configChangeNotifyRequest.getType()); + cacheData.checkListenerMd5(); } - return new ConfigChangeNotifyResponse(); + cacheData.setSync(false); + notifyListenConfig(); } - return null; + return new ConfigChangeNotifyResponse(); } - + return null; }); - - rpcClientInner.registerConnectionListener(new ConnectionEventListener() { + rpcClientInner.registerConnectionListener(new ConnectionEventListener() { + @Override public void onConnected() { + LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName()); notifyListenConfig(); } - + @Override public void onDisConnect() { String taskId = rpcClientInner.getLabels().get("taskId"); - LOGGER.info("[{}] clear listen context...", rpcClientInner.getName()); + LOGGER.info("[{}] DisConnected,clear listen context...", rpcClientInner.getName()); Collection values = cacheMap.get().values(); - + for (CacheData cacheData : values) { if (taskId != null && Integer.valueOf(taskId).equals(cacheData.getTaskId())) { - cacheData.setListenSuccess(false); + cacheData.setSync(false); continue; } - cacheData.setListenSuccess(false); + cacheData.setSync(false); } } - + }); - + rpcClientInner.init(new ServerListFactory() { @Override public String genNextServer() { return ConfigRpcTransportClient.super.serverListManager.getNextServerAddr(); - + } - + @Override public String getCurrentServer() { return ConfigRpcTransportClient.super.serverListManager.getCurrentServerAddr(); - + } - + @Override public List getServerList() { return ConfigRpcTransportClient.super.serverListManager.serverUrls; - + } }); } @Override - public void startIntenal() throws NacosException { + public void startInternal() throws NacosException { executor.schedule(new Runnable() { @Override public void run() { - try { - while (true) { - try { - listenExecutebell.poll(5L, TimeUnit.SECONDS); - executeConfigListen(); - } catch (Exception e) { - LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); - } + while (true) { + try { + listenExecutebell.poll(5L, TimeUnit.SECONDS); + executeConfigListen(); + } catch (Exception e) { + LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); } - } catch (Throwable e) { - LOGGER.error("rpc listen task exception", e); } } }, 0L, TimeUnit.MILLISECONDS); - + // register server change subscriber. NotifyCenter.registerSubscriber(new Subscriber() { @Override @@ -671,7 +667,7 @@ public class ClientWorker implements Closeable { } } - + @Override public Class subscribeType() { return ServerlistChangeEvent.class; @@ -692,13 +688,16 @@ public class ClientWorker implements Closeable { @Override public void executeConfigListen() { - + Map> listenCachesMap = new HashMap>(16); Map> removeListenCachesMap = new HashMap>(16); for (CacheData cache : cacheMap.get().values()) { - //get listen config and remove listen config - if (!CollectionUtils.isEmpty(cache.getListeners()) && !cache.isListenSuccess()) { + if (cache.isSync()) { + continue; + } + if (!CollectionUtils.isEmpty(cache.getListeners())) { + //get listen config if (!cache.isUseLocalConfigInfo()) { List cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { @@ -708,7 +707,7 @@ public class ClientWorker implements Closeable { cacheDatas.add(cache); } - } else if (CollectionUtils.isEmpty(cache.getListeners()) && cache.isListenSuccess()) { + } else if (CollectionUtils.isEmpty(cache.getListeners())) { if (!cache.isUseLocalConfigInfo()) { List cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); @@ -721,21 +720,20 @@ public class ClientWorker implements Closeable { } } } - + if (!listenCachesMap.isEmpty()) { for (Map.Entry> entry : listenCachesMap.entrySet()) { String taskId = entry.getKey(); List listenCaches = entry.getValue(); - + ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches); configChangeListenRequest.setListen(true); try { RpcClient rpcClient = ensureRpcClient(taskId); - ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy( rpcClient, configChangeListenRequest); if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) { - + Set changeKeys = new HashSet(); //handle changed keys,notify listener if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) { @@ -745,25 +743,41 @@ public class ClientWorker implements Closeable { .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant()); changeKeys.add(changeKey); - refreshContentAndCheck(changeKey, true); + boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); + this.executor.execute(() -> refreshContentAndCheck(changeKey, !isInitializing)); + } } - - //handler constent configs + + //handler content configs for (CacheData cacheData : listenCaches) { if (!changeKeys.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant()))) { - cacheData.setListenSuccess(true); + //sync:cache data md5 = server md5 && cache data md5 = all listeners md5. + cacheData.checkListenerMd5(); + cacheData.setSync(true); } + + cacheData.setInitializing(false); } - + } } catch (Exception e) { - LOGGER.error("async listen config change error ", e); + if (e instanceof NacosException + && ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) { + LOGGER.warn("async listen config change fail ,client is not connected."); + } else { + LOGGER.error("async listen config change error ", e); + } + try { + Thread.sleep(10L); + } catch (InterruptedException interruptedException) { + //ignore + } } } } - + if (!removeListenCachesMap.isEmpty()) { for (Map.Entry> entry : removeListenCachesMap.entrySet()) { String taskId = entry.getKey(); @@ -775,99 +789,50 @@ public class ClientWorker implements Closeable { boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest); if (removeSuccess) { for (CacheData cacheData : removeListenCaches) { - ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); + synchronized (cacheData) { + if (cacheData.getListeners().isEmpty()) { + ClientWorker.this + .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); + } + } } } - + } catch (Exception e) { LOGGER.error("async remove listen config change error ", e); } + try { + Thread.sleep(10L); + } catch (InterruptedException interruptedException) { + //ignore + } } } } - - private RpcClient ensureRpcClient(String taskId) throws NacosException { + + private synchronized RpcClient ensureRpcClient(String taskId) throws NacosException { Map labels = getLabels(); Map newlabels = new HashMap(labels); newlabels.put("taskId", taskId); - + RpcClient rpcClient = RpcClientFactory - .createClient("config-" + taskId + "-" + uuid, getConectiontype(), newlabels); + .createClient("config-" + taskId + "-" + uuid, getConnectionType(), newlabels); if (rpcClient.isWaitInitiated()) { initHandlerRpcClient(rpcClient); rpcClient.start(); } - + return rpcClient; } - - /** - * build config strings. - * - * @param caches caches to build config string. - * @return - */ - private List buildConfigStrs(List caches) { - StringBuilder listenConfigsBuilder = new StringBuilder(); - List configStrings = new ArrayList(); - int index = 0; - for (CacheData cache : caches) { - index++; - listenConfigsBuilder.append(cache.dataId).append(WORD_SEPARATOR); - listenConfigsBuilder.append(cache.group).append(WORD_SEPARATOR); - if (StringUtils.isBlank(cache.tenant)) { - listenConfigsBuilder.append(cache.getMd5()).append(LINE_SEPARATOR); - } else { - listenConfigsBuilder.append(cache.getMd5()).append(WORD_SEPARATOR); - listenConfigsBuilder.append(cache.getTenant()).append(LINE_SEPARATOR); - } - - if (index >= 3000) { - configStrings.add(listenConfigsBuilder.toString()); - listenConfigsBuilder = new StringBuilder(); - index = 0; - } - } - - if (listenConfigsBuilder.length() > 0) { - configStrings.add(listenConfigsBuilder.toString()); - } - return configStrings; - } - /** * build config string. * * @param caches caches to build config string. - * @return - */ - private String buildConfigStr(List caches) { - StringBuilder listenConfigsBuilder = new StringBuilder(); - List configStrings = new ArrayList(); - for (CacheData cache : caches) { - listenConfigsBuilder.append(cache.dataId).append(WORD_SEPARATOR); - listenConfigsBuilder.append(cache.group).append(WORD_SEPARATOR); - if (StringUtils.isBlank(cache.tenant)) { - listenConfigsBuilder.append(cache.getMd5()).append(LINE_SEPARATOR); - } else { - listenConfigsBuilder.append(cache.getMd5()).append(WORD_SEPARATOR); - listenConfigsBuilder.append(cache.getTenant()).append(LINE_SEPARATOR); - } - - } - - return listenConfigsBuilder.toString(); - } - - /** - * build config string. - * - * @param caches caches to build config string. - * @return + * @return request. */ private ConfigBatchListenRequest buildConfigRequest(List caches) { - + ConfigBatchListenRequest configChangeListenRequest = new ConfigBatchListenRequest(); for (CacheData cacheData : caches) { configChangeListenRequest.addConfigListenContext(cacheData.group, cacheData.dataId, cacheData.tenant, @@ -878,14 +843,14 @@ public class ClientWorker implements Closeable { @Override public void removeCache(String dataId, String group) { - // Notify to rpc unlisten ,and remove cache if success. + // Notify to rpc un listen ,and remove cache if success. notifyListenConfig(); } /** * send cancel listen config change request . * - * @param configListenString string of remove listen config string. + * @param configChangeListenRequest request of remove listen config string. */ private boolean unListenConfigChange(RpcClient rpcClient, ConfigBatchListenRequest configChangeListenRequest) throws NacosException { @@ -896,11 +861,12 @@ public class ClientWorker implements Closeable { } @Override - public String[] queryConfig(String dataId, String group, String tenant, long readTimeous, boolean notify) + public String[] queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify) throws NacosException { ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant); request.putHeader("notify", String.valueOf(notify)); - ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(getOneRunningClient(), request); + ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(getOneRunningClient(), request, + readTimeouts); String[] ct = new String[2]; if (response.isSuccess()) { @@ -930,15 +896,20 @@ public class ClientWorker implements Closeable { } } - + private Response requestProxy(RpcClient rpcClientInner, Request request) throws NacosException { + return requestProxy(rpcClientInner, request, 3000L); + } + + private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills) + throws NacosException { try { request.putAllHeader(super.getSecurityHeaders()); request.putAllHeader(super.getSpasHeaders()); } catch (Exception e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } - + JsonObject asJsonObjectTemp = new Gson().toJsonTree(request).getAsJsonObject(); asJsonObjectTemp.remove("headers"); asJsonObjectTemp.remove("requestId"); @@ -947,9 +918,9 @@ public class ClientWorker implements Closeable { throw new NacosException(NacosException.CLIENT_OVER_THRESHOLD, "More than client-side current limit threshold"); } - return rpcClientInner.request(request); + return rpcClientInner.request(request, timeoutMills); } - + RpcClient getOneRunningClient() throws NacosException { return ensureRpcClient("0"); } @@ -962,7 +933,7 @@ public class ClientWorker implements Closeable { request.putAdditonalParam("tag", tag); request.putAdditonalParam("appName", appName); request.putAdditonalParam("betaIps", betaIps); - ConfigPubishResponse response = (ConfigPubishResponse) requestProxy(getOneRunningClient(), request); + ConfigPublishResponse response = (ConfigPublishResponse) requestProxy(getOneRunningClient(), request); return response.isSuccess(); } catch (Exception e) { LOGGER.warn("[{}] [publish-single] error, dataId={}, group={}, tenant={}, code={}, msg={}", @@ -972,8 +943,8 @@ public class ClientWorker implements Closeable { } @Override - public boolean removeConfig(String dataid, String group, String tenat, String tag) throws NacosException { - ConfigRemoveRequest request = new ConfigRemoveRequest(dataid, group, tenat, tag); + public boolean removeConfig(String dataId, String group, String tenant, String tag) throws NacosException { + ConfigRemoveRequest request = new ConfigRemoveRequest(dataId, group, tenant, tag); ConfigRemoveResponse response = (ConfigRemoveResponse) requestProxy(getOneRunningClient(), request); return response.isSuccess(); } @@ -996,7 +967,7 @@ public class ClientWorker implements Closeable { } @Override - public void startIntenal() { + public void startInternal() { executor.scheduleWithFixedDelay(new Runnable() { @Override @@ -1022,14 +993,14 @@ public class ClientWorker implements Closeable { @Override public void executeConfigListen() { - // Dispatch taskes. + // Dispatch tasks. int listenerSize = cacheMap.get().size(); // Round up the longingTaskCount. int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. - executorService.execute(new LongPollingRunnable(agent, i, this)); + executor.execute(new LongPollingRunnable(agent, i, this)); } currentLongingTaskCount = longingTaskCount; } @@ -1060,7 +1031,7 @@ public class ClientWorker implements Closeable { params.put("group", group); params.put("tenant", tenant); } - + Map headers = new HashMap(16); headers.put("notify", String.valueOf(notify)); result = httpGet(Constants.CONFIG_CONTROLLER_PATH, headers, params, agent.getEncode(), readTimeout); @@ -1106,7 +1077,7 @@ public class ClientWorker implements Closeable { } } } - + private void assembleHttpParams(Map params, Map headers) throws Exception { Map securityHeaders = super.getSecurityHeaders(); if (securityHeaders != null) { @@ -1130,7 +1101,7 @@ public class ClientWorker implements Closeable { if (signHeaders != null) { headers.putAll(signHeaders); } - + } @Override @@ -1168,7 +1139,7 @@ public class ClientWorker implements Closeable { HttpRestResult result = null; try { - + result = httpPost(url, headers, params, encode, POST_TIMEOUT); } catch (Exception ex) { LOGGER.warn("[{}] [publish-single] exception, dataId={}, group={}, msg={}", agent.getName(), dataId, @@ -1190,7 +1161,7 @@ public class ClientWorker implements Closeable { return false; } } - + private HttpRestResult httpPost(String path, Map headers, Map paramValues, String encoding, long readTimeoutMs) throws Exception { if (headers == null) { @@ -1199,7 +1170,7 @@ public class ClientWorker implements Closeable { assembleHttpParams(paramValues, headers); return agent.httpPost(path, headers, paramValues, encoding, readTimeoutMs); } - + private HttpRestResult httpGet(String path, Map headers, Map paramValues, String encoding, long readTimeoutMs) throws Exception { if (headers == null) { @@ -1208,7 +1179,7 @@ public class ClientWorker implements Closeable { assembleHttpParams(paramValues, headers); return agent.httpGet(path, headers, paramValues, encoding, readTimeoutMs); } - + private HttpRestResult httpDelete(String path, Map headers, Map paramValues, String encoding, long readTimeoutMs) throws Exception { if (headers == null) { @@ -1256,7 +1227,7 @@ public class ClientWorker implements Closeable { return false; } } - + } /** @@ -1267,9 +1238,9 @@ public class ClientWorker implements Closeable { private final int taskId; private HttpAgent httpAgent; - + private ConfigTransportClient configTransportClient; - + public LongPollingRunnable(HttpAgent httpAgent, int taskId, ConfigTransportClient configTransportClient) { this.taskId = taskId; this.httpAgent = httpAgent; @@ -1313,8 +1284,9 @@ public class ClientWorker implements Closeable { tenant = key[2]; } try { - String[] ct = getServerConfig(dataId, group, tenant, 3000L, true); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); + + String[] ct = getServerConfig(dataId, group, tenant, 3000L, !cache.isInitializing()); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); @@ -1338,13 +1310,13 @@ public class ClientWorker implements Closeable { } inInitializingCacheList.clear(); - executorService.execute(this); + configTransportClient.executor.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); - executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); + configTransportClient.executor.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } } @@ -1397,18 +1369,18 @@ public class ClientWorker implements Closeable { params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map headers = new HashMap(2); headers.put("Long-Pulling-Timeout", "" + timeout); - + // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put("Long-Pulling-Timeout-No-Hangup", "true"); } - + if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } - - try { + try { + //assemble headers. Map securityHeaders = configTransportClient.getSecurityHeaders(); if (securityHeaders != null) { @@ -1432,12 +1404,12 @@ public class ClientWorker implements Closeable { // In order to prevent the server from handling the delay of the client's long task, // increase the client's read timeout to avoid this problem. - + long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); HttpRestResult result = httpAgent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, httpAgent.getEncode(), readTimeoutMs); - + if (result.ok()) { setHealthServer(true); return parseUpdateDataIdResponse(httpAgent, result.getData()); @@ -1464,15 +1436,15 @@ public class ClientWorker implements Closeable { if (StringUtils.isBlank(response)) { return Collections.emptyList(); } - + try { response = URLDecoder.decode(response, "UTF-8"); } catch (Exception e) { LOGGER.error("[" + httpAgent.getName() + "] [polling-resp] decode modifiedDataIdsString error", e); } - + List updateList = new LinkedList(); - + for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) { if (!StringUtils.isBlank(dataIdAndGroup)) { String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR); @@ -1499,7 +1471,7 @@ public class ClientWorker implements Closeable { /** * get client worker agent. * - * @return + * @return agent name. */ public String getAgentName() { return this.agent.getName(); diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java index 208f2adbb..92d51cc9c 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java @@ -131,7 +131,7 @@ public abstract class ConfigTransportClient { /** * get common header. * - * @return + * @return headers. */ protected Map getCommonHeader() { Map headers = new HashMap(16); @@ -244,7 +244,7 @@ public abstract class ConfigTransportClient { } - startIntenal(); + startInternal(); } /** @@ -252,7 +252,7 @@ public abstract class ConfigTransportClient { * * @throws NacosException exception may throw. */ - public abstract void startIntenal() throws NacosException; + public abstract void startInternal() throws NacosException; /** * get client name. @@ -264,7 +264,7 @@ public abstract class ConfigTransportClient { /** * get encode. * - * @return + * @return encode. */ public String getEncode() { return this.encode; @@ -273,7 +273,7 @@ public abstract class ConfigTransportClient { /** * get tenant. * - * @return + * @return tenant. */ public String getTenant() { return this.tenant; @@ -286,8 +286,6 @@ public abstract class ConfigTransportClient { /** * listen change . - * - * @return */ public abstract void executeConfigListen(); diff --git a/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java b/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java index 9e93b2c07..684753761 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/utils/ParamUtils.java @@ -211,7 +211,7 @@ public class ParamUtils { /** * check whether still using http . * - * @return + * @return use http transport . */ public static boolean useHttpSwitch() { String useHttpSwitch = System.getProperty("clientworker.use.http.switch"); @@ -221,7 +221,7 @@ public class ParamUtils { /** * get connection type for remote. * - * @return + * @return connection type. */ public static String configRemoteConnectionType() { String remoteConnectionType = System.getProperty("nacos.remote.config.connectiontype"); diff --git a/client/src/main/resources/nacos-log4j2.xml b/client/src/main/resources/nacos-log4j2.xml index 9ce9d1f8b..5a3c8fe00 100644 --- a/client/src/main/resources/nacos-log4j2.xml +++ b/client/src/main/resources/nacos-log4j2.xml @@ -31,8 +31,8 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n @@ -44,20 +44,6 @@ - - - - %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n - - - - - - - - - @@ -84,13 +70,7 @@ additivity="false"> - - - - - - + diff --git a/client/src/main/resources/nacos-logback.xml b/client/src/main/resources/nacos-logback.xml index 8e25a9c80..c24b41534 100644 --- a/client/src/main/resources/nacos-logback.xml +++ b/client/src/main/resources/nacos-logback.xml @@ -53,10 +53,10 @@ - ${JM.LOG.PATH}/remote.log + ${JM.LOG.PATH}/nacos/remote.log - ${JM.LOG.PATH}/remote.log.%i + ${JM.LOG.PATH}/nacos/remote.log.%i ${JM.LOG.RETAIN.COUNT:-7} @@ -76,9 +76,9 @@ - - + diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index 96d6472d6..8fd1c4b59 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -24,6 +24,7 @@ import com.alibaba.nacos.api.remote.RequestFuture; import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.remote.request.ServerCheckRequest; import com.alibaba.nacos.api.remote.response.ConnectResetResponse; import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse; import com.alibaba.nacos.api.remote.response.Response; @@ -60,7 +61,7 @@ import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") public abstract class RpcClient implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client"); private ServerListFactory serverListFactory; @@ -69,7 +70,7 @@ public abstract class RpcClient implements Closeable { protected volatile AtomicReference rpcClientStatus = new AtomicReference( RpcClientStatus.WAIT_INIT); - protected ScheduledExecutorService executorService; + protected ScheduledExecutorService executor; protected volatile Connection currentConnection; @@ -77,6 +78,10 @@ public abstract class RpcClient implements Closeable { private String name; + private static final int RETRY_TIMES = 3; + + private static final long DEFAULT_TIMEOUT_MILLS = 3000L; + /** * listener called where connect status changed. */ @@ -127,7 +132,12 @@ public abstract class RpcClient implements Closeable { } LoggerUtils.printIfInfoEnabled(LOGGER, "Notify disconnected event to listeners"); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { - connectionEventListener.onDisConnect(); + try { + connectionEventListener.onDisConnect(); + } catch (Throwable throwable) { + LoggerUtils.printIfErrorEnabled(LOGGER, "notify disconnect listener error,listener ={}", + connectionEventListener.getClass().getName()); + } } } @@ -140,7 +150,12 @@ public abstract class RpcClient implements Closeable { } LoggerUtils.printIfInfoEnabled(LOGGER, "Notify connected event to listeners."); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { - connectionEventListener.onConnected(); + try { + connectionEventListener.onConnected(); + } catch (Throwable throwable) { + LoggerUtils.printIfErrorEnabled(LOGGER, "notify connect listener error,listener ={}", + connectionEventListener.getClass().getName()); + } } } @@ -207,7 +222,7 @@ public abstract class RpcClient implements Closeable { return; } - executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { + executor = new ScheduledThreadPoolExecutor(0, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); @@ -218,7 +233,7 @@ public abstract class RpcClient implements Closeable { }); // connection event consumer. - executorService.submit(new Runnable() { + executor.submit(new Runnable() { @Override public void run() { while (true) { @@ -241,7 +256,7 @@ public abstract class RpcClient implements Closeable { Connection connectToServer = null; rpcClientStatus.set(RpcClientStatus.STARTING); - int startUpRetryTimes = 3; + int startUpRetryTimes = RETRY_TIMES; while (startUpRetryTimes > 0 && connectToServer == null) { try { startUpRetryTimes--; @@ -268,38 +283,7 @@ public abstract class RpcClient implements Closeable { switchServerAsync(); } - registerServerPushResponseHandler(new ServerRequestHandler() { - @Override - public Response requestReply(Request request, RequestMeta requestMeta) { - if (request instanceof ConnectResetRequest) { - - try { - synchronized (this) { - if (isRunning()) { - ConnectResetRequest connectResetRequest = (ConnectResetRequest) request; - if (StringUtils.isNotBlank(connectResetRequest.getServerIp()) && NumberUtil - .isDigits(connectResetRequest.getServerPort())) { - - ServerInfo serverInfo = new ServerInfo(); - - serverInfo.setServerIp(connectResetRequest.getServerIp()); - serverInfo.setServerPort( - Integer.valueOf(connectResetRequest.getServerPort()) + rpcPortOffset()); - switchServerAsync(serverInfo); - } else { - switchServerAsync(); - } - } - } - } catch (Exception e) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Switch server error ", e); - } - return new ConnectResetResponse(); - } - return null; - } - - }); + registerServerPushResponseHandler(new ConnectResetRequestHandler()); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { @@ -314,9 +298,43 @@ public abstract class RpcClient implements Closeable { } + class ConnectResetRequestHandler implements ServerRequestHandler { + + @Override + public Response requestReply(Request request, RequestMeta requestMeta) { + + if (request instanceof ConnectResetRequest) { + + try { + synchronized (RpcClient.this) { + if (isRunning()) { + ConnectResetRequest connectResetRequest = (ConnectResetRequest) request; + if (StringUtils.isNotBlank(connectResetRequest.getServerIp()) && NumberUtil + .isDigits(connectResetRequest.getServerPort())) { + + ServerInfo serverInfo = new ServerInfo(); + + serverInfo.setServerIp(connectResetRequest.getServerIp()); + serverInfo.setServerPort( + Integer.valueOf(connectResetRequest.getServerPort()) + rpcPortOffset()); + switchServerAsync(serverInfo, false); + } else { + switchServerAsync(); + } + } + } + } catch (Exception e) { + LoggerUtils.printIfErrorEnabled(LOGGER, "Switch server error ", e); + } + return new ConnectResetResponse(); + } + return null; + } + } + @Override public void shutdown() throws NacosException { - executorService.shutdown(); + executor.shutdown(); rpcClientStatus.set(RpcClientStatus.SHUTDOWN); closeConnection(currentConnection); } @@ -325,20 +343,38 @@ public abstract class RpcClient implements Closeable { private volatile AtomicBoolean switchingFlag = new AtomicBoolean(false); + private boolean serverCheck() { + ServerCheckRequest serverCheckRequest = new ServerCheckRequest(); + try { + Response response = this.currentConnection.request(serverCheckRequest, buildMeta()); + return response == null ? false : response.isSuccess(); + } catch (NacosException e) { + //ignore + } + return false; + } + + public void switchServerAsyncOnRequestFail() { + switchServerAsync(null, true); + } + public void switchServerAsync() { - switchServerAsync(null); + switchServerAsync(null, false); } /** * switch server . */ - protected void switchServerAsync(final ServerInfo recommendServerInfo) { + protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) { //return if is in switching of other thread. if (switchingFlag.get()) { return; } - executorService.submit(new Runnable() { + LoggerUtils.printIfInfoEnabled(LOGGER, + String.format("[%s] Submit server switch task : %s,onRequestFail=%s", name, recommendServerInfo, + onRequestFail)); + executor.submit(new Runnable() { @Override public void run() { @@ -350,6 +386,17 @@ public abstract class RpcClient implements Closeable { if (!innerLock) { return; } + + if (onRequestFail && serverCheck()) { + LoggerUtils.printIfInfoEnabled(LOGGER, + String.format("[%s] Server check success : %s", name, recommendServer)); + rpcClientStatus.set(RpcClientStatus.RUNNING); + return; + } + + LoggerUtils.printIfInfoEnabled(LOGGER, + String.format("[%s] Execute server switch task : %s", name, recommendServer)); + switchingFlag.compareAndSet(false, true); // loop until start client success. boolean switchSuccess = false; @@ -361,20 +408,20 @@ public abstract class RpcClient implements Closeable { //1.get a new server ServerInfo serverInfo = null; - //2.create a new channel to new server try { serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get(); - - Connection connectNew = connectToServer(serverInfo); - if (connectNew != null) { + //2.create a new channel to new server + Connection connectionNew = connectToServer(serverInfo); + if (connectionNew != null) { LoggerUtils.printIfInfoEnabled(LOGGER, String.format("[%s] success to connect server : %s", name, serverInfo)); //successfully create a new connect. if (currentConnection != null) { + //set current connection to enable connection event. currentConnection.setAbandon(true); closeConnection(currentConnection); } - currentConnection = connectNew; + currentConnection = connectionNew; rpcClientStatus.set(RpcClientStatus.RUNNING); switchSuccess = true; boolean s = eventLinkedBlockingQueue @@ -382,9 +429,9 @@ public abstract class RpcClient implements Closeable { return; } - //close connetion if client is already shutdown. + //close connection if client is already shutdown. if (isShutdown()) { - closeConnection(connectNew); + closeConnection(currentConnection); } lastException = null; @@ -401,7 +448,7 @@ public abstract class RpcClient implements Closeable { "[%s] fail to connect server,after trying %s times, last try server is %s", name, reConnectTimes, serverInfo)); if (Integer.MAX_VALUE == retryTurns) { - retryTurns = 10; + retryTurns = 50; } else { retryTurns++; } @@ -410,10 +457,10 @@ public abstract class RpcClient implements Closeable { reConnectTimes++; try { - //sleep 100 millsecond to switch next server. + //sleep x milliseconds to switch next server. if (!isRunning()) { - // first round ,try servers at a delay 100ms;second round ,200ms; max delays 1s. to be reconsidered.基本上会快速收敛到几个可用的IP - Thread.sleep(Math.min(retryTurns + 1, 10) * 100L); + // first round ,try servers at a delay 100ms;second round ,200ms; max delays 5s. to be reconsidered. + Thread.sleep(Math.min(retryTurns + 1, 50) * 100L); } } catch (InterruptedException e) { // Do nothing. @@ -475,7 +522,7 @@ public abstract class RpcClient implements Closeable { * @return response from server. */ public Response request(Request request) throws NacosException { - return request(request, 3000L); + return request(request, DEFAULT_TIMEOUT_MILLS); } /** @@ -485,13 +532,14 @@ public abstract class RpcClient implements Closeable { * @return response from server. */ public Response request(Request request, long timeoutMills) throws NacosException { - int retryTimes = 3; + int retryTimes = 1; Response response = null; Exception exceptionToThrow = null; - while (retryTimes > 0) { + long start = System.currentTimeMillis(); + while (retryTimes < RETRY_TIMES && (System.currentTimeMillis() - start) < timeoutMills) { try { if (this.currentConnection == null || !isRunning()) { - throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected."); + throw new NacosException(NacosException.CLIENT_DISCONNECT, "Client not connected."); } response = this.currentConnection.request(request, buildMeta()); @@ -509,50 +557,69 @@ public abstract class RpcClient implements Closeable { } } catch (Exception e) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request, request={}, errorMessage={}", request, - e.getMessage()); exceptionToThrow = e; + if (e instanceof NacosException + && ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) { + // Do nothing. + } else { + LoggerUtils + .printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}", + request, retryTimes, e.getMessage()); + } } - retryTimes--; + retryTimes++; } if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { - switchServerAsync(); + switchServerAsyncOnRequestFail(); } if (exceptionToThrow != null) { - throw new NacosException(SERVER_ERROR, exceptionToThrow); + throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow + : new NacosException(SERVER_ERROR, exceptionToThrow); } return null; } /** - * send aync request. + * send async request. * * @param request request. */ public void asyncRequest(Request request, RequestCallBack callback) throws NacosException { - int retryTimes = 3; + int retryTimes = 0; Exception exceptionToThrow = null; - while (retryTimes > 0) { + long start = System.currentTimeMillis(); + while (retryTimes < RETRY_TIMES && System.currentTimeMillis() > start + callback.getTimeout()) { try { - if (this.currentConnection == null) { + if (this.currentConnection == null || !isRunning()) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected."); } this.currentConnection.asyncRequest(request, buildMeta(), callback); return; } catch (Exception e) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request, request={}, error Message={}", request, - e.getMessage()); exceptionToThrow = e; + if (e instanceof NacosException + && ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) { + // Do nothing. + } else { + LoggerUtils + .printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}", + request, retryTimes, e.getMessage()); + } } - retryTimes--; + retryTimes++; } + + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsyncOnRequestFail(); + } if (exceptionToThrow != null) { - throw new NacosException(SERVER_ERROR, exceptionToThrow); + throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow + : new NacosException(SERVER_ERROR, exceptionToThrow); } } @@ -563,11 +630,38 @@ public abstract class RpcClient implements Closeable { * @return request future. */ public RequestFuture requestFuture(Request request) throws NacosException { - if (this.currentConnection == null) { - throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected."); + int retryTimes = 0; + long start = System.currentTimeMillis(); + Exception exceptionToThrow = null; + while (retryTimes < RETRY_TIMES && System.currentTimeMillis() > start + DEFAULT_TIMEOUT_MILLS) { + try { + if (this.currentConnection == null || !isRunning()) { + throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected."); + } + RequestFuture requestFuture = this.currentConnection.requestFuture(request, buildMeta()); + return requestFuture; + } catch (Exception e) { + exceptionToThrow = e; + if (e instanceof NacosException + && ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) { + // Do nothing. + } else { + LoggerUtils + .printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}", + request, retryTimes, e.getMessage()); + } + } } - RequestFuture requestFuture = this.currentConnection.requestFuture(request, buildMeta()); - return requestFuture; + + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsyncOnRequestFail(); + } + + if (exceptionToThrow != null) { + throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow + : new NacosException(SERVER_ERROR, exceptionToThrow); + } + return null; } @@ -588,11 +682,19 @@ public abstract class RpcClient implements Closeable { */ protected Response handleServerRequest(final Request request, final RequestMeta meta) { + LoggerUtils.printIfInfoEnabled(LOGGER, "receive server push request,request={},requestId={}", + request.getClass().getSimpleName(), request.getRequestId()); for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) { - Response response = serverRequestHandler.requestReply(request, meta); - if (response != null) { - return response; + try { + Response response = serverRequestHandler.requestReply(request, meta); + if (response != null) { + return response; + } + } catch (Exception e) { + LoggerUtils.printIfInfoEnabled(LOGGER, "handleServerRequest:{}, errorMessage={}", + serverRequestHandler.getClass().getName(), e.getMessage()); } + } return null; } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java index 2b7872950..ffa4bc64f 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java @@ -68,7 +68,7 @@ public class RpcClientFactory { * * @param clientName client name. * @param connectionType client type. - * @return + * @return rpc client. */ public static RpcClient createClient(String clientName, ConnectionType connectionType, Map labels) { String clientNameInner = clientName; @@ -97,7 +97,7 @@ public class RpcClientFactory { * * @param clientName client name. * @param connectionType client type. - * @return + * @return rpc client. */ public static RpcClient createClusterClient(String clientName, ConnectionType connectionType, Map labels) { diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerListFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerListFactory.java index 47699559c..4a8c957fd 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerListFactory.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerListFactory.java @@ -28,20 +28,20 @@ public interface ServerListFactory { /** * switch to a new server and get it. * - * @return + * @return server " ip:port". */ String genNextServer(); /** * get current server. - * @return + * @return server " ip:port". */ String getCurrentServer(); /** * get current server. * - * @return + * @return servers. */ List getServerList(); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index 3db3c82db..06941c421 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -34,6 +34,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.internal.GrpcUtil; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,8 +71,9 @@ public abstract class GrpcClient extends RpcClient { */ private RequestGrpc.RequestFutureStub createNewChannelStub(String serverIp, int serverPort) { - ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext() - .build(); + ManagedChannelBuilder o = ManagedChannelBuilder.forAddress(serverIp, serverPort) + .executor(GrpcUtil.SHARED_CHANNEL_EXECUTOR.create()).usePlaintext(); + ManagedChannel managedChannelTemp = o.build(); RequestGrpc.RequestFutureStub grpcServiceStubTemp = RequestGrpc.newFutureStub(managedChannelTemp); @@ -131,6 +133,7 @@ public abstract class GrpcClient extends RpcClient { GrpcUtils.PlainRequest parse = GrpcUtils.parse(payload); final Request request = (Request) parse.getBody(); if (request != null) { + try { Response response = handleServerRequest(request, parse.metadata); if (response != null) { @@ -146,6 +149,7 @@ public abstract class GrpcClient extends RpcClient { payload.toString()); sendResponse(request.getRequestId(), false); } + } } catch (Exception e) { @@ -157,8 +161,10 @@ public abstract class GrpcClient extends RpcClient { @Override public void onError(Throwable throwable) { - if (isRunning() && !grpcConn.isAbandon()) { - LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream error, switch server", throwable); + boolean isRunning = isRunning(); + boolean isAbandon = grpcConn.isAbandon(); + if (isRunning && !isAbandon) { + LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream error, switch server,error={}", throwable); if (throwable instanceof StatusRuntimeException) { Status.Code code = ((StatusRuntimeException) throwable).getStatus().getCode(); if (Status.UNAVAILABLE.getCode().equals(code) || Status.CANCELLED.getCode().equals(code)) { @@ -168,20 +174,24 @@ public abstract class GrpcClient extends RpcClient { } } } else { - LoggerUtils.printIfWarnEnabled(LOGGER, "Client is not running status, ignore error event"); + LoggerUtils.printIfWarnEnabled(LOGGER, "ignore error event,isRunning:{},isAbandon={}", isRunning, + isAbandon); } } @Override public void onCompleted() { - if (isRunning() && !grpcConn.isAbandon()) { + boolean isRunning = isRunning(); + boolean isAbandon = grpcConn.isAbandon(); + if (isRunning && !isAbandon) { LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream onCompleted, switch server"); if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { switchServerAsync(); } } else { - LoggerUtils.printIfErrorEnabled(LOGGER, "Client is not running status, ignore complete event"); + LoggerUtils.printIfInfoEnabled(LOGGER, "ignore complete event,isRunning:{},isAbandon={}", isRunning, + isAbandon); } } @@ -215,7 +225,7 @@ public abstract class GrpcClient extends RpcClient { BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc .newStub(newChannelStubTemp.getChannel()); - GrpcConnection grpcConn = new GrpcConnection(serverInfo); + GrpcConnection grpcConn = new GrpcConnection(serverInfo, super.executor); //create stream request and bind connection event to this connection. StreamObserver payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java index 56d91e6b5..9db56c68b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java @@ -37,6 +37,7 @@ import org.checkerframework.checker.nullness.compatqual.NullableDecl; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -53,6 +54,8 @@ public class GrpcConnection extends Connection { */ protected ManagedChannel channel; + Executor executor; + /** * stub to send request. */ @@ -60,8 +63,9 @@ public class GrpcConnection extends Connection { protected StreamObserver payloadStreamObserver; - public GrpcConnection(RpcClient.ServerInfo serverInfo) { + public GrpcConnection(RpcClient.ServerInfo serverInfo, Executor executor) { super(serverInfo); + this.executor = executor; } @Override @@ -72,7 +76,6 @@ public class GrpcConnection extends Connection { @Override public Response request(Request request, RequestMeta requestMeta, long timeouts) throws NacosException { Payload grpcRequest = GrpcUtils.convert(request, requestMeta); - ListenableFuture requestFuture = grpcFutureServiceStub.request(grpcRequest); Payload grpcResponse = null; try { @@ -155,16 +158,16 @@ public class GrpcConnection extends Connection { public void onFailure(Throwable throwable) { if (throwable instanceof CancellationException) { requestCallBack.onException( - new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " millseconds.")); + new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " milliseconds.")); } else { requestCallBack.onException(throwable); } } - }, RpcScheduledExecutor.AYNS_REQUEST_EXECUTOR); + }, this.executor); // set timeout future. ListenableFuture payloadListenableFuture = Futures .withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS, - RpcScheduledExecutor.TIMEOUT_SHEDULER); + RpcScheduledExecutor.TIMEOUT_SCHEDULER); } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java index 0f0357132..b87955a52 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcUtils.java @@ -16,6 +16,8 @@ package com.alibaba.nacos.common.remote.client.grpc; +import com.alibaba.nacos.api.common.Constants; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.runtime.NacosDeserializationException; import com.alibaba.nacos.api.exception.runtime.NacosSerializationException; import com.alibaba.nacos.api.grpc.auto.Metadata; @@ -24,6 +26,7 @@ import com.alibaba.nacos.api.remote.PayloadRegistry; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.remote.exception.RemoteException; import com.alibaba.nacos.common.utils.VersionUtils; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; @@ -33,6 +36,7 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import java.io.IOException; +import java.nio.charset.Charset; /** * grpc utils, use to parse request and response. @@ -86,7 +90,7 @@ public class GrpcUtils { * * @param request request. * @param meta request meta. - * @return + * @return payload. */ public static Payload convert(Request request, RequestMeta meta) { //meta. @@ -99,12 +103,13 @@ public class GrpcUtils { .setType(request.getClass().getName()); } builder.setMetadata(metaBuilder.build()); - + // request body . request.clearHeaders(); String jsonString = toJson(request); - - Payload payload = builder.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(jsonString))).build(); + Payload payload = builder + .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE)))) + .build(); return payload; } @@ -114,18 +119,18 @@ public class GrpcUtils { * * @param request request. * @param meta meta - * @return + * @return payload. */ public static Payload convert(Request request, Metadata meta) { - + Metadata buildMeta = meta.toBuilder().putAllHeaders(request.getHeaders()).build(); request.clearHeaders(); String jsonString = toJson(request); - + Payload.Builder builder = Payload.newBuilder(); - Payload payload = builder.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(jsonString))) - .setMetadata(buildMeta) - .build(); + Payload payload = builder + .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE)))) + .setMetadata(buildMeta).build(); return payload; } @@ -134,15 +139,16 @@ public class GrpcUtils { * convert response to payload. * * @param response response. - * @return + * @return payload. */ public static Payload convert(Response response) { String jsonString = toJson(response); - + Metadata.Builder metaBuilder = Metadata.newBuilder(); metaBuilder.setClientVersion(VersionUtils.getFullClientVersion()).setType(response.getClass().getName()); - - Payload payload = Payload.newBuilder().setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(jsonString))) + + Payload payload = Payload.newBuilder() + .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE)))) .setMetadata(metaBuilder.build()).build(); return payload; } @@ -151,19 +157,21 @@ public class GrpcUtils { * parse payload to request/response model. * * @param payload payload to be parsed. - * @return + * @return payload */ public static PlainRequest parse(Payload payload) { PlainRequest plainRequest = new PlainRequest(); - Class classbyType = PayloadRegistry.getClassByType(payload.getMetadata().getType()); - if (classbyType != null) { - Object obj = toObj(payload.getBody().getValue().toStringUtf8(), classbyType); + Class classyType = PayloadRegistry.getClassByType(payload.getMetadata().getType()); + if (classyType != null) { + Object obj = toObj(payload.getBody().getValue().toString(Charset.forName(Constants.ENCODE)), classyType); if (obj instanceof Request) { ((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap()); } plainRequest.body = obj; + } else { + throw new RemoteException(NacosException.SERVER_ERROR, "unknown payload type:" + classyType); } - + plainRequest.type = payload.getMetadata().getType(); plainRequest.metadata = convertMeta(payload.getMetadata()); return plainRequest; diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java index 6a0c9877f..c88e3f84e 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketConnection.java @@ -122,7 +122,7 @@ public class RsocketConnection extends Connection { private static CompletableFuture failAfter(final long timeouts) { final CompletableFuture promise = new CompletableFuture(); - RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable() { + RpcScheduledExecutor.TIMEOUT_SCHEDULER.schedule(new Callable() { @Override public Object call() throws Exception { final TimeoutException ex = new TimeoutException("Timeout after " + timeouts); diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/IoUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/IoUtils.java index 49402dcd0..5b31227bb 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/IoUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/IoUtils.java @@ -109,7 +109,7 @@ public class IoUtils { * * @param str strings to be compressed. * @param encoding encoding. - * @return + * @return byte[] */ public static byte[] tryCompress(String str, String encoding) { if (str == null || str.length() == 0) { diff --git a/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java b/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java index 0492a1262..ef224b4a5 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java @@ -185,7 +185,7 @@ public class ConfigController { } /** - * Get configure board infomation fail. + * Get configure board information fail. * * @throws ServletException ServletException. * @throws IOException IOException. diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java index 0f0003969..7789d3fd8 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java @@ -17,7 +17,7 @@ package com.alibaba.nacos.config.server.remote; import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest; -import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse; +import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.auth.annotation.Secured; @@ -49,7 +49,7 @@ import java.util.Map; * @version $Id: ConfigPublishRequestHandler.java, v 0.1 2020年07月16日 4:41 PM liuzunfei Exp $ */ @Component -public class ConfigPublishRequestHandler extends RequestHandler { +public class ConfigPublishRequestHandler extends RequestHandler { private final PersistService persistService; @@ -59,7 +59,7 @@ public class ConfigPublishRequestHandler extends RequestHandler clientConnectionEventListeners = new ArrayList(); - protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("com.alibaba.nacos.core.remote.client.connection.notifier"); - t.setDaemon(true); - return t; - } - }); - /** * notify where a new client connected. * * @param connection connection that new created. */ public void notifyClientConnected(final Connection connection) { - executorService.schedule(new Runnable() { - @Override - public void run() { - for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { - clientConnectionEventListener.clientConnected(connection); - } + + for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { + try { + clientConnectionEventListener.clientConnected(connection); + } catch (Throwable throwable) { + Loggers.REMOTE + .info("[NotifyClientConnected] failed for listener {}", clientConnectionEventListener.getName(), + throwable); + } - }, 0L, TimeUnit.MILLISECONDS); + } + } /** @@ -69,18 +59,16 @@ public class ClientConnectionEventListenerRegistry { * @param connection connection that disconnected. */ public void notifyClientDisConnected(final Connection connection) { - executorService.schedule(new Runnable() { - @Override - public void run() { - for (ClientConnectionEventListener each : clientConnectionEventListeners) { - try { - each.clientDisConnected(connection); - } catch (Exception e) { - Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", each.getName(), e); - } - } + + for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { + try { + clientConnectionEventListener.clientDisConnected(connection); + } catch (Throwable throwable) { + Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", + clientConnectionEventListener.getName(), throwable); } - }, 0L, TimeUnit.MILLISECONDS); + } + } /** diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index 310d7b708..a40c200fb 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -162,7 +162,7 @@ public class ConnectionManager { @PostConstruct public void start() { - // Start UnHeathy Conection Expel Task. + // Start UnHealthy Connection Expel Task. RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() { @Override public void run() { @@ -215,7 +215,7 @@ public class ConnectionManager { } } catch (Throwable e) { - Loggers.REMOTE.error("error occurs when heathy check... ", e); + Loggers.REMOTE.error("error occurs when healthy check... ", e); } } }, 1000L, 3000L, TimeUnit.MILLISECONDS); @@ -268,7 +268,7 @@ public class ConnectionManager { /** * get all client count. * - * @return + * @return client count. */ public int currentClientsCount() { return connections.size(); @@ -322,7 +322,7 @@ public class ConnectionManager { /** * check if over limit. * - * @return + * @return over limit or not. */ public boolean isOverLimit() { return maxClient > 0 && this.connections.size() >= maxClient; diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandlerRegistry.java b/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandlerRegistry.java index 12a6eb0ee..1a74508ce 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandlerRegistry.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandlerRegistry.java @@ -41,10 +41,10 @@ public class RequestHandlerRegistry implements ApplicationContextAware { Map registryHandlers = new HashMap(); /** - * Get Reuquest Handler By request Type. + * Get Request Handler By request Type. * * @param requestType see definitions of sub constants classes of RequestTypeConstants - * @return + * @return request handler. */ public RequestHandler getByRequestType(String requestType) { if (!registryHandlers.containsKey(requestType)) { diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java index ca6c1b82e..7fb89d7d5 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java @@ -131,7 +131,7 @@ public abstract class BaseGrpcServer extends BaseRpcServer { .set(TRANS_KEY_CONN_ID, UuidUtils.generateUuid()).set(TRANS_KEY_CLIENT_PORT, remotePort) .set(TRANS_KEY_LOCAL_PORT, localPort).build(); String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID); - Loggers.REMOTE.info("Connection transportReady, connectionId = {}", connectionId); + Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId); return attrWrapper; } @@ -139,7 +139,7 @@ public abstract class BaseGrpcServer extends BaseRpcServer { @Override public void transportTerminated(Attributes transportAttrs) { String connectionId = transportAttrs.get(TRANS_KEY_CONN_ID); - Loggers.REMOTE.info("Connection transportTerminated, connectionId = {}", connectionId); + Loggers.REMOTE_DIGEST.info("Connection transportTerminated,connectionId = {} ", connectionId); connectionManager.unregister(connectionId); } }).build(); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java index 2933f68d5..8efd4086d 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java @@ -60,7 +60,7 @@ public class RsocketConnection extends Connection { private static CompletableFuture failAfter(final long timeouts) { final CompletableFuture promise = new CompletableFuture(); - RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable() { + RpcScheduledExecutor.TIMEOUT_SCHEDULER.schedule(new Callable() { @Override public Object call() throws Exception { final TimeoutException ex = new TimeoutException("Timeout after " + timeouts); diff --git a/core/src/main/java/com/alibaba/nacos/core/utils/StringPool.java b/core/src/main/java/com/alibaba/nacos/core/utils/StringPool.java index e3e708efc..e3e0b1781 100644 --- a/core/src/main/java/com/alibaba/nacos/core/utils/StringPool.java +++ b/core/src/main/java/com/alibaba/nacos/core/utils/StringPool.java @@ -36,7 +36,7 @@ public class StringPool { * get singleton string value from the pool. * * @param key key string to be pooled. - * @return + * @return value after pooled. */ public static String get(String key) { if (key == null) {