From b28636c6deabc90ff0ed698148815ab839e02964 Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Tue, 21 Jul 2020 20:08:38 +0800 Subject: [PATCH] notify connect listeners on start up (#3405) * Add gprc support-> try to connect the server synchronous on start up , start a thread to connect to server until successfully connected when fail. * Add gprc support-> notify connect listeners. * Add gprc support-> update client reconnect strategy. * Add gprc support-> optimize push config request param model and meta request. * Add gprc support-> notify on new connected --- .../remote/request/ConfigPublishRequest.java | 30 +++++- .../client/config/impl/ClientWorker.java | 4 +- .../nacos/client/remote/RpcClient.java | 66 ++++++++++-- .../nacos/client/remote/grpc/GrpcClient.java | 101 +++++++++--------- .../com/alibaba/nacos/client/ConfigTest.java | 27 +++-- .../remote/ConfigPublishRequestHandler.java | 24 ++--- ...ClientConnectionEventListenerRegistry.java | 54 +++++++++- .../nacos/core/remote/ConnectCoordinator.java | 43 ++++---- .../nacos/core/remote/ConnectionManager.java | 7 ++ .../nacos/core/remote/grpc/GrpcServer.java | 8 -- 10 files changed, 252 insertions(+), 112 deletions(-) diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java index 086eff403..611ca1644 100644 --- a/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/request/ConfigPublishRequest.java @@ -16,6 +16,9 @@ package com.alibaba.nacos.api.config.remote.request; +import java.util.HashMap; +import java.util.Map; + /** * request to publish a config. * @@ -32,10 +35,35 @@ public class ConfigPublishRequest extends ConfigCommonRequest { String content; + private Map additonMap; + public ConfigPublishRequest() { } + /** + * get additional param. + * + * @param key key of param. + * @return value of param ,return null if not exist. + */ + public String getAdditionParam(String key) { + return additonMap == null ? null : additonMap.get(key); + } + + /** + * put additional param value. will override if exist. + * + * @param key key of param. + * @param value value of param. + */ + public void putAdditonalParam(String key, String value) { + if (additonMap == null) { + additonMap = new HashMap(); + } + additonMap.put(key, value); + } + public ConfigPublishRequest(String dataId, String group, String tenant, String content) { this.content = content; this.dataId = dataId; @@ -119,4 +147,4 @@ public class ConfigPublishRequest extends ConfigCommonRequest { public void setTenant(String tenant) { this.tenant = tenant; } -} \ No newline at end of file +} diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index c47b84912..e0af572a8 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -645,6 +645,7 @@ public class ClientWorker implements Closeable { }, 1L, 10L, TimeUnit.MILLISECONDS); } else { + rpcClientProxy.initAndStart(new ServerListFactory() { @Override public String genNextServer() { @@ -652,13 +653,12 @@ public class ClientWorker implements Closeable { serverListManager.refreshCurrentServerAddr(); return serverListManager.getCurrentServerAddr(); } - + @Override public String getCurrentServer() { return agent.getServerListManager().getCurrentServerAddr(); } }); - /* * Register Listen Change Handler */ diff --git a/client/src/main/java/com/alibaba/nacos/client/remote/RpcClient.java b/client/src/main/java/com/alibaba/nacos/client/remote/RpcClient.java index 1ba347de4..0861d8757 100644 --- a/client/src/main/java/com/alibaba/nacos/client/remote/RpcClient.java +++ b/client/src/main/java/com/alibaba/nacos/client/remote/RpcClient.java @@ -23,10 +23,13 @@ import com.alibaba.nacos.client.utils.LogUtils; import com.alibaba.nacos.common.lifecycle.Closeable; import org.slf4j.Logger; -import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -47,18 +50,53 @@ public abstract class RpcClient implements Closeable { protected AtomicReference rpcClientStatus = new AtomicReference( RpcClientStatus.WAIT_INIT); + protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("com.alibaba.nacos.client.config.grpc.worker"); + t.setDaemon(true); + return t; + } + }); + /** - * Notify when client connected. + * Notify when client re connected. */ protected void notifyReConnected() { - if (!this.connectionEventListeners.isEmpty()) { - connectionEventListeners.forEach(new Consumer() { - @Override - public void accept(ConnectionEventListener connectionEventListener) { - connectionEventListener.onReconnected(); + executorService.schedule(new Runnable() { + @Override + public void run() { + if (!connectionEventListeners.isEmpty()) { + connectionEventListeners.forEach(new Consumer() { + @Override + public void accept(ConnectionEventListener connectionEventListener) { + connectionEventListener.onReconnected(); + } + }); } - }); - } + } + }, 0, TimeUnit.MILLISECONDS); + + } + + /** + * Notify when client new connected. + */ + protected void notifyConnected() { + executorService.schedule(new Runnable() { + @Override + public void run() { + if (!connectionEventListeners.isEmpty()) { + connectionEventListeners.forEach(new Consumer() { + @Override + public void accept(ConnectionEventListener connectionEventListener) { + connectionEventListener.onReconnected(); + } + }); + } + } + }, 0, TimeUnit.MILLISECONDS); } /** @@ -110,6 +148,15 @@ public abstract class RpcClient implements Closeable { public RpcClient() { } + /** + * Getter method for property connectionEventListeners. + * + * @return property value of connectionEventListeners + */ + protected List getConnectionEventListeners() { + return connectionEventListeners; + } + /** * init server list factory. * @@ -138,7 +185,6 @@ public abstract class RpcClient implements Closeable { /** * Start this client. */ - @PostConstruct public abstract void start() throws NacosException; /** diff --git a/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java b/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java index 1cba68de2..2d18f6fc9 100644 --- a/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java +++ b/client/src/main/java/com/alibaba/nacos/client/remote/grpc/GrpcClient.java @@ -43,9 +43,6 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -66,20 +63,10 @@ public class GrpcClient extends RpcClient { protected RequestGrpc.RequestBlockingStub grpcServiceStub; - ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("com.alibaba.nacos.client.config.grpc.worker"); - t.setDaemon(true); - return t; - } - }); - /** * Reconnect to current server before switch a new server. */ - private static final int MAX_RECONNECT_TIMES = 5; + private static final int MAX_RECONNECT_TIMES = 3; private AtomicInteger reConnectTimesLeft = new AtomicInteger(MAX_RECONNECT_TIMES); @@ -102,14 +89,25 @@ public class GrpcClient extends RpcClient { getServerListFactory().getCurrentServer()); if (isRunning() || isInitStatus()) { final RpcClientStatus prevStatus = rpcClientStatus.get(); - boolean updateSucess = false; - if (isRunning()) { - updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.RE_CONNECTING); - } else { - updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.STARTING); - } + boolean updateSucess = rpcClientStatus.compareAndSet(prevStatus, + isInitStatus() ? RpcClientStatus.STARTING : RpcClientStatus.RE_CONNECTING); if (updateSucess) { + + if (isStarting()) { + + buildClientAtFirstTime(); + boolean sucess = serverCheck(); + if (sucess) { + rpcClientStatus.compareAndSet(RpcClientStatus.STARTING, RpcClientStatus.RUNNING); + LOGGER.info("server check success, client start up success. "); + notifyConnected(); + return; + } else { + rpcClientStatus.compareAndSet(RpcClientStatus.STARTING, RpcClientStatus.RE_CONNECTING); + } + } + executorService.schedule(new Runnable() { @Override public void run() { @@ -117,20 +115,23 @@ public class GrpcClient extends RpcClient { // loop until start client success. while (!isRunning()) { - buildClientAtFirstTime(); boolean sucess = serverCheck(); if (sucess) { - if (rpcClientStatus.get() == RpcClientStatus.RE_CONNECTING) { - notifyReConnected(); - } - LOGGER.info("Server check success, Current Server is {}" + getServerListFactory() - .getCurrentServer()); + notifyReConnected(); + LOGGER.info("server check success, reconnected success, Current Server is {}" + + getServerListFactory().getCurrentServer()); rpcClientStatus.compareAndSet(rpcClientStatus.get(), RpcClientStatus.RUNNING); reConnectTimesLeft.set(MAX_RECONNECT_TIMES); + return; } else { int leftRetryTimes = reConnectTimesLeft.decrementAndGet(); if (leftRetryTimes <= 0) { + try { + Thread.sleep(5000L); + } catch (InterruptedException e) { + //do nothing + } getServerListFactory().genNextServer(); reConnectTimesLeft.set(MAX_RECONNECT_TIMES); try { @@ -177,7 +178,7 @@ public class GrpcClient extends RpcClient { public void run() { sendBeat(); } - }, 0, 2000, TimeUnit.MILLISECONDS); + }, 0, 3000, TimeUnit.MILLISECONDS); super.registerServerPushResponseHandler(new ServerPushResponseHandler() { @Override @@ -192,7 +193,6 @@ public class GrpcClient extends RpcClient { } } }); - } /** @@ -206,12 +206,11 @@ public class GrpcClient extends RpcClient { return; } - GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) - .build(); HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); - GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).setType(heartBeatRequest.getType()) - .setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest))) - .build()).build(); + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()) + .setType(heartBeatRequest.getType()).setBody( + Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest))) + .build()).build(); GrpcResponse response = grpcServiceStub.request(streamRequest); } catch (Exception e) { LOGGER.error("Send heart beat error,will tring to reconnet to server ", e); @@ -219,14 +218,20 @@ public class GrpcClient extends RpcClient { } } + private GrpcMetadata buildMeta() { + GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) + .setVersion(ClientCommonUtils.VERSION).build(); + return meta; + } + private boolean serverCheck() { try { - GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) - .build(); + HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); - GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).setType(heartBeatRequest.getType()) - .setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest))) - .build()).build(); + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()) + .setType(heartBeatRequest.getType()).setBody( + Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest))) + .build()).build(); GrpcResponse response = grpcServiceStub.request(streamRequest); return response != null; } catch (Exception e) { @@ -268,16 +273,14 @@ public class GrpcClient extends RpcClient { } LOGGER.info("GrpcClient start to connect to rpc server, serverIp={},port={}", serverIp, serverPort); - - this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext(true).build(); + + this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext().build(); grpcStreamServiceStub = RequestStreamGrpc.newStub(channel); grpcServiceStub = RequestGrpc.newBlockingStub(channel); - - GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) - .setVersion(ClientCommonUtils.VERSION).build(); - GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).build(); + + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build(); LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest); grpcStreamServiceStub.requestStream(streamRequest, new StreamObserver() { @@ -325,15 +328,17 @@ public class GrpcClient extends RpcClient { @Override public Response request(Request request) throws NacosException { + if (!this.isRunning()) { + throw new IllegalStateException("Client is not connected to any server now,please retry later"); + } try { - GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) - .build(); - GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(meta).setType(request.getType()) + + GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType()) .setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build(); GrpcResponse response = grpcServiceStub.request(grpcrequest); String type = response.getType(); String bodyString = response.getBody().getValue().toStringUtf8(); - + // transfrom grpcResponse to response model Class classByType = ResponseRegistry.getClassByType(type); if (classByType != null) { diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index 4821cc129..3776d2f7d 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -20,13 +20,13 @@ import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.AbstractListener; -import com.alibaba.nacos.common.utils.ThreadUtils; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.Scanner; @@ -38,12 +38,12 @@ public class ConfigTest { @Before public void before() throws Exception { Properties properties = new Properties(); - //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848"); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, - "11.239.114.187:8848,11.239.113.204:8848,11.239.112.161:8848"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848"); + //properties.setProperty(PropertyKeyConst.SERVER_ADDR, + // "11.239.114.187:8848,11.239.113.204:8848,11.239.112.161:8848"); //"11.239.114.187:8848"); - configService = NacosFactory.createConfigService(properties); + //Thread.sleep(2000L); } @After @@ -53,6 +53,14 @@ public class ConfigTest { @Test public void test2() throws Exception { + Properties properties = new Properties(); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848"); + List list = new ArrayList(); + for (int i = 0; i <= 1000; i++) { + ConfigService configService2 = NacosFactory.createConfigService(properties); + System.out.println(configService2); + } + Thread.sleep(1000000L); } @@ -62,10 +70,11 @@ public class ConfigTest { final String dataId = "lessspring"; final String group = "lessspring"; final String content = "lessspring-" + System.currentTimeMillis(); + System.out.println("4-" + System.currentTimeMillis()); + boolean result = configService.publishConfig(dataId, group, content); - Assert.assertTrue(result); - - ThreadUtils.sleep(200L); + //Assert.assertTrue(result); + System.out.println("5-" + System.currentTimeMillis()); configService.getConfigAndSignListener(dataId, group, 5000, new AbstractListener() { @Override diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java index e124ce148..f739fc855 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java @@ -74,22 +74,22 @@ public class ConfigPublishRequestHandler extends RequestHandler { final String tenant = myRequest.getTenant(); final String srcIp = meta.getClientIp(); - final String requestIpApp = meta.getLabels().get("requestIpApp"); - final String tag = meta.getLabels().get("tag"); - final String appName = meta.getLabels().get("appName"); - final String type = meta.getLabels().get("type"); - final String srcUser = meta.getLabels().get("src_user"); + final String requestIpApp = myRequest.getAdditionParam("requestIpApp"); + final String tag = myRequest.getAdditionParam("tag"); + final String appName = myRequest.getAdditionParam("appName"); + final String type = myRequest.getAdditionParam("type"); + final String srcUser = myRequest.getAdditionParam("src_user"); // check tenant ParamUtils.checkParam(dataId, group, "datumId", content); ParamUtils.checkParam(tag); Map configAdvanceInfo = new HashMap(10); - MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", meta.getLabels().get("configTags")); - MapUtils.putIfValNoNull(configAdvanceInfo, "desc", meta.getLabels().get("desc")); - MapUtils.putIfValNoNull(configAdvanceInfo, "use", meta.getLabels().get("use")); - MapUtils.putIfValNoNull(configAdvanceInfo, "effect", meta.getLabels().get("effect")); + MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", myRequest.getAdditionParam("configTags")); + MapUtils.putIfValNoNull(configAdvanceInfo, "desc", myRequest.getAdditionParam("desc")); + MapUtils.putIfValNoNull(configAdvanceInfo, "use", myRequest.getAdditionParam("use")); + MapUtils.putIfValNoNull(configAdvanceInfo, "effect", myRequest.getAdditionParam("effect")); MapUtils.putIfValNoNull(configAdvanceInfo, "type", type); - MapUtils.putIfValNoNull(configAdvanceInfo, "schema", meta.getLabels().get("schema")); + MapUtils.putIfValNoNull(configAdvanceInfo, "schema", myRequest.getAdditionParam("schema")); ParamUtils.checkParam(configAdvanceInfo); if (AggrWhitelist.isAggrDataId(dataId)) { @@ -99,7 +99,7 @@ public class ConfigPublishRequestHandler extends RequestHandler { } final Timestamp time = TimeUtils.getCurrentTime(); - String betaIps = meta.getLabels().get("betaIps"); + String betaIps = myRequest.getAdditionParam("betaIps"); ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); configInfo.setType(type); if (StringUtils.isBlank(betaIps)) { @@ -132,4 +132,4 @@ public class ConfigPublishRequestHandler extends RequestHandler { public List getRequestTypes() { return Lists.newArrayList(ConfigRequestTypeConstants.PUBLISH_CONFIG); } -} \ No newline at end of file +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java b/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java index a581d24b5..353b3634a 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ClientConnectionEventListenerRegistry.java @@ -16,14 +16,19 @@ package com.alibaba.nacos.core.remote; +import com.alibaba.nacos.api.remote.connection.Connection; import com.alibaba.nacos.core.utils.Loggers; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** - * regitry for client connection event listeners. + * registry for client connection event listeners. * * @author liuzunfei * @version $Id: ClientConnectionEventListenerRegistry.java, v 0.1 2020年07月20日 1:47 PM liuzunfei Exp $ @@ -33,14 +38,57 @@ public class ClientConnectionEventListenerRegistry { final List clientConnectionEventListeners = new ArrayList(); + protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("com.alibaba.nacos.core.remote.client.connection.notifier"); + t.setDaemon(true); + return t; + } + }); + + /** + * notify where a new client connected. + * + * @param connection connection that new created. + */ + public void notifyClientConnected(final Connection connection) { + executorService.schedule(new Runnable() { + @Override + public void run() { + for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { + clientConnectionEventListener.clientConnected(connection); + } + } + }, 0L, TimeUnit.MILLISECONDS); + } + + /** + * notify where a new client disconnected. + * + * @param connection connection that disconnected. + */ + public void notifyClientDisConnected(final Connection connection) { + executorService.schedule(new Runnable() { + @Override + public void run() { + for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) { + clientConnectionEventListener.clientDisConnected(connection); + } + } + }, 0L, TimeUnit.MILLISECONDS); + } + /** * register ClientConnectionEventListener. * * @param listener listener. */ public void registerClientConnectionEventListener(ClientConnectionEventListener listener) { - Loggers.GRPC.info("[ClientConnectionEventListenerRegistry] registry listener - " + listener.getClass().getSimpleName()); + Loggers.GRPC.info("[ClientConnectionEventListenerRegistry] registry listener - " + listener.getClass() + .getSimpleName()); this.clientConnectionEventListeners.add(listener); } -} \ No newline at end of file +} diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectCoordinator.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectCoordinator.java index 86d0c5413..5d4c0709f 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectCoordinator.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectCoordinator.java @@ -22,7 +22,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -57,29 +60,31 @@ public class ConnectCoordinator implements ConnectionHeathyChecker { executors.scheduleWithFixedDelay(new Runnable() { @Override public void run() { - - long currentStamp = System.currentTimeMillis(); - Collection connections = connectionManager.connetions.values(); - for (Connection conn : connections) { - try { - long lastActiveTimestamp = conn.getLastActiveTimestamp(); + try { + long currentStamp = System.currentTimeMillis(); + Set> entries = connectionManager.connetions.entrySet(); + + List toExpelCLients = new LinkedList(); + for (Map.Entry entry : entries) { + Connection client = entry.getValue(); + long lastActiveTimestamp = entry.getValue().getLastActiveTimestamp(); if (currentStamp - lastActiveTimestamp > EXPIRE_MILLSECOND) { - conn.closeGrapcefully(); - connectionManager.unregister(conn.getConnectionId()); - Loggers.GRPC.info("expire connection found ,success expel connectionid = {} ", - conn.getConnectionId()); - for (ClientConnectionEventListener listener : clientConnectionEventListenerRegistry.clientConnectionEventListeners) { - listener.clientDisConnected(conn); - } - + toExpelCLients.add(client.getConnectionId()); } - } catch (Exception e) { - Loggers.GRPC.error("error occurs when expel expire connection ,connectionid={} ", - conn.getConnectionId(), e); } + + for (String expeledClient : toExpelCLients) { + connectionManager.unregister(expeledClient); + Loggers.GRPC.info("expire connection found ,success expel connectionid = {} ", expeledClient); + + } + + } catch (Exception e) { + Loggers.GRPC.error("error occurs when heathy check... ", e); } } - }, 500L, 5000L, TimeUnit.MILLISECONDS); + }, 500L, 3000L, TimeUnit.MILLISECONDS); } } + diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index 53490b144..785d6d590 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -18,6 +18,7 @@ package com.alibaba.nacos.core.remote; import com.alibaba.nacos.api.remote.connection.Connection; import com.alibaba.nacos.core.utils.Loggers; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.HashMap; @@ -34,6 +35,9 @@ public class ConnectionManager { Map connetions = new HashMap(); + @Autowired + private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry; + /** * register a new connect. * @@ -43,6 +47,7 @@ public class ConnectionManager { public void register(String connectionId, Connection connection) { Connection connectionInner = connetions.putIfAbsent(connectionId, connection); if (connectionInner == null) { + clientConnectionEventListenerRegistry.notifyClientConnected(connection); Loggers.GRPC.info("new connection registered successfully, connectionid = {} ", connectionId); } } @@ -54,7 +59,9 @@ public class ConnectionManager { public void unregister(String connectionId) { Connection remove = this.connetions.remove(connectionId); if (remove != null) { + remove.closeGrapcefully(); Loggers.GRPC.info(" connection unregistered successfully,connectionid = {} ", connectionId); + clientConnectionEventListenerRegistry.notifyClientConnected(remove); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java index c6da0fdc2..acfc000e7 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java @@ -16,8 +16,6 @@ package com.alibaba.nacos.core.remote.grpc; -import com.alibaba.nacos.core.remote.ClientConnectionEventListenerRegistry; -import com.alibaba.nacos.core.remote.ConnectCoordinator; import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.RequestHandlerRegistry; import com.alibaba.nacos.core.remote.RpcServer; @@ -41,12 +39,6 @@ public class GrpcServer extends RpcServer { private Server server; - @Autowired - private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry; - - @Autowired - private ConnectCoordinator connectCoordinator; - @Autowired private ConnectionManager connectionManager;