diff --git a/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java b/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java index 4a7fa4339..4c9657d59 100644 --- a/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java @@ -140,7 +140,7 @@ public class ClientWorkerTest { clientWorker.removeConfig(dataId, group, tenant, tag); Assert.fail(); } catch (NacosException e) { - Assert.assertEquals("Client not connected,current status:STARTING", e.getErrMsg()); + Assert.assertEquals("Client not connected, current status:STARTING", e.getErrMsg()); Assert.assertEquals(-401, e.getErrCode()); } @@ -148,7 +148,7 @@ public class ClientWorkerTest { clientWorker.getServerConfig(dataId, group, tenant, 100, false); Assert.fail(); } catch (NacosException e) { - Assert.assertEquals("Client not connected,current status:STARTING", e.getErrMsg()); + Assert.assertEquals("Client not connected, current status:STARTING", e.getErrMsg()); Assert.assertEquals(-401, e.getErrCode()); } } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index b9d9f61b0..d5dfdac26 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -32,7 +32,9 @@ import com.alibaba.nacos.api.remote.response.ErrorResponse; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.common.lifecycle.Closeable; import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.common.utils.CollectionUtils; import com.alibaba.nacos.common.utils.LoggerUtils; +import com.alibaba.nacos.common.utils.NumberUtils; import com.alibaba.nacos.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,9 +48,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR; @@ -65,18 +68,18 @@ public abstract class RpcClient implements Closeable { private ServerListFactory serverListFactory; - protected LinkedBlockingQueue eventLinkedBlockingQueue = new LinkedBlockingQueue(); + protected LinkedBlockingQueue eventLinkedBlockingQueue = new LinkedBlockingQueue<>(); - protected volatile AtomicReference rpcClientStatus = new AtomicReference( + protected volatile AtomicReference rpcClientStatus = new AtomicReference<>( RpcClientStatus.WAIT_INIT); protected ScheduledExecutorService clientEventExecutor; - private final BlockingQueue reconnectionSignal = new ArrayBlockingQueue(1); + private final BlockingQueue reconnectionSignal = new ArrayBlockingQueue<>(1); protected volatile Connection currentConnection; - protected Map labels = new HashMap(); + protected Map labels = new HashMap<>(); private String name; @@ -98,12 +101,14 @@ public abstract class RpcClient implements Closeable { /** * listener called where connection's status changed. */ - protected List connectionEventListeners = new ArrayList(); + protected List connectionEventListeners = new ArrayList<>(); /** * handlers to process server push request. */ - protected List serverRequestHandlers = new ArrayList(); + protected List serverRequestHandlers = new ArrayList<>(); + + private static final Pattern EXCLUDE_PROTOCOL_PATTERN = Pattern.compile("(?<=\\w{1,5}://)(.*)"); static { PayloadRegistry.init(); @@ -116,7 +121,7 @@ public abstract class RpcClient implements Closeable { public RpcClient(ServerListFactory serverListFactory) { this.serverListFactory = serverListFactory; rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED); - LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory ={}", + LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory = {}", serverListFactory.getClass().getName()); } @@ -124,7 +129,7 @@ public abstract class RpcClient implements Closeable { this(name); this.serverListFactory = serverListFactory; rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED); - LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory ={}", + LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory = {}", serverListFactory.getClass().getName()); } @@ -139,7 +144,7 @@ public abstract class RpcClient implements Closeable { } /** - * init server list factory.only can init once. + * init server list factory. only can init once. * * @param serverListFactory serverListFactory */ @@ -150,7 +155,7 @@ public abstract class RpcClient implements Closeable { this.serverListFactory = serverListFactory; rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED); - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init, ServerListFactory ={}", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] RpcClient init, ServerListFactory = {}", name, serverListFactory.getClass().getName()); return this; } @@ -162,7 +167,7 @@ public abstract class RpcClient implements Closeable { */ public RpcClient labels(Map labels) { this.labels.putAll(labels); - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init label, labels={}", name, this.labels); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] RpcClient init label, labels = {}", name, this.labels); return this; } @@ -174,7 +179,7 @@ public abstract class RpcClient implements Closeable { */ public RpcClient keepAlive(long keepAliveTime, TimeUnit timeUnit) { this.keepAliveTime = keepAliveTime * timeUnit.toMillis(keepAliveTime); - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init keepalive time, keepAliveTimeMillis={}", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] RpcClient init keepalive time, keepAliveTimeMillis = {}", name, keepAliveTime); return this; } @@ -186,12 +191,12 @@ public abstract class RpcClient implements Closeable { if (connectionEventListeners.isEmpty()) { return; } - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify disconnected event to listeners", name); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { connectionEventListener.onDisConnect(); } catch (Throwable throwable) { - LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify disconnect listener error,listener ={}", name, + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name, connectionEventListener.getClass().getName()); } } @@ -204,12 +209,12 @@ public abstract class RpcClient implements Closeable { if (connectionEventListeners.isEmpty()) { return; } - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify connected event to listeners.", name); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { connectionEventListener.onConnected(); } catch (Throwable throwable) { - LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify connect listener error,listener ={}", name, + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name, connectionEventListener.getClass().getName()); } } @@ -243,7 +248,7 @@ public abstract class RpcClient implements Closeable { } /** - * check if current connected server is in serverlist ,if not switch server. + * check if current connected server is in server list, if not switch server. */ public void onServerListChange() { if (currentConnection != null && currentConnection.serverInfo != null) { @@ -257,7 +262,7 @@ public abstract class RpcClient implements Closeable { } if (!found) { LoggerUtils.printIfInfoEnabled(LOGGER, - "Current connected server {} is not in latest server list,switch switchServerAsync", + "Current connected server {} is not in latest server list, switch switchServerAsync", serverInfo.getAddress()); switchServerAsync(); } @@ -275,110 +280,101 @@ public abstract class RpcClient implements Closeable { return; } - clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("com.alibaba.nacos.client.remote.worker"); - t.setDaemon(true); - return t; - } + clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> { + Thread t = new Thread(r); + t.setName("com.alibaba.nacos.client.remote.worker"); + t.setDaemon(true); + return t; }); // connection event consumer. - clientEventExecutor.submit(new Runnable() { - @Override - public void run() { - while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) { - ConnectionEvent take = null; - try { - take = eventLinkedBlockingQueue.take(); - if (take.isConnected()) { - notifyConnected(); - } else if (take.isDisConnected()) { - notifyDisConnected(); - } - } catch (Throwable e) { - //Do nothing + clientEventExecutor.submit(() -> { + while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) { + ConnectionEvent take; + try { + take = eventLinkedBlockingQueue.take(); + if (take.isConnected()) { + notifyConnected(); + } else if (take.isDisConnected()) { + notifyDisConnected(); } + } catch (Throwable e) { + // Do nothing } } }); - clientEventExecutor.submit(new Runnable() { - @Override - public void run() { - while (true) { - try { - if (isShutdown()) { - break; - } - ReconnectContext reconnectContext = reconnectionSignal - .poll(keepAliveTime, TimeUnit.MILLISECONDS); - if (reconnectContext == null) { - //check alive time. - if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) { - boolean isHealthy = healthCheck(); - if (!isHealthy) { - if (currentConnection == null) { - continue; - } - LoggerUtils.printIfInfoEnabled(LOGGER, - "[{}]Server healthy check fail,currentConnection={}", name, - currentConnection.getConnectionId()); - - RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get(); - if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) { - break; - } - - boolean success = RpcClient.this.rpcClientStatus - .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY); - if (success) { - reconnectContext = new ReconnectContext(null, false); - } else { - continue; - } - - } else { - lastActiveTimeStamp = System.currentTimeMillis(); + clientEventExecutor.submit(() -> { + while (true) { + try { + if (isShutdown()) { + break; + } + ReconnectContext reconnectContext = reconnectionSignal + .poll(keepAliveTime, TimeUnit.MILLISECONDS); + if (reconnectContext == null) { + // check alive time. + if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) { + boolean isHealthy = healthCheck(); + if (!isHealthy) { + if (currentConnection == null) { continue; } - } else { - continue; - } - - } - - if (reconnectContext.serverInfo != null) { - //clear recommend server if server is not in server list. - boolean serverExist = false; - for (String server : getServerListFactory().getServerList()) { - ServerInfo serverInfo = resolveServerInfo(server); - if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) { - serverExist = true; - reconnectContext.serverInfo.serverPort = serverInfo.serverPort; + LoggerUtils.printIfInfoEnabled(LOGGER, + "[{}] Server healthy check fail, currentConnection = {}", name, + currentConnection.getConnectionId()); + + RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get(); + if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) { break; } + + boolean statusFLowSuccess = RpcClient.this.rpcClientStatus + .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY); + if (statusFLowSuccess) { + reconnectContext = new ReconnectContext(null, false); + } else { + continue; + } + + } else { + lastActiveTimeStamp = System.currentTimeMillis(); + continue; } - if (!serverExist) { - LoggerUtils.printIfInfoEnabled(LOGGER, - "[{}] Recommend server is not in server list ,ignore recommend server {}", name, - reconnectContext.serverInfo.getAddress()); - - reconnectContext.serverInfo = null; - + } else { + continue; + } + + } + + if (reconnectContext.serverInfo != null) { + // clear recommend server if server is not in server list. + boolean serverExist = false; + for (String server : getServerListFactory().getServerList()) { + ServerInfo serverInfo = resolveServerInfo(server); + if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) { + serverExist = true; + reconnectContext.serverInfo.serverPort = serverInfo.serverPort; + break; } } - reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail); - } catch (Throwable throwable) { - //Do nothing + if (!serverExist) { + LoggerUtils.printIfInfoEnabled(LOGGER, + "[{}] Recommend server is not in server list, ignore recommend server {}", name, + reconnectContext.serverInfo.getAddress()); + + reconnectContext.serverInfo = null; + + } } + reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail); + } catch (Throwable throwable) { + // Do nothing } } }); - //connect to server ,try to connect to server sync once, async starting if fail. + // connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed. Connection connectToServer = null; rpcClientStatus.set(RpcClientStatus.STARTING); @@ -394,14 +390,14 @@ public abstract class RpcClient implements Closeable { connectToServer = connectToServer(serverInfo); } catch (Throwable e) { LoggerUtils.printIfWarnEnabled(LOGGER, - "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}", + "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes); } } if (connectToServer != null) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up,connectionId={}", + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId()); this.currentConnection = connectToServer; rpcClientStatus.set(RpcClientStatus.RUNNING); @@ -412,16 +408,13 @@ public abstract class RpcClient implements Closeable { registerServerRequestHandler(new ConnectResetRequestHandler()); - //register client detection request. - registerServerRequestHandler(new ServerRequestHandler() { - @Override - public Response requestReply(Request request) { - if (request instanceof ClientDetectionRequest) { - return new ClientDetectionResponse(); - } - - return null; + // register client detection request. + registerServerRequestHandler(request -> { + if (request instanceof ClientDetectionRequest) { + return new ClientDetectionResponse(); } + + return null; }); } @@ -448,7 +441,7 @@ public abstract class RpcClient implements Closeable { } } } catch (Exception e) { - LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Switch server error ,{}", name, e); + LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Switch server error, {}", name, e); } return new ConnectResetResponse(); } @@ -458,9 +451,9 @@ public abstract class RpcClient implements Closeable { @Override public void shutdown() throws NacosException { - LOGGER.info("Shutdown rpc client ,set status to shutdown"); + LOGGER.info("Shutdown rpc client, set status to shutdown"); rpcClientStatus.set(RpcClientStatus.SHUTDOWN); - LOGGER.info("Shutdown client event executor " + clientEventExecutor); + LOGGER.info("Shutdown client event executor " + clientEventExecutor); clientEventExecutor.shutdownNow(); LOGGER.info("Close current connection " + currentConnection.getConnectionId()); closeConnection(currentConnection); @@ -473,10 +466,10 @@ public abstract class RpcClient implements Closeable { } try { Response response = this.currentConnection.request(healthCheckRequest, 3000L); - //not only check server is ok ,also check connection is register. + // not only check server is ok, also check connection is register. return response != null && response.isSuccess(); } catch (NacosException e) { - //ignore + // ignore } return false; } @@ -500,16 +493,16 @@ public abstract class RpcClient implements Closeable { try { - AtomicReference recommendServer = new AtomicReference(recommendServerInfo); + AtomicReference recommendServer = new AtomicReference<>(recommendServerInfo); if (onRequestFail && healthCheck()) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success,currentServer is{} ", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success, currentServer is {} ", name, currentConnection.serverInfo.getAddress()); rpcClientStatus.set(RpcClientStatus.RUNNING); return; } - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] try to re connect to a new server ,server is {}", name, - recommendServerInfo == null ? " not appointed,will choose a random server." + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to reconnect to a new server, server is {}", name, + recommendServerInfo == null ? " not appointed, will choose a random server." : (recommendServerInfo.getAddress() + ", will try it once.")); // loop until start client success. @@ -517,35 +510,35 @@ public abstract class RpcClient implements Closeable { int reConnectTimes = 0; int retryTurns = 0; - Exception lastException = null; + Exception lastException; while (!switchSuccess && !isShutdown()) { - //1.get a new server + // 1.get a new server ServerInfo serverInfo = null; try { serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get(); - //2.create a new channel to new server + // 2.create a new channel to new server Connection connectionNew = connectToServer(serverInfo); if (connectionNew != null) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect a server [{}],connectionId={}", + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}", name, serverInfo.getAddress(), connectionNew.getConnectionId()); - //successfully create a new connect. + // successfully create a new connect. if (currentConnection != null) { LoggerUtils.printIfInfoEnabled(LOGGER, - "[{}] Abandon prev connection ,server is {}, connectionId is {}", name, + "[{}] Abandon prev connection, server is {}, connectionId is {}", name, currentConnection.serverInfo.getAddress(), currentConnection.getConnectionId()); - //set current connection to enable connection event. + // set current connection to enable connection event. currentConnection.setAbandon(true); closeConnection(currentConnection); } currentConnection = connectionNew; rpcClientStatus.set(RpcClientStatus.RUNNING); switchSuccess = true; - boolean s = eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED)); + eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED)); return; } - //close connection if client is already shutdown. + // close connection if client is already shutdown. if (isShutdown()) { closeConnection(currentConnection); } @@ -561,7 +554,7 @@ public abstract class RpcClient implements Closeable { if (reConnectTimes > 0 && reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) { LoggerUtils.printIfInfoEnabled(LOGGER, - "[{}] fail to connect server,after trying {} times, last try server is {},error={}", name, + "[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}", name, reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException); if (Integer.MAX_VALUE == retryTurns) { retryTurns = 50; @@ -573,22 +566,22 @@ public abstract class RpcClient implements Closeable { reConnectTimes++; try { - //sleep x milliseconds to switch next server. + // sleep x milliseconds to switch next server. if (!isRunning()) { - // first round ,try servers at a delay 100ms;second round ,200ms; max delays 5s. to be reconsidered. + // first round, try servers at a delay 100ms;second round, 200ms; max delays 5s. to be reconsidered. Thread.sleep(Math.min(retryTurns + 1, 50) * 100L); } } catch (InterruptedException e) { - // Do nothing. + // Do nothing. } } if (isShutdown()) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown ,stop reconnect to server", name); + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown, stop reconnect to server", name); } } catch (Exception e) { - LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to re connect to server ,error is {}", name, e); + LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", name, e); } } @@ -643,7 +636,7 @@ public abstract class RpcClient implements Closeable { */ public Response request(Request request, long timeoutMills) throws NacosException { int retryTimes = 0; - Response response = null; + Response response; Exception exceptionThrow = null; long start = System.currentTimeMillis(); while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) { @@ -652,7 +645,7 @@ public abstract class RpcClient implements Closeable { if (this.currentConnection == null || !isRunning()) { waitReconnect = true; throw new NacosException(NacosException.CLIENT_DISCONNECT, - "Client not connected,current status:" + rpcClientStatus.get()); + "Client not connected, current status:" + rpcClientStatus.get()); } response = this.currentConnection.request(request, timeoutMills); if (response == null) { @@ -664,7 +657,7 @@ public abstract class RpcClient implements Closeable { waitReconnect = true; if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { LoggerUtils.printIfErrorEnabled(LOGGER, - "Connection is unregistered, switch server,connectionId={},request={}", + "Connection is unregistered, switch server, connectionId = {}, request = {}", currentConnection.getConnectionId(), request.getClass().getSimpleName()); switchServerAsync(); } @@ -680,14 +673,14 @@ public abstract class RpcClient implements Closeable { } catch (Exception e) { if (waitReconnect) { try { - //wait client to re connect. + // wait client to reconnect. Thread.sleep(Math.min(100, timeoutMills / 3)); } catch (Exception exception) { - //Do nothing. + // Do nothing. } } - LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request={}, retryTimes={},errorMessage={}", + LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes, e.getMessage()); exceptionThrow = e; @@ -731,14 +724,14 @@ public abstract class RpcClient implements Closeable { } catch (Exception e) { if (waitReconnect) { try { - //wait client to re connect. + // wait client to reconnect. Thread.sleep(Math.min(100, callback.getTimeout() / 3)); } catch (Exception exception) { - //Do nothing. + // Do nothing. } } LoggerUtils - .printIfErrorEnabled(LOGGER, "[{}]Send request fail, request={}, retryTimes={},errorMessage={}", + .printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}", name, request, retryTimes, e.getMessage()); exceptionToThrow = e; @@ -779,14 +772,14 @@ public abstract class RpcClient implements Closeable { } catch (Exception e) { if (waitReconnect) { try { - //wait client to re connect. + // wait client to reconnect. Thread.sleep(100L); } catch (Exception exception) { - //Do nothing. + // Do nothing. } } LoggerUtils - .printIfErrorEnabled(LOGGER, "[{}]Send request fail, request={}, retryTimes={},errorMessage={}", + .printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}", name, request, retryTimes, e.getMessage()); exceptionToThrow = e; @@ -823,7 +816,7 @@ public abstract class RpcClient implements Closeable { */ protected Response handleServerRequest(final Request request) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]receive server push request,request={},requestId={}", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Receive server push request, request = {}, requestId = {}", name, request.getClass().getSimpleName(), request.getRequestId()); lastActiveTimeStamp = System.currentTimeMillis(); for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) { @@ -831,12 +824,12 @@ public abstract class RpcClient implements Closeable { Response response = serverRequestHandler.requestReply(request); if (response != null) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]ack server push request,request={},requestId={}", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}", name, request.getClass().getSimpleName(), request.getRequestId()); return response; } } catch (Exception e) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]handleServerRequest:{}, errorMessage={}", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] HandleServerRequest:{}, errorMessage = {}", name, serverRequestHandler.getClass().getName(), e.getMessage()); } @@ -851,7 +844,7 @@ public abstract class RpcClient implements Closeable { */ public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Registry connection listener to current client:{}", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Registry connection listener to current client:{}", name, connectionEventListener.getClass().getName()); this.connectionEventListeners.add(connectionEventListener); } @@ -862,7 +855,7 @@ public abstract class RpcClient implements Closeable { * @param serverRequestHandler serverRequestHandler */ public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) { - LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Register server push request handler:{}", name, + LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Register server push request handler:{}", name, serverRequestHandler.getClass().getName()); this.serverRequestHandlers.add(serverRequestHandler); @@ -913,25 +906,16 @@ public abstract class RpcClient implements Closeable { */ @SuppressWarnings("PMD.UndefineMagicConstantRule") private ServerInfo resolveServerInfo(String serverAddress) { - String property = System.getProperty("nacos.server.port", "8848"); - int serverPort = Integer.parseInt(property); - ServerInfo serverInfo = null; - if (serverAddress.contains(Constants.HTTP_PREFIX)) { - String[] split = serverAddress.split(Constants.COLON); - String serverIp = split[1].replaceAll("//", ""); - if (split.length > 2 && StringUtils.isNotBlank(split[2])) { - serverPort = Integer.parseInt(split[2]); - } - serverInfo = new ServerInfo(serverIp, serverPort); - } else { - String[] split = serverAddress.split(Constants.COLON); - String serverIp = split[0]; - if (split.length > 1 && StringUtils.isNotBlank(split[1])) { - serverPort = Integer.parseInt(split[1]); - } - serverInfo = new ServerInfo(serverIp, serverPort); + Matcher matcher = EXCLUDE_PROTOCOL_PATTERN.matcher(serverAddress); + if (matcher.find()) { + serverAddress = matcher.group(1); } - return serverInfo; + + String[] ipPortTuple = serverAddress.split(Constants.COLON, 2); + int defaultPort = Integer.parseInt(System.getProperty("nacos.server.port", "8848")); + String serverPort = CollectionUtils.getOrDefault(ipPortTuple, 1, Integer.toString(defaultPort)); + + return new ServerInfo(ipPortTuple[0], NumberUtils.toInt(serverPort, defaultPort)); } public static class ServerInfo { @@ -996,7 +980,7 @@ public abstract class RpcClient implements Closeable { @Override public String toString() { - return "{serverIp='" + serverIp + '\'' + ", server main port=" + serverPort + '}'; + return "{serverIp = '" + serverIp + '\'' + ", server main port = " + serverPort + '}'; } } diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/CollectionUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/CollectionUtils.java index d703ad1b4..008ccbe69 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/CollectionUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/CollectionUtils.java @@ -17,8 +17,8 @@ package com.alibaba.nacos.common.utils; import java.lang.reflect.Array; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Enumeration; @@ -237,16 +237,16 @@ public final class CollectionUtils { * Returns the value to which the specified index , or {@code defaultValue} if this collection contains no value for * the index. * - * @param coll the collection to get a value from + * @param obj the object to get a value from * @param index the index to get * @param defaultValue default value * @param General Type * @return the value to which the specified index , or {@code defaultValue} if this collection contains no value for * the index. */ - public static T getOrDefault(Collection coll, int index, T defaultValue) { + public static T getOrDefault(Object obj, int index, T defaultValue) { try { - return (T) get(coll, index); + return (T) get(obj, index); } catch (IndexOutOfBoundsException e) { return defaultValue; } diff --git a/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java b/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java index a777577fb..e0dcecf36 100644 --- a/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java +++ b/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java @@ -275,6 +275,8 @@ public class RpcClientTest { "http://10.10.10.10::8848")).getAddress()); Assert.assertEquals("10.10.10.10:8848", ((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "http://10.10.10.10")).getAddress()); + Assert.assertEquals("10.10.10.10:8848", ((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, + "https://10.10.10.10::8848")).getAddress()); } @Test