notify connect listeners on start up (#3405)

* Add gprc support-> try to connect the server synchronous on start up , start a thread to connect to server until successfully connected when fail.

* Add gprc support-> notify connect listeners.

* Add gprc support-> update client reconnect  strategy.

* Add gprc support-> optimize push config request param model and meta request.

* Add gprc support-> notify on new connected
This commit is contained in:
nov.lzf 2020-07-21 20:08:38 +08:00 committed by GitHub
parent 115c992c98
commit b28636c6de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 252 additions and 112 deletions

View File

@ -16,6 +16,9 @@
package com.alibaba.nacos.api.config.remote.request;
import java.util.HashMap;
import java.util.Map;
/**
* request to publish a config.
*
@ -32,10 +35,35 @@ public class ConfigPublishRequest extends ConfigCommonRequest {
String content;
private Map<String, String> additonMap;
public ConfigPublishRequest() {
}
/**
* get additional param.
*
* @param key key of param.
* @return value of param ,return null if not exist.
*/
public String getAdditionParam(String key) {
return additonMap == null ? null : additonMap.get(key);
}
/**
* put additional param value. will override if exist.
*
* @param key key of param.
* @param value value of param.
*/
public void putAdditonalParam(String key, String value) {
if (additonMap == null) {
additonMap = new HashMap<String, String>();
}
additonMap.put(key, value);
}
public ConfigPublishRequest(String dataId, String group, String tenant, String content) {
this.content = content;
this.dataId = dataId;

View File

@ -645,6 +645,7 @@ public class ClientWorker implements Closeable {
}, 1L, 10L, TimeUnit.MILLISECONDS);
} else {
rpcClientProxy.initAndStart(new ServerListFactory() {
@Override
public String genNextServer() {
@ -658,7 +659,6 @@ public class ClientWorker implements Closeable {
return agent.getServerListManager().getCurrentServerAddr();
}
});
/*
* Register Listen Change Handler
*/

View File

@ -23,10 +23,13 @@ import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -47,11 +50,24 @@ public abstract class RpcClient implements Closeable {
protected AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
RpcClientStatus.WAIT_INIT);
protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.config.grpc.worker");
t.setDaemon(true);
return t;
}
});
/**
* Notify when client connected.
* Notify when client re connected.
*/
protected void notifyReConnected() {
if (!this.connectionEventListeners.isEmpty()) {
executorService.schedule(new Runnable() {
@Override
public void run() {
if (!connectionEventListeners.isEmpty()) {
connectionEventListeners.forEach(new Consumer<ConnectionEventListener>() {
@Override
public void accept(ConnectionEventListener connectionEventListener) {
@ -60,6 +76,28 @@ public abstract class RpcClient implements Closeable {
});
}
}
}, 0, TimeUnit.MILLISECONDS);
}
/**
* Notify when client new connected.
*/
protected void notifyConnected() {
executorService.schedule(new Runnable() {
@Override
public void run() {
if (!connectionEventListeners.isEmpty()) {
connectionEventListeners.forEach(new Consumer<ConnectionEventListener>() {
@Override
public void accept(ConnectionEventListener connectionEventListener) {
connectionEventListener.onReconnected();
}
});
}
}
}, 0, TimeUnit.MILLISECONDS);
}
/**
* check is this client is inited.
@ -110,6 +148,15 @@ public abstract class RpcClient implements Closeable {
public RpcClient() {
}
/**
* Getter method for property <tt>connectionEventListeners</tt>.
*
* @return property value of connectionEventListeners
*/
protected List<ConnectionEventListener> getConnectionEventListeners() {
return connectionEventListeners;
}
/**
* init server list factory.
*
@ -138,7 +185,6 @@ public abstract class RpcClient implements Closeable {
/**
* Start this client.
*/
@PostConstruct
public abstract void start() throws NacosException;
/**

View File

@ -43,9 +43,6 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@ -66,20 +63,10 @@ public class GrpcClient extends RpcClient {
protected RequestGrpc.RequestBlockingStub grpcServiceStub;
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.config.grpc.worker");
t.setDaemon(true);
return t;
}
});
/**
* Reconnect to current server before switch a new server.
*/
private static final int MAX_RECONNECT_TIMES = 5;
private static final int MAX_RECONNECT_TIMES = 3;
private AtomicInteger reConnectTimesLeft = new AtomicInteger(MAX_RECONNECT_TIMES);
@ -102,14 +89,25 @@ public class GrpcClient extends RpcClient {
getServerListFactory().getCurrentServer());
if (isRunning() || isInitStatus()) {
final RpcClientStatus prevStatus = rpcClientStatus.get();
boolean updateSucess = false;
if (isRunning()) {
updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.RE_CONNECTING);
} else {
updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.STARTING);
}
boolean updateSucess = rpcClientStatus.compareAndSet(prevStatus,
isInitStatus() ? RpcClientStatus.STARTING : RpcClientStatus.RE_CONNECTING);
if (updateSucess) {
if (isStarting()) {
buildClientAtFirstTime();
boolean sucess = serverCheck();
if (sucess) {
rpcClientStatus.compareAndSet(RpcClientStatus.STARTING, RpcClientStatus.RUNNING);
LOGGER.info("server check success, client start up success. ");
notifyConnected();
return;
} else {
rpcClientStatus.compareAndSet(RpcClientStatus.STARTING, RpcClientStatus.RE_CONNECTING);
}
}
executorService.schedule(new Runnable() {
@Override
public void run() {
@ -117,20 +115,23 @@ public class GrpcClient extends RpcClient {
// loop until start client success.
while (!isRunning()) {
buildClientAtFirstTime();
boolean sucess = serverCheck();
if (sucess) {
if (rpcClientStatus.get() == RpcClientStatus.RE_CONNECTING) {
notifyReConnected();
}
LOGGER.info("Server check success, Current Server is {}" + getServerListFactory()
.getCurrentServer());
LOGGER.info("server check success, reconnected success, Current Server is {}"
+ getServerListFactory().getCurrentServer());
rpcClientStatus.compareAndSet(rpcClientStatus.get(), RpcClientStatus.RUNNING);
reConnectTimesLeft.set(MAX_RECONNECT_TIMES);
return;
} else {
int leftRetryTimes = reConnectTimesLeft.decrementAndGet();
if (leftRetryTimes <= 0) {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
//do nothing
}
getServerListFactory().genNextServer();
reConnectTimesLeft.set(MAX_RECONNECT_TIMES);
try {
@ -177,7 +178,7 @@ public class GrpcClient extends RpcClient {
public void run() {
sendBeat();
}
}, 0, 2000, TimeUnit.MILLISECONDS);
}, 0, 3000, TimeUnit.MILLISECONDS);
super.registerServerPushResponseHandler(new ServerPushResponseHandler() {
@Override
@ -192,7 +193,6 @@ public class GrpcClient extends RpcClient {
}
}
});
}
/**
@ -206,11 +206,10 @@ public class GrpcClient extends RpcClient {
return;
}
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).setType(heartBeatRequest.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(heartBeatRequest.getType()).setBody(
Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcResponse response = grpcServiceStub.request(streamRequest);
} catch (Exception e) {
@ -219,13 +218,19 @@ public class GrpcClient extends RpcClient {
}
}
private GrpcMetadata buildMeta() {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.setVersion(ClientCommonUtils.VERSION).build();
return meta;
}
private boolean serverCheck() {
try {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).setType(heartBeatRequest.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(heartBeatRequest.getType()).setBody(
Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcResponse response = grpcServiceStub.request(streamRequest);
return response != null;
@ -269,15 +274,13 @@ public class GrpcClient extends RpcClient {
LOGGER.info("GrpcClient start to connect to rpc server, serverIp={},port={}", serverIp, serverPort);
this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext(true).build();
this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext().build();
grpcStreamServiceStub = RequestStreamGrpc.newStub(channel);
grpcServiceStub = RequestGrpc.newBlockingStub(channel);
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.setVersion(ClientCommonUtils.VERSION).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build();
LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest);
grpcStreamServiceStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
@ -325,10 +328,12 @@ public class GrpcClient extends RpcClient {
@Override
public Response request(Request request) throws NacosException {
if (!this.isRunning()) {
throw new IllegalStateException("Client is not connected to any server now,please retry later");
}
try {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(meta).setType(request.getType())
GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build();
GrpcResponse response = grpcServiceStub.request(grpcrequest);
String type = response.getType();

View File

@ -20,13 +20,13 @@ import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.common.utils.ThreadUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
@ -38,12 +38,12 @@ public class ConfigTest {
@Before
public void before() throws Exception {
Properties properties = new Properties();
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR,
"11.239.114.187:8848,11.239.113.204:8848,11.239.112.161:8848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR,
// "11.239.114.187:8848,11.239.113.204:8848,11.239.112.161:8848");
//"11.239.114.187:8848");
configService = NacosFactory.createConfigService(properties);
//Thread.sleep(2000L);
}
@After
@ -53,6 +53,14 @@ public class ConfigTest {
@Test
public void test2() throws Exception {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848");
List<ConfigService> list = new ArrayList<ConfigService>();
for (int i = 0; i <= 1000; i++) {
ConfigService configService2 = NacosFactory.createConfigService(properties);
System.out.println(configService2);
}
Thread.sleep(1000000L);
}
@ -62,10 +70,11 @@ public class ConfigTest {
final String dataId = "lessspring";
final String group = "lessspring";
final String content = "lessspring-" + System.currentTimeMillis();
boolean result = configService.publishConfig(dataId, group, content);
Assert.assertTrue(result);
System.out.println("4-" + System.currentTimeMillis());
ThreadUtils.sleep(200L);
boolean result = configService.publishConfig(dataId, group, content);
//Assert.assertTrue(result);
System.out.println("5-" + System.currentTimeMillis());
configService.getConfigAndSignListener(dataId, group, 5000, new AbstractListener() {
@Override

View File

@ -74,22 +74,22 @@ public class ConfigPublishRequestHandler extends RequestHandler {
final String tenant = myRequest.getTenant();
final String srcIp = meta.getClientIp();
final String requestIpApp = meta.getLabels().get("requestIpApp");
final String tag = meta.getLabels().get("tag");
final String appName = meta.getLabels().get("appName");
final String type = meta.getLabels().get("type");
final String srcUser = meta.getLabels().get("src_user");
final String requestIpApp = myRequest.getAdditionParam("requestIpApp");
final String tag = myRequest.getAdditionParam("tag");
final String appName = myRequest.getAdditionParam("appName");
final String type = myRequest.getAdditionParam("type");
final String srcUser = myRequest.getAdditionParam("src_user");
// check tenant
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", meta.getLabels().get("configTags"));
MapUtils.putIfValNoNull(configAdvanceInfo, "desc", meta.getLabels().get("desc"));
MapUtils.putIfValNoNull(configAdvanceInfo, "use", meta.getLabels().get("use"));
MapUtils.putIfValNoNull(configAdvanceInfo, "effect", meta.getLabels().get("effect"));
MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", myRequest.getAdditionParam("configTags"));
MapUtils.putIfValNoNull(configAdvanceInfo, "desc", myRequest.getAdditionParam("desc"));
MapUtils.putIfValNoNull(configAdvanceInfo, "use", myRequest.getAdditionParam("use"));
MapUtils.putIfValNoNull(configAdvanceInfo, "effect", myRequest.getAdditionParam("effect"));
MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
MapUtils.putIfValNoNull(configAdvanceInfo, "schema", meta.getLabels().get("schema"));
MapUtils.putIfValNoNull(configAdvanceInfo, "schema", myRequest.getAdditionParam("schema"));
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(dataId)) {
@ -99,7 +99,7 @@ public class ConfigPublishRequestHandler extends RequestHandler {
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = meta.getLabels().get("betaIps");
String betaIps = myRequest.getAdditionParam("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setType(type);
if (StringUtils.isBlank(betaIps)) {

View File

@ -16,14 +16,19 @@
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* regitry for client connection event listeners.
* registry for client connection event listeners.
*
* @author liuzunfei
* @version $Id: ClientConnectionEventListenerRegistry.java, v 0.1 2020年07月20日 1:47 PM liuzunfei Exp $
@ -33,13 +38,56 @@ public class ClientConnectionEventListenerRegistry {
final List<ClientConnectionEventListener> clientConnectionEventListeners = new ArrayList<ClientConnectionEventListener>();
protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.core.remote.client.connection.notifier");
t.setDaemon(true);
return t;
}
});
/**
* notify where a new client connected.
*
* @param connection connection that new created.
*/
public void notifyClientConnected(final Connection connection) {
executorService.schedule(new Runnable() {
@Override
public void run() {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
clientConnectionEventListener.clientConnected(connection);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
/**
* notify where a new client disconnected.
*
* @param connection connection that disconnected.
*/
public void notifyClientDisConnected(final Connection connection) {
executorService.schedule(new Runnable() {
@Override
public void run() {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
clientConnectionEventListener.clientDisConnected(connection);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
/**
* register ClientConnectionEventListener.
*
* @param listener listener.
*/
public void registerClientConnectionEventListener(ClientConnectionEventListener listener) {
Loggers.GRPC.info("[ClientConnectionEventListenerRegistry] registry listener - " + listener.getClass().getSimpleName());
Loggers.GRPC.info("[ClientConnectionEventListenerRegistry] registry listener - " + listener.getClass()
.getSimpleName());
this.clientConnectionEventListeners.add(listener);
}

View File

@ -22,7 +22,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -57,29 +60,31 @@ public class ConnectCoordinator implements ConnectionHeathyChecker {
executors.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
long currentStamp = System.currentTimeMillis();
Collection<Connection> connections = connectionManager.connetions.values();
for (Connection conn : connections) {
try {
long lastActiveTimestamp = conn.getLastActiveTimestamp();
long currentStamp = System.currentTimeMillis();
Set<Map.Entry<String, Connection>> entries = connectionManager.connetions.entrySet();
List<String> toExpelCLients = new LinkedList<String>();
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
long lastActiveTimestamp = entry.getValue().getLastActiveTimestamp();
if (currentStamp - lastActiveTimestamp > EXPIRE_MILLSECOND) {
conn.closeGrapcefully();
connectionManager.unregister(conn.getConnectionId());
Loggers.GRPC.info("expire connection found success expel connectionid = {} ",
conn.getConnectionId());
for (ClientConnectionEventListener listener : clientConnectionEventListenerRegistry.clientConnectionEventListeners) {
listener.clientDisConnected(conn);
toExpelCLients.add(client.getConnectionId());
}
}
for (String expeledClient : toExpelCLients) {
connectionManager.unregister(expeledClient);
Loggers.GRPC.info("expire connection found success expel connectionid = {} ", expeledClient);
}
} catch (Exception e) {
Loggers.GRPC.error("error occurs when expel expire connection connectionid={} ",
conn.getConnectionId(), e);
Loggers.GRPC.error("error occurs when heathy check... ", e);
}
}
}
}, 500L, 5000L, TimeUnit.MILLISECONDS);
}, 500L, 3000L, TimeUnit.MILLISECONDS);
}
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
@ -34,6 +35,9 @@ public class ConnectionManager {
Map<String, Connection> connetions = new HashMap<String, Connection>();
@Autowired
private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
/**
* register a new connect.
*
@ -43,6 +47,7 @@ public class ConnectionManager {
public void register(String connectionId, Connection connection) {
Connection connectionInner = connetions.putIfAbsent(connectionId, connection);
if (connectionInner == null) {
clientConnectionEventListenerRegistry.notifyClientConnected(connection);
Loggers.GRPC.info("new connection registered successfully, connectionid = {} ", connectionId);
}
}
@ -54,7 +59,9 @@ public class ConnectionManager {
public void unregister(String connectionId) {
Connection remove = this.connetions.remove(connectionId);
if (remove != null) {
remove.closeGrapcefully();
Loggers.GRPC.info(" connection unregistered successfully,connectionid = {} ", connectionId);
clientConnectionEventListenerRegistry.notifyClientConnected(remove);
}
}

View File

@ -16,8 +16,6 @@
package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.core.remote.ClientConnectionEventListenerRegistry;
import com.alibaba.nacos.core.remote.ConnectCoordinator;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.remote.RpcServer;
@ -41,12 +39,6 @@ public class GrpcServer extends RpcServer {
private Server server;
@Autowired
private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
@Autowired
private ConnectCoordinator connectCoordinator;
@Autowired
private ConnectionManager connectionManager;