[ISSUE #6595] Optimize RpcClient (#6596)

* replace explicit type argument with <>

* replace anonymous class with lambda

* remove redundant initializer

* format comment & log and fix typo

* add unit test to RpcClient::resolveServerInfo

* extend CollectionUtils::getOrDefault

* simplify RpcClient::resolveServerInfo

* remove unused variable
This commit is contained in:
孙继峰 2021-08-09 09:42:37 +08:00 committed by GitHub
parent 470b7bb37c
commit f91089eb90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 166 additions and 180 deletions

View File

@ -140,7 +140,7 @@ public class ClientWorkerTest {
clientWorker.removeConfig(dataId, group, tenant, tag);
Assert.fail();
} catch (NacosException e) {
Assert.assertEquals("Client not connected,current status:STARTING", e.getErrMsg());
Assert.assertEquals("Client not connected, current status:STARTING", e.getErrMsg());
Assert.assertEquals(-401, e.getErrCode());
}
@ -148,7 +148,7 @@ public class ClientWorkerTest {
clientWorker.getServerConfig(dataId, group, tenant, 100, false);
Assert.fail();
} catch (NacosException e) {
Assert.assertEquals("Client not connected,current status:STARTING", e.getErrMsg());
Assert.assertEquals("Client not connected, current status:STARTING", e.getErrMsg());
Assert.assertEquals(-401, e.getErrCode());
}
}

View File

@ -32,7 +32,9 @@ import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.NumberUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,9 +48,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR;
@ -65,18 +68,18 @@ public abstract class RpcClient implements Closeable {
private ServerListFactory serverListFactory;
protected LinkedBlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<ConnectionEvent>();
protected LinkedBlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(
RpcClientStatus.WAIT_INIT);
protected ScheduledExecutorService clientEventExecutor;
private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<ReconnectContext>(1);
private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
protected volatile Connection currentConnection;
protected Map<String, String> labels = new HashMap<String, String>();
protected Map<String, String> labels = new HashMap<>();
private String name;
@ -98,12 +101,14 @@ public abstract class RpcClient implements Closeable {
/**
* listener called where connection's status changed.
*/
protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<ConnectionEventListener>();
protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
/**
* handlers to process server push request.
*/
protected List<ServerRequestHandler> serverRequestHandlers = new ArrayList<ServerRequestHandler>();
protected List<ServerRequestHandler> serverRequestHandlers = new ArrayList<>();
private static final Pattern EXCLUDE_PROTOCOL_PATTERN = Pattern.compile("(?<=\\w{1,5}://)(.*)");
static {
PayloadRegistry.init();
@ -116,7 +121,7 @@ public abstract class RpcClient implements Closeable {
public RpcClient(ServerListFactory serverListFactory) {
this.serverListFactory = serverListFactory;
rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED);
LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory ={}",
LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory = {}",
serverListFactory.getClass().getName());
}
@ -124,7 +129,7 @@ public abstract class RpcClient implements Closeable {
this(name);
this.serverListFactory = serverListFactory;
rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED);
LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory ={}",
LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init in constructor, ServerListFactory = {}",
serverListFactory.getClass().getName());
}
@ -139,7 +144,7 @@ public abstract class RpcClient implements Closeable {
}
/**
* init server list factory.only can init once.
* init server list factory. only can init once.
*
* @param serverListFactory serverListFactory
*/
@ -150,7 +155,7 @@ public abstract class RpcClient implements Closeable {
this.serverListFactory = serverListFactory;
rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init, ServerListFactory ={}", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] RpcClient init, ServerListFactory = {}", name,
serverListFactory.getClass().getName());
return this;
}
@ -162,7 +167,7 @@ public abstract class RpcClient implements Closeable {
*/
public RpcClient labels(Map<String, String> labels) {
this.labels.putAll(labels);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init label, labels={}", name, this.labels);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] RpcClient init label, labels = {}", name, this.labels);
return this;
}
@ -174,7 +179,7 @@ public abstract class RpcClient implements Closeable {
*/
public RpcClient keepAlive(long keepAliveTime, TimeUnit timeUnit) {
this.keepAliveTime = keepAliveTime * timeUnit.toMillis(keepAliveTime);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init keepalive time, keepAliveTimeMillis={}", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] RpcClient init keepalive time, keepAliveTimeMillis = {}", name,
keepAliveTime);
return this;
}
@ -186,12 +191,12 @@ public abstract class RpcClient implements Closeable {
if (connectionEventListeners.isEmpty()) {
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify disconnected event to listeners", name);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
try {
connectionEventListener.onDisConnect();
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify disconnect listener error,listener ={}", name,
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name,
connectionEventListener.getClass().getName());
}
}
@ -204,12 +209,12 @@ public abstract class RpcClient implements Closeable {
if (connectionEventListeners.isEmpty()) {
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify connected event to listeners.", name);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
try {
connectionEventListener.onConnected();
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify connect listener error,listener ={}", name,
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name,
connectionEventListener.getClass().getName());
}
}
@ -243,7 +248,7 @@ public abstract class RpcClient implements Closeable {
}
/**
* check if current connected server is in serverlist ,if not switch server.
* check if current connected server is in server list, if not switch server.
*/
public void onServerListChange() {
if (currentConnection != null && currentConnection.serverInfo != null) {
@ -257,7 +262,7 @@ public abstract class RpcClient implements Closeable {
}
if (!found) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"Current connected server {} is not in latest server list,switch switchServerAsync",
"Current connected server {} is not in latest server list, switch switchServerAsync",
serverInfo.getAddress());
switchServerAsync();
}
@ -275,110 +280,101 @@ public abstract class RpcClient implements Closeable {
return;
}
clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
}
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
});
// connection event consumer.
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take = null;
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (Throwable e) {
//Do nothing
clientEventExecutor.submit(() -> {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take;
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (Throwable e) {
// Do nothing
}
}
});
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
//check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}]Server healthy check fail,currentConnection={}", name,
currentConnection.getConnectionId());
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
boolean success = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (success) {
reconnectContext = new ReconnectContext(null, false);
} else {
continue;
}
} else {
lastActiveTimeStamp = System.currentTimeMillis();
clientEventExecutor.submit(() -> {
while (true) {
try {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
} else {
continue;
}
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Server healthy check fail, currentConnection = {}", name,
currentConnection.getConnectionId());
}
if (reconnectContext.serverInfo != null) {
//clear recommend server if server is not in server list.
boolean serverExist = false;
for (String server : getServerListFactory().getServerList()) {
ServerInfo serverInfo = resolveServerInfo(server);
if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
serverExist = true;
reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
reconnectContext = new ReconnectContext(null, false);
} else {
continue;
}
} else {
lastActiveTimeStamp = System.currentTimeMillis();
continue;
}
if (!serverExist) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Recommend server is not in server list ,ignore recommend server {}", name,
reconnectContext.serverInfo.getAddress());
} else {
continue;
}
reconnectContext.serverInfo = null;
}
if (reconnectContext.serverInfo != null) {
// clear recommend server if server is not in server list.
boolean serverExist = false;
for (String server : getServerListFactory().getServerList()) {
ServerInfo serverInfo = resolveServerInfo(server);
if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
serverExist = true;
reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
break;
}
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
//Do nothing
if (!serverExist) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Recommend server is not in server list, ignore recommend server {}", name,
reconnectContext.serverInfo.getAddress());
reconnectContext.serverInfo = null;
}
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
// Do nothing
}
}
});
//connect to server ,try to connect to server sync once, async starting if fail.
// connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed.
Connection connectToServer = null;
rpcClientStatus.set(RpcClientStatus.STARTING);
@ -394,14 +390,14 @@ public abstract class RpcClient implements Closeable {
connectToServer = connectToServer(serverInfo);
} catch (Throwable e) {
LoggerUtils.printIfWarnEnabled(LOGGER,
"[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}",
"[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
name, e.getMessage(), startUpRetryTimes);
}
}
if (connectToServer != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up,connectionId={}",
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
@ -412,16 +408,13 @@ public abstract class RpcClient implements Closeable {
registerServerRequestHandler(new ConnectResetRequestHandler());
//register client detection request.
registerServerRequestHandler(new ServerRequestHandler() {
@Override
public Response requestReply(Request request) {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
// register client detection request.
registerServerRequestHandler(request -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
});
}
@ -448,7 +441,7 @@ public abstract class RpcClient implements Closeable {
}
}
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Switch server error ,{}", name, e);
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Switch server error, {}", name, e);
}
return new ConnectResetResponse();
}
@ -458,9 +451,9 @@ public abstract class RpcClient implements Closeable {
@Override
public void shutdown() throws NacosException {
LOGGER.info("Shutdown rpc client ,set status to shutdown");
LOGGER.info("Shutdown rpc client, set status to shutdown");
rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
LOGGER.info("Shutdown client event executor " + clientEventExecutor);
LOGGER.info("Shutdown client event executor " + clientEventExecutor);
clientEventExecutor.shutdownNow();
LOGGER.info("Close current connection " + currentConnection.getConnectionId());
closeConnection(currentConnection);
@ -473,10 +466,10 @@ public abstract class RpcClient implements Closeable {
}
try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L);
//not only check server is ok ,also check connection is register.
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
//ignore
// ignore
}
return false;
}
@ -500,16 +493,16 @@ public abstract class RpcClient implements Closeable {
try {
AtomicReference<ServerInfo> recommendServer = new AtomicReference<ServerInfo>(recommendServerInfo);
AtomicReference<ServerInfo> recommendServer = new AtomicReference<>(recommendServerInfo);
if (onRequestFail && healthCheck()) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success,currentServer is{} ", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success, currentServer is {} ", name,
currentConnection.serverInfo.getAddress());
rpcClientStatus.set(RpcClientStatus.RUNNING);
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] try to re connect to a new server ,server is {}", name,
recommendServerInfo == null ? " not appointed,will choose a random server."
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to reconnect to a new server, server is {}", name,
recommendServerInfo == null ? " not appointed, will choose a random server."
: (recommendServerInfo.getAddress() + ", will try it once."));
// loop until start client success.
@ -517,35 +510,35 @@ public abstract class RpcClient implements Closeable {
int reConnectTimes = 0;
int retryTurns = 0;
Exception lastException = null;
Exception lastException;
while (!switchSuccess && !isShutdown()) {
//1.get a new server
// 1.get a new server
ServerInfo serverInfo = null;
try {
serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
//2.create a new channel to new server
// 2.create a new channel to new server
Connection connectionNew = connectToServer(serverInfo);
if (connectionNew != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect a server [{}],connectionId={}",
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
name, serverInfo.getAddress(), connectionNew.getConnectionId());
//successfully create a new connect.
// successfully create a new connect.
if (currentConnection != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Abandon prev connection ,server is {}, connectionId is {}", name,
"[{}] Abandon prev connection, server is {}, connectionId is {}", name,
currentConnection.serverInfo.getAddress(), currentConnection.getConnectionId());
//set current connection to enable connection event.
// set current connection to enable connection event.
currentConnection.setAbandon(true);
closeConnection(currentConnection);
}
currentConnection = connectionNew;
rpcClientStatus.set(RpcClientStatus.RUNNING);
switchSuccess = true;
boolean s = eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
return;
}
//close connection if client is already shutdown.
// close connection if client is already shutdown.
if (isShutdown()) {
closeConnection(currentConnection);
}
@ -561,7 +554,7 @@ public abstract class RpcClient implements Closeable {
if (reConnectTimes > 0
&& reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] fail to connect server,after trying {} times, last try server is {},error={}", name,
"[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}", name,
reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
if (Integer.MAX_VALUE == retryTurns) {
retryTurns = 50;
@ -573,22 +566,22 @@ public abstract class RpcClient implements Closeable {
reConnectTimes++;
try {
//sleep x milliseconds to switch next server.
// sleep x milliseconds to switch next server.
if (!isRunning()) {
// first round ,try servers at a delay 100ms;second round ,200ms; max delays 5s. to be reconsidered.
// first round, try servers at a delay 100ms;second round, 200ms; max delays 5s. to be reconsidered.
Thread.sleep(Math.min(retryTurns + 1, 50) * 100L);
}
} catch (InterruptedException e) {
// Do nothing.
// Do nothing.
}
}
if (isShutdown()) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown ,stop reconnect to server", name);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown, stop reconnect to server", name);
}
} catch (Exception e) {
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to re connect to server ,error is {}", name, e);
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to reconnect to server, error is {}", name, e);
}
}
@ -643,7 +636,7 @@ public abstract class RpcClient implements Closeable {
*/
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 0;
Response response = null;
Response response;
Exception exceptionThrow = null;
long start = System.currentTimeMillis();
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
@ -652,7 +645,7 @@ public abstract class RpcClient implements Closeable {
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
throw new NacosException(NacosException.CLIENT_DISCONNECT,
"Client not connected,current status:" + rpcClientStatus.get());
"Client not connected, current status:" + rpcClientStatus.get());
}
response = this.currentConnection.request(request, timeoutMills);
if (response == null) {
@ -664,7 +657,7 @@ public abstract class RpcClient implements Closeable {
waitReconnect = true;
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Connection is unregistered, switch server,connectionId={},request={}",
"Connection is unregistered, switch server, connectionId = {}, request = {}",
currentConnection.getConnectionId(), request.getClass().getSimpleName());
switchServerAsync();
}
@ -680,14 +673,14 @@ public abstract class RpcClient implements Closeable {
} catch (Exception e) {
if (waitReconnect) {
try {
//wait client to re connect.
// wait client to reconnect.
Thread.sleep(Math.min(100, timeoutMills / 3));
} catch (Exception exception) {
//Do nothing.
// Do nothing.
}
}
LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request={}, retryTimes={},errorMessage={}",
LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
exceptionThrow = e;
@ -731,14 +724,14 @@ public abstract class RpcClient implements Closeable {
} catch (Exception e) {
if (waitReconnect) {
try {
//wait client to re connect.
// wait client to reconnect.
Thread.sleep(Math.min(100, callback.getTimeout() / 3));
} catch (Exception exception) {
//Do nothing.
// Do nothing.
}
}
LoggerUtils
.printIfErrorEnabled(LOGGER, "[{}]Send request fail, request={}, retryTimes={},errorMessage={}",
.printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
name, request, retryTimes, e.getMessage());
exceptionToThrow = e;
@ -779,14 +772,14 @@ public abstract class RpcClient implements Closeable {
} catch (Exception e) {
if (waitReconnect) {
try {
//wait client to re connect.
// wait client to reconnect.
Thread.sleep(100L);
} catch (Exception exception) {
//Do nothing.
// Do nothing.
}
}
LoggerUtils
.printIfErrorEnabled(LOGGER, "[{}]Send request fail, request={}, retryTimes={},errorMessage={}",
.printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
name, request, retryTimes, e.getMessage());
exceptionToThrow = e;
@ -823,7 +816,7 @@ public abstract class RpcClient implements Closeable {
*/
protected Response handleServerRequest(final Request request) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]receive server push request,request={},requestId={}", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Receive server push request, request = {}, requestId = {}", name,
request.getClass().getSimpleName(), request.getRequestId());
lastActiveTimeStamp = System.currentTimeMillis();
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
@ -831,12 +824,12 @@ public abstract class RpcClient implements Closeable {
Response response = serverRequestHandler.requestReply(request);
if (response != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]ack server push request,request={},requestId={}", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}", name,
request.getClass().getSimpleName(), request.getRequestId());
return response;
}
} catch (Exception e) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]handleServerRequest:{}, errorMessage={}", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] HandleServerRequest:{}, errorMessage = {}", name,
serverRequestHandler.getClass().getName(), e.getMessage());
}
@ -851,7 +844,7 @@ public abstract class RpcClient implements Closeable {
*/
public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Registry connection listener to current client:{}", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Registry connection listener to current client:{}", name,
connectionEventListener.getClass().getName());
this.connectionEventListeners.add(connectionEventListener);
}
@ -862,7 +855,7 @@ public abstract class RpcClient implements Closeable {
* @param serverRequestHandler serverRequestHandler
*/
public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Register server push request handler:{}", name,
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Register server push request handler:{}", name,
serverRequestHandler.getClass().getName());
this.serverRequestHandlers.add(serverRequestHandler);
@ -913,25 +906,16 @@ public abstract class RpcClient implements Closeable {
*/
@SuppressWarnings("PMD.UndefineMagicConstantRule")
private ServerInfo resolveServerInfo(String serverAddress) {
String property = System.getProperty("nacos.server.port", "8848");
int serverPort = Integer.parseInt(property);
ServerInfo serverInfo = null;
if (serverAddress.contains(Constants.HTTP_PREFIX)) {
String[] split = serverAddress.split(Constants.COLON);
String serverIp = split[1].replaceAll("//", "");
if (split.length > 2 && StringUtils.isNotBlank(split[2])) {
serverPort = Integer.parseInt(split[2]);
}
serverInfo = new ServerInfo(serverIp, serverPort);
} else {
String[] split = serverAddress.split(Constants.COLON);
String serverIp = split[0];
if (split.length > 1 && StringUtils.isNotBlank(split[1])) {
serverPort = Integer.parseInt(split[1]);
}
serverInfo = new ServerInfo(serverIp, serverPort);
Matcher matcher = EXCLUDE_PROTOCOL_PATTERN.matcher(serverAddress);
if (matcher.find()) {
serverAddress = matcher.group(1);
}
return serverInfo;
String[] ipPortTuple = serverAddress.split(Constants.COLON, 2);
int defaultPort = Integer.parseInt(System.getProperty("nacos.server.port", "8848"));
String serverPort = CollectionUtils.getOrDefault(ipPortTuple, 1, Integer.toString(defaultPort));
return new ServerInfo(ipPortTuple[0], NumberUtils.toInt(serverPort, defaultPort));
}
public static class ServerInfo {
@ -996,7 +980,7 @@ public abstract class RpcClient implements Closeable {
@Override
public String toString() {
return "{serverIp='" + serverIp + '\'' + ", server main port=" + serverPort + '}';
return "{serverIp = '" + serverIp + '\'' + ", server main port = " + serverPort + '}';
}
}

View File

@ -17,8 +17,8 @@
package com.alibaba.nacos.common.utils;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
@ -237,16 +237,16 @@ public final class CollectionUtils {
* Returns the value to which the specified index , or {@code defaultValue} if this collection contains no value for
* the index.
*
* @param coll the collection to get a value from
* @param obj the object to get a value from
* @param index the index to get
* @param defaultValue default value
* @param <T> General Type
* @return the value to which the specified index , or {@code defaultValue} if this collection contains no value for
* the index.
*/
public static <T> T getOrDefault(Collection<T> coll, int index, T defaultValue) {
public static <T> T getOrDefault(Object obj, int index, T defaultValue) {
try {
return (T) get(coll, index);
return (T) get(obj, index);
} catch (IndexOutOfBoundsException e) {
return defaultValue;
}

View File

@ -275,6 +275,8 @@ public class RpcClientTest {
"http://10.10.10.10::8848")).getAddress());
Assert.assertEquals("10.10.10.10:8848",
((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient, "http://10.10.10.10")).getAddress());
Assert.assertEquals("10.10.10.10:8848", ((RpcClient.ServerInfo) resolveServerInfoMethod.invoke(rpcClient,
"https://10.10.10.10::8848")).getAddress());
}
@Test