remote support on servers and optimize connection and client model (#3609)

* Add gprc support-> optimize rpc listen execute task  notify

* Add gprc support->  add listener optimize.

* Add gprc support-> cluster rpc client support

* cluster rpc client support

* config change notify bettween server long connect support

* server push future refactor.

* server rpc sync compatibility support

* connection labels  support

* code fail fix
This commit is contained in:
nov.lzf 2020-08-14 17:40:00 +08:00 committed by GitHub
parent a0a1486fa2
commit 33df55d40a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 1788 additions and 548 deletions

View File

@ -35,4 +35,7 @@ public class ConfigRequestTypeConstants extends RequestTypeConstants {
public static final String CONFIG_CHANGE_NOTIFY = "CONFIG_CHANGE_NOTIFY";
public static final String CONFIG_CHANGE_CLUSTER_SYNC = "CONFIG_CHANGE_CLUSTER_SYNC";
}

View File

@ -0,0 +1,163 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.request.cluster;
import com.alibaba.nacos.api.config.remote.request.AbstractConfigRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigRequestTypeConstants;
/**
* config change sync request on clusters.
*
* @author liuzunfei
* @version $Id: ConfigChangeClusterSyncRequest.java, v 0.1 2020年08月11日 4:30 PM liuzunfei Exp $
*/
public class ConfigChangeClusterSyncRequest extends AbstractConfigRequest {
String dataId;
String group;
String tenant;
String tag;
long lastModified;
String isBeta;
/**
* is beta.
*
* @return
*/
public boolean isBeta() {
return "Y".equalsIgnoreCase(isBeta);
}
@Override
public String getType() {
return ConfigRequestTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC;
}
/**
* Getter method for property <tt>dataId</tt>.
*
* @return property value of dataId
*/
public String getDataId() {
return dataId;
}
/**
* Setter method for property <tt>dataId</tt>.
*
* @param dataId value to be assigned to property dataId
*/
public void setDataId(String dataId) {
this.dataId = dataId;
}
/**
* Getter method for property <tt>group</tt>.
*
* @return property value of group
*/
public String getGroup() {
return group;
}
/**
* Setter method for property <tt>group</tt>.
*
* @param group value to be assigned to property group
*/
public void setGroup(String group) {
this.group = group;
}
/**
* Getter method for property <tt>tenant</tt>.
*
* @return property value of tenant
*/
public String getTenant() {
return tenant;
}
/**
* Setter method for property <tt>tenant</tt>.
*
* @param tenant value to be assigned to property tenant
*/
public void setTenant(String tenant) {
this.tenant = tenant;
}
/**
* Getter method for property <tt>tag</tt>.
*
* @return property value of tag
*/
public String getTag() {
return tag;
}
/**
* Setter method for property <tt>tag</tt>.
*
* @param tag value to be assigned to property tag
*/
public void setTag(String tag) {
this.tag = tag;
}
/**
* Getter method for property <tt>lastModified</tt>.
*
* @return property value of lastModified
*/
public long getLastModified() {
return lastModified;
}
/**
* Setter method for property <tt>lastModified</tt>.
*
* @param lastModified value to be assigned to property lastModified
*/
public void setLastModified(long lastModified) {
this.lastModified = lastModified;
}
/**
* Getter method for property <tt>isBeta</tt>.
*
* @return property value of isBeta
*/
public String getIsBeta() {
return isBeta;
}
/**
* Setter method for property <tt>isBeta</tt>.
*
* @param isBeta value to be assigned to property isBeta
*/
public void setIsBeta(String isBeta) {
this.isBeta = isBeta;
}
}

View File

@ -35,4 +35,6 @@ public class ConfigResponseTypeConstants extends ResponseTypeConstants {
public static final String CONFIG_NOTIFY = "CONFIG_NOTIFY";
public static final String CONFIG_CHANGE_CLUSTER_SYNC = "CONFIG_CHANGE_CLUSTER_SYNC";
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.response.cluster;
import com.alibaba.nacos.api.config.remote.response.ConfigResponseTypeConstants;
import com.alibaba.nacos.api.remote.response.Response;
/**
* config change sync response on clusters.
*
* @author liuzunfei
* @version $Id: ConfigChangeClusterSyncResponse.java, v 0.1 2020年08月11日 4:32 PM liuzunfei Exp $
*/
public class ConfigChangeClusterSyncResponse extends Response {
@Override
public String getType() {
return ConfigResponseTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote;
/**
* constants define of remote.
*
* @author liuzunfei
* @version $Id: ConnectionMetaConstants.java, v 0.1 2020年08月13日 1:05 PM liuzunfei Exp $
*/
public class RemoteConstants {
/**
* label key value define.
*/
public static final String LABEL_SOURCE = "source";
public static final String LABEL_SOURCE_SDK = "sdk";
public static final String LABEL_SOURCE_NODE = "node";
public static final String LABEL_MODULE = "module";
public static final String LABEL_MODULE_CONFIG = "config";
public static final String LABEL_MODULE_NAMING = "naming";
}

View File

@ -16,6 +16,9 @@
package com.alibaba.nacos.api.remote.request;
import java.util.HashMap;
import java.util.Map;
/**
* request to setup a connection.
*
@ -30,6 +33,8 @@ public class ConnectionSetupRequest extends InternalRequest {
private String clientVersion;
protected Map<String, String> labels = new HashMap<String, String>();
public ConnectionSetupRequest() {
}
@ -39,6 +44,14 @@ public class ConnectionSetupRequest extends InternalRequest {
this.clientVersion = clientVersion;
}
public ConnectionSetupRequest(String connectionId, String clientIp, String clientVersion,
Map<String, String> labels) {
this.clientIp = clientIp;
this.connectionId = connectionId;
this.clientVersion = clientVersion;
this.labels = labels;
}
@Override
public String getType() {
return RequestTypeConstants.CONNECTION_SETUP;
@ -97,4 +110,22 @@ public class ConnectionSetupRequest extends InternalRequest {
public void setClientVersion(String clientVersion) {
this.clientVersion = clientVersion;
}
/**
* Getter method for property <tt>labels</tt>.
*
* @return property value of labels
*/
public Map<String, String> getLabels() {
return labels;
}
/**
* Setter method for property <tt>labels</tt>.
*
* @param labels value to be assigned to property labels
*/
public void setLabels(Map<String, String> labels) {
this.labels = labels;
}
}

View File

@ -28,6 +28,8 @@ public class PushAckRequest extends InternalRequest {
private boolean success;
private Exception exception;
@Override
public String getType() {
return RequestTypeConstants.PUSH_ACK;
@ -83,4 +85,22 @@ public class PushAckRequest extends InternalRequest {
public void setSuccess(boolean success) {
this.success = success;
}
/**
* Setter method for property <tt>exception</tt>.
*
* @param exception value to be assigned to property exception
*/
public void setException(Exception exception) {
this.exception = exception;
}
/**
* Getter method for property <tt>exception</tt>.
*
* @return property value of exception
*/
public Exception getException() {
return exception;
}
}

View File

@ -14,18 +14,24 @@
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
package com.alibaba.nacos.api.remote.response;
/**
* NacosRemoteConstants.
* abstract callback of push service.
*
* @author liuzunfei
* @version $Id: NacosRemoteConstants.java, v 0.1 2020年07月14日 9:22 PM liuzunfei Exp $
* @version $Id: PushCallBack.java, v 0.1 2020年07月20日 1:13 PM liuzunfei Exp $
*/
public class NacosRemoteConstants {
public abstract class AbstractPushCallBack implements PushCallBack {
public static final String LISTEN_CONTEXT_CONFIG = "CONFIG";
private long timeout;
public static final String LISTEN_CONTEXT_NAMING = "NAMING";
public AbstractPushCallBack(long timeout) {
this.timeout = timeout;
}
@Override
public long getTimeout() {
return timeout;
}
}

View File

@ -24,6 +24,8 @@ package com.alibaba.nacos.api.remote.response;
*/
public interface PushCallBack {
public long getTimeout();
public void onSuccess();
public void onFail(Exception e);

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigResponseTypeConstants;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest;
import com.alibaba.nacos.api.naming.remote.response.InstanceResponse;
@ -53,6 +54,9 @@ public class ResponseRegistry {
REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_QUERY, ConfigQueryResponse.class);
REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_PUBLISH, ConfigPubishResponse.class);
REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_REMOVE, ConfigRemoveResponse.class);
//config on cluster.
REGISTRY_RESPONSES
.put(ConfigResponseTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC, ConfigChangeClusterSyncResponse.class);
//naming response registry
REGISTRY_RESPONSES.put(NamingRemoteConstants.REGISTER_INSTANCE, InstanceResponse.class);

View File

@ -93,7 +93,9 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) {
cache.addListener(listener);
}
notifyRpcListenConfig();
if (!cache.isListenSuccess()) {
notifyRpcListenConfig();
}
}
/**
@ -104,13 +106,15 @@ public class ClientWorker implements Closeable {
private void notifyRpcListenConfig() {
try {
if (!ParamUtils.useHttpSwitch()) {
lock.tryLock();
try {
condition.signal();
} finally {
lock.unlock();
boolean lockSuccess = lock.tryLock();
if (lockSuccess) {
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
} catch (Exception e) {
LOGGER.warn("[notify rpc listen fail]", e);
@ -153,7 +157,9 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) {
cache.addListener(listener);
}
notifyRpcListenConfig();
if (!cache.isListenSuccess()) {
notifyRpcListenConfig();
}
}
/**
@ -174,9 +180,10 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) {
cache.addListener(listener);
}
notifyRpcListenConfig();
// if current cache is already at listening status,do not notify.
if (!cache.isListenSuccess()) {
notifyRpcListenConfig();
}
}
/**
@ -653,7 +660,7 @@ public class ClientWorker implements Closeable {
try {
while (true) {
try {
lock.tryLock();
lock.lock();
//System.out.println("wait execute listen..");
condition.await();
executeRpcListen();
@ -696,11 +703,14 @@ public class ClientWorker implements Closeable {
CacheData cacheData = cacheMap.get().get(groupKey);
if (cacheData != null) {
cacheData.setListenSuccess(false);
try {
lock.tryLock();
condition.signal();
} finally {
lock.unlock();
boolean lockSuccess = lock.tryLock();
if (lockSuccess) {
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
}
@ -712,11 +722,13 @@ public class ClientWorker implements Closeable {
@Override
public void onConnected() {
lock.tryLock();
try {
condition.signal();
} finally {
lock.unlock();
boolean lockSuccess = lock.tryLock();
if (lockSuccess) {
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
@ -816,7 +828,7 @@ public class ClientWorker implements Closeable {
try {
ConfigChangeBatchListenResponse configChangeBatchListenResponse = rpcClientProxy
.listenConfigChange(listenConfigString);
if (configChangeBatchListenResponse.isSuccess()) {
if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedGroupKeys())) {
for (String groupKey : configChangeBatchListenResponse.getChangedGroupKeys()) {

View File

@ -25,12 +25,18 @@ import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.config.utils.ParamUtils;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.HashMap;
import java.util.Map;
/**
* config grpc client proxy.
@ -44,7 +50,19 @@ public class ConfigClientProxy {
private RpcClient rpcClient;
public ConfigClientProxy() {
rpcClient = RpcClientFactory.getClient("config", ConnectionType.RSOCKET);
ConnectionType connectionType = ConnectionType.GRPC;
String connetionType = ParamUtils.configRemoteConnectionType();
if (StringUtils.isNotBlank(connetionType)) {
ConnectionType connectionType1 = ConnectionType.valueOf(connetionType);
if (connectionType1 != null) {
connectionType = connectionType1;
}
}
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_CONFIG);
rpcClient = RpcClientFactory.createClient("config", connectionType, labels);
}
public Response request(Request request) throws NacosException {

View File

@ -218,4 +218,13 @@ public class ParamUtils {
return "Y".equalsIgnoreCase(useHttpSwitch);
}
/**
* get connection type for remote.
*
* @return
*/
public static String configRemoteConnectionType() {
String remoteConnectionType = System.getProperty("nacos.remote.config.connectiontype");
return remoteConnectionType;
}
}

View File

@ -62,7 +62,7 @@ public class NamingGrpcClientProxy implements NamingClientProxy {
public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory,
ServiceInfoHolder serviceInfoHolder) throws NacosException {
this.namespaceId = namespaceId;
this.rpcClient = RpcClientFactory.getClient("naming", ConnectionType.GRPC);
this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC);
this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
start(serverListFactory, serviceInfoHolder);
}

View File

@ -21,6 +21,12 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@ -40,6 +46,7 @@ public class ConfigTest {
@Before
public void before() throws Exception {
Properties properties = new Properties();
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
@ -49,6 +56,37 @@ public class ConfigTest {
//Thread.sleep(2000L);
}
@Test
public void test222() throws Exception {
RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET);
client.init(new ServerListFactory() {
@Override
public String genNextServer() {
return "127.0.0.1:8848";
}
@Override
public String getCurrentServer() {
return "127.0.0.1:8848";
}
});
client.start();
ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
syncRequest.setDataId("xiaochun.xxc1");
syncRequest.setGroup("xiaochun.xxc");
syncRequest.setIsBeta("N");
syncRequest.setLastModified(System.currentTimeMillis());
syncRequest.setTag("");
syncRequest.setTenant("");
System.out.println(client.isRunning());
Response response = client.request(syncRequest);
client.request(syncRequest);
client.request(syncRequest);
System.out.println(response);
}
@After
public void cleanup() throws Exception {
configService.shutDown();
@ -59,6 +97,7 @@ public class ConfigTest {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
//"
System.out.println("1");
List<ConfigService> configServiceList = new ArrayList<ConfigService>();
for (int i = 0; i < 200; i++) {
@ -67,10 +106,12 @@ public class ConfigTest {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("listener2:" + configInfo);
}
});
configServiceList.add(configService);
}
System.out.println("2");
Thread th = new Thread(new Runnable() {
@Override
@ -80,6 +121,8 @@ public class ConfigTest {
int times = 10000;
while (times > 0) {
try {
System.out.println("3");
boolean result = configService
.publishConfig("test", "test", "value" + System.currentTimeMillis());
@ -111,22 +154,15 @@ public class ConfigTest {
public void run() {
long start = System.currentTimeMillis();
Random random = new Random();
int times = 1000;
int times = 1;
while (times > 0) {
try {
//System.out.println("发布配置");
boolean success = configService.publishConfig(dataId + random.nextInt(20), group,
"value" + System.currentTimeMillis());
if (success) {
// System.out.println("发布配置成功");
} else {
//System.out.println("发布配置失败");
}
times--;
Thread.sleep(500L);
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
}
@ -149,24 +185,13 @@ public class ConfigTest {
configService.getConfigAndSignListener(dataId + i, group, 3000L, listener);
}
//configService.getConfigAndSignListener(dataId, group, 5000, listener);
//Assert.assertTrue(result);
Thread.sleep(10000L);
// configService.getConfigAndSignListener(dataId, group, 5000, listener);
for (int i = 0; i < 20; i++) {
//configService.removeListener(dataId + i, group, listener);
}
System.out.println("remove listens.");
//configService.removeListener(dataId, group, listener);
//configService.removeConfig(dataId, group);
// configService.publishConfig("lessspring2", group, "lessspring2value");
//
// configService.getConfigAndSignListener("lessspring2", group, 5000, new AbstractListener() {
// @Override
// public void receiveConfigInfo(String configInfo) {
// System.out.println("receiveConfigInfo2 :" + configInfo);
// }
// });
//
Scanner scanner = new Scanner(System.in);
System.out.println("input content");
while (scanner.hasNextLine()) {

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.common.remote;
/**
* ConnectionType.
*
* @author liuzunfei
* @version $Id: ConnectionType.java, v 0.1 2020年07月13日 7:15 PM liuzunfei Exp $
*/
@ -42,6 +43,16 @@ public enum ConnectionType {
String name;
public static ConnectionType getByType(String type) {
ConnectionType[] values = ConnectionType.values();
for (ConnectionType connectionType : values) {
if (connectionType.getType().equals(type)) {
return connectionType;
}
}
return null;
}
private ConnectionType(String type, String name) {
this.type = type;
this.name = name;

View File

@ -21,6 +21,9 @@ import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.google.common.util.concurrent.FutureCallback;
import java.util.HashMap;
import java.util.Map;
/**
* connection on client side.
*
@ -33,11 +36,25 @@ public abstract class Connection {
protected String connectionId;
protected Map<String, String> labels = new HashMap<String, String>();
public Connection(String connetionId, RpcClient.ServerInfo serverInfo) {
this.serverInfo = serverInfo;
this.connectionId = connetionId;
}
public String getLabel(String labelKey) {
return labels.get(labelKey);
}
public void putLabel(String labelKey, String labelValue) {
labels.put(labelKey, labelValue);
}
public void putLabels(Map<String, String> labels) {
labels.putAll(labels);
}
/**
* send request.
*

View File

@ -31,7 +31,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@ -42,6 +44,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR;
/**
* abstract remote client to connect to server.
*
@ -52,18 +56,24 @@ public abstract class RpcClient implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
protected static final long ACTIVE_INTERNAL = 3000L;
private ServerListFactory serverListFactory;
protected String connectionId;
protected LinkedBlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<ConnectionEvent>();
protected AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
RpcClientStatus.WAIT_INIT);
private long activeTimeStamp = System.currentTimeMillis();
protected ScheduledExecutorService executorService;
protected Connection currentConnetion;
protected volatile Connection currentConnetion;
protected Map<String, String> labels = new HashMap<String, String>();
/**
* listener called where connect status changed.
@ -115,6 +125,14 @@ public abstract class RpcClient implements Closeable {
}
}
protected boolean overActiveTime() {
return System.currentTimeMillis() - this.activeTimeStamp > ACTIVE_INTERNAL;
}
protected void refereshActiveTimestamp() {
this.activeTimeStamp = System.currentTimeMillis();
}
/**
* check is this client is inited.
*
@ -168,6 +186,16 @@ public abstract class RpcClient implements Closeable {
serverListFactory.getClass().getName());
}
/**
* init server list factory.
*
* @param labels labels
*/
public void initLabels(Map<String, String> labels) {
this.labels.putAll(labels);
LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init label ,labels={}", this.labels);
}
/**
* Start this client.
*/
@ -196,8 +224,8 @@ public abstract class RpcClient implements Closeable {
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (InterruptedException e) {
//Do nothing
} catch (Exception e) {
//Donothing
}
}
}
@ -225,8 +253,9 @@ public abstract class RpcClient implements Closeable {
public void requestReply(ServerPushRequest request) {
if (request instanceof ConnectResetRequest) {
try {
if (isRunning()) {
clearContextOnResetRequest();
switchServerAsync();
}
} catch (Exception e) {
@ -247,18 +276,14 @@ public abstract class RpcClient implements Closeable {
*/
protected void switchServerAsync() {
System.out.println("1");
//return if is in switching of other thread.
if (switchingFlag.get()) {
System.out.println("1-1");
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
//only one thread can execute switching meantime.
boolean innerLock = switchingLock.tryLock();
@ -269,27 +294,28 @@ public abstract class RpcClient implements Closeable {
// loop until start client success.
boolean switchSuccess = false;
while (!switchSuccess) {
//1.get a new server
ServerInfo serverInfo = nextRpcServer();
System.out.println("1:" + serverInfo);
//2.create a new channel to new server
try {
Connection connectNew = connectToServer(serverInfo);
if (connectNew != null) {
//successfully create a new connect.
closeConnection(currentConnetion);
currentConnetion = connectNew;
rpcClientStatus.set(RpcClientStatus.RUNNING);
switchSuccess = true;
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
boolean s = eventLinkedBlockingQueue
.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
return;
}
} catch (Exception e) {
e.printStackTrace();
// error to create connection
}
try {
//sleep 1 second to switch next server.
Thread.sleep(1000L);
@ -310,7 +336,7 @@ public abstract class RpcClient implements Closeable {
private void closeConnection(Connection connection) {
if (connection != null) {
connection.close();
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
}
}
@ -328,6 +354,10 @@ public abstract class RpcClient implements Closeable {
*/
public abstract int rpcPortOffset();
protected void clearContextOnResetRequest() {
// Default do nothing.
}
/**
* send request.
*
@ -335,12 +365,32 @@ public abstract class RpcClient implements Closeable {
* @return
*/
public Response request(Request request) throws NacosException {
Response response = this.currentConnetion.request(request);
if (response != null && response instanceof ConnectionUnregisterResponse) {
switchServerAsync();
throw new IllegalStateException("Invalid client status.");
int retryTimes = 3;
Exception exceptionToThrow = null;
while (retryTimes > 0) {
try {
Response response = this.currentConnetion.request(request);
if (response != null && response instanceof ConnectionUnregisterResponse) {
clearContextOnResetRequest();
switchServerAsync();
throw new IllegalStateException("Invalid client status.");
}
refereshActiveTimestamp();
return response;
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Fail to send request,connectionId={}, request={},errorMesssage={}", this.connectionId, request,
e.getMessage());
exceptionToThrow = e;
} finally {
retryTimes--;
}
}
return response;
if (exceptionToThrow != null) {
throw new NacosException(SERVER_ERROR, exceptionToThrow);
}
return null;
}
/**
@ -351,6 +401,7 @@ public abstract class RpcClient implements Closeable {
*/
public void asyncRequest(Request request, FutureCallback<Response> callback) throws NacosException {
this.currentConnetion.asyncRequest(request, callback);
refereshActiveTimestamp();
}
/**
@ -414,7 +465,10 @@ public abstract class RpcClient implements Closeable {
}
protected ServerInfo nextRpcServer() {
getServerListFactory().genNextServer();
String s = getServerListFactory().genNextServer();
System.out.println("0...,switch..." + s);
String serverAddress = getServerListFactory().getCurrentServer();
return resolveServerInfo(serverAddress);
}

View File

@ -16,12 +16,14 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.grpc.GrpcClient;
import com.alibaba.nacos.common.remote.client.rsocket.RsocketRpcClient;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* RpcClientFactory.to support muti client for diffrent modules of usage.
@ -33,13 +35,48 @@ public class RpcClientFactory {
static Map<String, RpcClient> clientMap = new HashMap<String, RpcClient>();
public static RpcClient getClient(String clientName, ConnectionType connectionType) {
/**
* get all client.
*
* @return client collection.
*/
public static Set<Map.Entry<String, RpcClient>> getAllClientEntrys() {
Set<Map.Entry<String, RpcClient>> entries = clientMap.entrySet();
return entries;
}
/**
* shut down client.
*
* @param clientName client name.
*/
public static void destroyClient(String clientName) throws NacosException {
RpcClient rpcClient = clientMap.get(clientName);
if (rpcClient != null) {
rpcClient.shutdown();
}
clientMap.remove(clientName);
}
public static RpcClient getClient(String clientName) {
return clientMap.get(clientName);
}
/**
* create a rpc client.
*
* @param clientName client name.
* @param connectionType client type.
* @return
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType) {
synchronized (clientMap) {
if (clientMap.get(clientName) == null) {
RpcClient moduleClient = null;
if (ConnectionType.GRPC.equals(connectionType)) {
moduleClient = new GrpcClient();
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
moduleClient = new RsocketRpcClient();
}
@ -53,4 +90,32 @@ public class RpcClientFactory {
}
}
/**
* create a rpc client.
*
* @param clientName client name.
* @param connectionType client type.
* @return
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
synchronized (clientMap) {
if (clientMap.get(clientName) == null) {
RpcClient moduleClient = null;
if (ConnectionType.GRPC.equals(connectionType)) {
moduleClient = new GrpcClient();
} else if (ConnectionType.RSOCKET.equals(connectionType)) {
moduleClient = new RsocketRpcClient();
}
if (moduleClient == null) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}
moduleClient.initLabels(labels);
clientMap.put(clientName, moduleClient);
return moduleClient;
}
return clientMap.get(clientName);
}
}
}

View File

@ -35,12 +35,24 @@ import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.concurrent.TimeUnit;
/**
@ -73,10 +85,10 @@ public class GrpcClient extends RpcClient {
* @return if server check success,return a non-null stub.
*/
private RequestGrpc.RequestFutureStub createNewChannelStub(String serverIp, int serverPort) {
ManagedChannel managedChannelTemp = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext()
.build();
RequestGrpc.RequestFutureStub grpcServiceStubTemp = RequestGrpc.newFutureStub(managedChannelTemp);
boolean checkSucess = serverCheck(grpcServiceStubTemp);
@ -89,6 +101,7 @@ public class GrpcClient extends RpcClient {
}
}
/**
* shutdown a channel.
*
@ -114,7 +127,7 @@ public class GrpcClient extends RpcClient {
while (maxRetryTimes > 0) {
try {
if (!isRunning()) {
if (!isRunning() && !overActiveTime()) {
return;
}
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
@ -142,7 +155,7 @@ public class GrpcClient extends RpcClient {
private GrpcMetadata buildMeta() {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.setVersion(VersionUtils.getFullClientVersion()).build();
.setVersion(VersionUtils.getFullClientVersion()).putAllLabels(labels).build();
return meta;
}
@ -173,6 +186,7 @@ public class GrpcClient extends RpcClient {
* @param streamStub streamStub to bind.
*/
private void bindRequestStream(RequestStreamGrpc.RequestStreamStub streamStub) {
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build();
LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest);
streamStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
@ -206,6 +220,7 @@ public class GrpcClient extends RpcClient {
}
});
}
private void sendAckResponse(String ackId, boolean success) {
try {
PushAckRequest request = PushAckRequest.build(ackId, success);
@ -223,7 +238,7 @@ public class GrpcClient extends RpcClient {
public void run() {
sendBeat();
}
}, 0, 3000, TimeUnit.MILLISECONDS);
}, 0, ACTIVE_INTERNAL, TimeUnit.MILLISECONDS);
}
@Override
@ -240,6 +255,7 @@ public class GrpcClient extends RpcClient {
RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
bindRequestStream(requestStreamStubTemp);
//switch current channel and stub
RequestGrpc.RequestFutureStub grpcFutureServiceStubTemp = RequestGrpc
.newFutureStub(newChannelStubTemp.getChannel());

View File

@ -50,7 +50,8 @@ public class GrpcConnection extends Connection {
/**
* executor to execute future request.
*/
static ExecutorService aynsRequestExecutor = Executors.newScheduledThreadPool(10);
static ExecutorService aynsRequestExecutor = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
/**
* grpc channel.

View File

@ -86,6 +86,15 @@ public class RsocketConnection extends Connection {
}
}
/**
* Getter method for property <tt>rSocketClient</tt>.
*
* @return property value of rSocketClient
*/
public RSocket getrSocketClient() {
return rSocketClient;
}
@Override
public String toString() {
return "RsocketConnection{" + "serverInfo=" + serverInfo + ", connectionId='" + connectionId + '\'' + '}';

View File

@ -40,9 +40,13 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* rsocket implementation of rpc client.
@ -78,52 +82,53 @@ public class RsocketRpcClient extends RpcClient {
try {
ConnectionSetupRequest conconSetupRequest = new ConnectionSetupRequest(connectionId, NetUtils.localIP(),
VersionUtils.getFullClientVersion());
VersionUtils.getFullClientVersion(), labels);
Payload setUpPayload = RsocketUtils.convertRequestToPayload(conconSetupRequest, buildMeta());
RSocket rSocket = RSocketConnector.create().setupPayload(setUpPayload).acceptor(new SocketAcceptor() {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
RSocket rsocket = new RSocketProxy(sendingSocket) {
RSocket rSocket = RSocketConnector.create().keepAlive(Duration.ofMillis(3000L), Duration.ofMillis(6000L))
.setupPayload(setUpPayload).acceptor(new SocketAcceptor() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
final ServerPushRequest request = RsocketUtils.parseServerRequestFromPayload(payload);
try {
handleServerRequest(request);
ServerPushResponse response = new ServerPushResponse();
response.setRequestId(request.getRequestId());
return Mono.just(RsocketUtils.convertResponseToPayload(response));
} catch (Exception e) {
ServerPushResponse response = new ServerPushResponse();
response.setResultCode(ResponseCode.FAIL.getCode());
response.setMessage(e.getMessage());
response.setRequestId(request.getRequestId());
return Mono.just(RsocketUtils.convertResponseToPayload(response));
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
RSocket rsocket = new RSocketProxy(sendingSocket) {
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
final ServerPushRequest request = RsocketUtils
.parseServerRequestFromPayload(payload);
try {
handleServerRequest(request);
ServerPushResponse response = new ServerPushResponse();
response.setRequestId(request.getRequestId());
return Mono.just(RsocketUtils.convertResponseToPayload(response));
} catch (Exception e) {
ServerPushResponse response = new ServerPushResponse();
response.setResultCode(ResponseCode.FAIL.getCode());
response.setMessage(e.getMessage());
response.setRequestId(request.getRequestId());
return Mono.just(RsocketUtils.convertResponseToPayload(response));
}
} catch (Exception e) {
ServerPushResponse response = new ServerPushResponse();
response.setResultCode(ResponseCode.FAIL.getCode());
response.setMessage(e.getMessage());
return Mono.just(DefaultPayload
.create(RsocketUtils.convertResponseToPayload(response)));
}
}
} catch (Exception e) {
ServerPushResponse response = new ServerPushResponse();
response.setResultCode(ResponseCode.FAIL.getCode());
response.setMessage(e.getMessage());
return Mono.just(DefaultPayload
.create(RsocketUtils.convertResponseToPayload(response)));
}
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
System.out.println("收到服务端fireAndForget" + payload.getDataUtf8());
final ServerPushRequest request = RsocketUtils.parseServerRequestFromPayload(payload);
handleServerRequest(request);
return Mono.just(null);
}
};
@Override
public Mono<Void> fireAndForget(Payload payload) {
final ServerPushRequest request = RsocketUtils
.parseServerRequestFromPayload(payload);
handleServerRequest(request);
return Mono.just(null);
}
};
return Mono.just((RSocket) rsocket);
}
}).connect(TcpClientTransport.create(serverInfo.getServerIp(), serverInfo.getServerPort())).block();
return Mono.just((RSocket) rsocket);
}
}).connect(TcpClientTransport.create(serverInfo.getServerIp(), serverInfo.getServerPort())).block();
RsocketConnection connection = new RsocketConnection(connectionId, serverInfo, rSocket);
fireOnCloseEvent(rSocket);
return connection;
@ -147,8 +152,24 @@ public class RsocketRpcClient extends RpcClient {
}
}
void cancelfireOnCloseEvent(RSocket rSocket) {
System.out.println("cancelfireOnCloseEvent....111");
if (rSocket != null) {
System.out.println("cancelfireOnCloseEvent....222");
rSocket.onClose().subscribe().dispose();
}
}
@Override
protected void clearContextOnResetRequest() {
RsocketConnection rsocket = (RsocketConnection) currentConnetion;
cancelfireOnCloseEvent(rsocket.getrSocketClient());
}
void fireOnCloseEvent(RSocket rSocket) {
rSocket.onClose().subscribe(new Subscriber<Void>() {
Subscriber subscriber = new Subscriber<Void>() {
@Override
public void onSubscribe(Subscription subscription) {
@ -156,12 +177,11 @@ public class RsocketRpcClient extends RpcClient {
@Override
public void onNext(Void aVoid) {
}
@Override
public void onError(Throwable throwable) {
System.out.println("On error ,switch server ...");
System.out.println("On error ,switch server ..." + throwable);
switchServerAsync();
}
@ -170,7 +190,14 @@ public class RsocketRpcClient extends RpcClient {
System.out.println("On complete ,switch server ...");
switchServerAsync();
}
});
};
}
class RsocketHolder {
RSocket rsocket;
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigRequestTypeConstants;
import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* handller to handler config change from other servers.
*
* @author liuzunfei
* @version $Id: ConfigChangeClusterSyncRequestHandler.java, v 0.1 2020年08月11日 4:35 PM liuzunfei Exp $
*/
@Component
public class ConfigChangeClusterSyncRequestHandler extends RequestHandler {
@Autowired
private DumpService dumpService;
@Override
public Request parseBodyString(String bodyString) {
return JacksonUtils.toObj(bodyString, ConfigChangeClusterSyncRequest.class);
}
@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
ConfigChangeClusterSyncRequest configChangeSyncRequest = (ConfigChangeClusterSyncRequest) request;
if (configChangeSyncRequest.isBeta()) {
dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp(),
true);
} else {
dumpService.dump(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
}
return new ConfigChangeClusterSyncResponse();
}
@Override
public List<String> getRequestTypes() {
return Lists.newArrayList(ConfigRequestTypeConstants.CONFIG_CHANGE_CLUSTER_SYNC);
}
}

View File

@ -17,7 +17,7 @@
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.api.remote.response.AbstractPushCallBack;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.core.utils.Loggers;
@ -60,7 +60,8 @@ public class ConfigChangeNotifier {
if (!CollectionUtils.isEmpty(clients)) {
for (final String client : clients) {
rpcPushService.pushWithCallback(client, notifyRequet, new PushCallBack() {
rpcPushService.pushWithCallback(client, notifyRequet, new AbstractPushCallBack(500L) {
@Override
public void onSuccess() {
Loggers.CORE.info("push callback success.,groupKey={},clientId={}", groupKey, client);

View File

@ -0,0 +1,56 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.config.server.remote;
import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.remote.ClusterRpcClientProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* ConfigClusterRpcClientProxy.
*
* @author liuzunfei
* @version $Id: ConfigClusterRpcClientProxy.java, v 0.1 2020年08月11日 4:28 PM liuzunfei Exp $
*/
@Service
public class ConfigClusterRpcClientProxy {
@Autowired
ClusterRpcClientProxy clusterRpcClientProxy;
/**
* sync config change request.
* @param member
* @param request
* @return
* @throws NacosException exception.
*/
public ConfigChangeClusterSyncResponse syncConfigChange(Member member, ConfigChangeClusterSyncRequest request)
throws NacosException {
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (response != null && response instanceof ConfigChangeClusterSyncResponse) {
return (ConfigChangeClusterSyncResponse) response;
}
return null;
}
}

View File

@ -16,17 +16,23 @@
package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.InetUtils;
@ -60,6 +66,9 @@ import java.util.concurrent.TimeUnit;
@Service
public class AsyncNotifyService {
@Autowired
private DumpService dumpService;
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
@ -84,12 +93,25 @@ public class AsyncNotifyService {
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
for (Member member : ipList) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
if (MemberUtils.getSupportedConnectionType(member) == null) {
httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
} else {
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
}
}
ConfigExecutor.executeAsyncNotify(new AsyncTask(httpclient, queue));
if (!httpQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncTask(httpclient, httpQueue));
}
if (!rpcQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}
}
}
@ -109,6 +131,9 @@ public class AsyncNotifyService {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);
@Autowired
private ConfigClusterRpcClientProxy configClusterRpcClientProxy;
private ServerMemberManager memberManager;
class AsyncTask implements Runnable {
@ -157,6 +182,83 @@ public class AsyncNotifyService {
}
class AsyncRpcTask implements Runnable {
private Queue<NotifySingleRpcTask> queue;
public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!queue.isEmpty()) {
NotifySingleRpcTask task = queue.poll();
Member member = task.member;
ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
syncRequest.setDataId(task.getDataId());
syncRequest.setGroup(task.getGroup());
syncRequest.setIsBeta(task.isBeta ? "Y" : "N");
syncRequest.setLastModified(task.getLastModified());
syncRequest.setTag(task.tag);
syncRequest.setTenant(task.getTenant());
if (memberManager.hasMember(member.getAddress()) && !memberManager.getSelf().equals(member)) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, member.getAddress());
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
try {
ConfigChangeClusterSyncResponse response = configClusterRpcClientProxy
.syncConfigChange(member, syncRequest);
if (response == null || !response.isSuccess()) {
asyncTaskExecute(task);
}
} catch (Exception e) {
asyncTaskExecute(task);
}
}
}
if (memberManager.getSelf().equals(member)) {
if (syncRequest.isBeta()) {
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP(), true);
} else {
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP());
}
}
}
}
}
static class NotifySingleRpcTask extends NotifyTask {
private Member member;
private boolean isBeta;
private String tag;
public NotifySingleRpcTask(String dataId, String group, String tenant, String tag, long lastModified,
boolean isBeta, Member member) {
super(dataId, group, tenant, lastModified);
this.member = member;
this.isBeta = isBeta;
this.tag = tag;
}
}
private void asyncTaskExecute(NotifySingleTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
@ -165,6 +267,14 @@ public class AsyncNotifyService {
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}
private void asyncTaskExecute(NotifySingleRpcTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleRpcTask> queue = new LinkedList<NotifySingleRpcTask>();
queue.add(task);
AsyncRpcTask asyncTask = new AsyncRpcTask(queue);
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}
class AsyncNotifyCallBack implements FutureCallback<HttpResponse> {
public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) {
@ -310,7 +420,7 @@ public class AsyncNotifyService {
* @param task notify task
* @return delay
*/
private static int getDelayTime(NotifySingleTask task) {
private static int getDelayTime(NotifyTask task) {
int failCount = task.getFailCount();
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
if (failCount <= MAX_COUNT) {
@ -325,4 +435,4 @@ public class AsyncNotifyService {
private static final int MAX_COUNT = 6;
}
}

View File

@ -16,6 +16,8 @@
package com.alibaba.nacos.console.controller;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.grpc.GrpcServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
@ -38,7 +40,7 @@ import java.util.Map;
public class ServerLoaderController {
@Autowired
private GrpcServer grpcServer;
private ConnectionManager connectionManager;
/**
* Get server state of current server.
@ -48,7 +50,7 @@ public class ServerLoaderController {
@GetMapping("/max")
public ResponseEntity updateMaxClients(@RequestParam Integer count) {
Map<String, String> responseMap = new HashMap<>(3);
grpcServer.setMaxClientCount(count);
connectionManager.coordinateMaxClientsSmoth(count);
return ResponseEntity.ok().body("success");
}
@ -60,7 +62,7 @@ public class ServerLoaderController {
@GetMapping("/reload")
public ResponseEntity reloadClients(@RequestParam Integer count) {
Map<String, String> responseMap = new HashMap<>(3);
grpcServer.reloadClient(count);
connectionManager.loadClientsSmoth(count);
return ResponseEntity.ok().body("success");
}
@ -72,9 +74,19 @@ public class ServerLoaderController {
@GetMapping("/current")
public ResponseEntity currentCount() {
Map<String, String> responseMap = new HashMap<>(3);
int count = grpcServer.currentClients();
int count = connectionManager.currentClientsCount();
return ResponseEntity.ok().body(count);
}
/**
* Get current clients.
*
* @return state json.
*/
@GetMapping("/all")
public ResponseEntity currentClients() {
Map<String, String> responseMap = new HashMap<>(3);
Map<String, Connection> stringConnectionMap = connectionManager.currentClients();
return ResponseEntity.ok().body(stringConnectionMap);
}
}

View File

@ -92,7 +92,6 @@ management.metrics.export.influx.enabled=false
#management.metrics.export.influx.consistency=one
#management.metrics.export.influx.compressed=true
#*************** Access Log Related Configurations ***************#
### If turn on the access log:
server.tomcat.accesslog.enabled=true
@ -102,8 +101,11 @@ server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D %{User-Agent}i %{Reque
### The directory of access log:
server.tomcat.basedir=
#spring.datasource.platform=mysql
#db.num=1
#db.url.0=jdbc:mysql://10.101.167.27:3306/acm?characterEncoding=utf8&connectTimeout=1000&socketTimeout=10000&autoReconnect=true
#db.user=root
#db.password=root
#*************** Access Control Related Configurations ***************#
### If enable spring security, this option is deprecated in 1.2.0:
#spring.security.enabled=false

View File

@ -38,9 +38,11 @@ public class MemberMetaDataConstants {
public static final String VERSION = "version";
public static final String SUPPORT_REMOTE_C_TYPE = "remoteConnectType";
public static final String[] META_KEY_LIST = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT, WEIGHT,
LAST_REFRESH_TIME, VERSION};
LAST_REFRESH_TIME, VERSION, SUPPORT_REMOTE_C_TYPE};
public static final String[] META_KEY_LIST_WITHOUT_LAST_REFRESH_TIME = new String[] {SITE_KEY, AD_WEIGHT, RAFT_PORT,
WEIGHT, VERSION};
WEIGHT, VERSION, SUPPORT_REMOTE_C_TYPE};
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.core.cluster;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
@ -93,6 +94,22 @@ public class MemberUtils {
return target;
}
/**
* get support member connection type.
*
* @param member
* @return
*/
public static ConnectionType getSupportedConnectionType(Member member) {
Map<String, Object> extendInfo = member.getExtendInfo();
if (extendInfo == null || !extendInfo.containsKey(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE)) {
return null;
} else {
String type = (String) extendInfo.get(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE);
return ConnectionType.getByType(type);
}
}
public static int calculateRaftPort(Member member) {
return member.getPort() - 1000;
}

View File

@ -28,6 +28,7 @@ import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.VersionUtils;
@ -134,6 +135,7 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
this.localAddress = InetUtils.getSelfIp() + ":" + port;
this.self = MemberUtils.singleParse(this.localAddress);
this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
this.self.setExtendVal(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE, ConnectionType.RSOCKET.getType());
serverList.put(self.getAddress(), self);
// register NodeChangeEvent publisher to NotifyManager

View File

@ -0,0 +1,166 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.cluster.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.alibaba.nacos.api.exception.NacosException.CLIENT_INVALID_PARAM;
/**
* cluster rpc client proxy.
*
* @author liuzunfei
* @version $Id: ClusterRpcClientProxy.java, v 0.1 2020年08月11日 2:11 PM liuzunfei Exp $
*/
@Service
public class ClusterRpcClientProxy extends MemberChangeListener {
@Autowired
ServerMemberManager serverMemberManager;
/**
* init after constructor.
*/
@PostConstruct
public void init() {
try {
NotifyCenter.registerSubscriber(this);
List<Member> members = serverMemberManager.allMembersWithoutSelf();
refresh(members);
Loggers.CLUSTER
.warn("[ClusterRpcClientProxy] succss to refresh cluster rpc client on start up,members ={} ",
members);
} catch (NacosException e) {
Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage());
}
}
/**
* init cluster rpc clients.
*
* @param members cluster server list member list.
*/
private void refresh(List<Member> members) throws NacosException {
//ensure to create client of new members
for (Member member : members) {
ConnectionType supportedConnectionType = MemberUtils.getSupportedConnectionType(member);
if (supportedConnectionType != null) {
createRpcClientAndStart(member, supportedConnectionType);
}
}
//shutdown and remove old members.
Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntrys();
Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
List<String> newMemberKeys = members.stream().map(a -> memberClientKey(a)).collect(Collectors.toList());
while (iterator.hasNext()) {
Map.Entry<String, RpcClient> next1 = iterator.next();
if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
next1.getValue().shutdown();
iterator.remove();
}
}
}
private String memberClientKey(Member member) {
return "Cluster-" + member.getAddress();
}
private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
RpcClient client = RpcClientFactory.createClient(memberClientKey(member), type);
if (!client.getConnectionType().equals(type)) {
RpcClientFactory.destroyClient(memberClientKey(member));
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE);
client = RpcClientFactory.createClient(memberClientKey(member), type, labels);
}
if (client.isWaitInited()) {
Loggers.CLUSTER.info("create a new rpc client to member - > : {}", member);
//one fixed server
client.init(new ServerListFactory() {
@Override
public String genNextServer() {
return member.getAddress();
}
@Override
public String getCurrentServer() {
return member.getAddress();
}
});
client.start();
}
}
/**
* send request to member.
*
* @param member
* @param request
* @return
* @throws NacosException
*/
public Response sendRequest(Member member, Request request) throws NacosException {
RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
if (client != null) {
Response response = client.request(request);
return response;
} else {
throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
}
}
@Override
public void onEvent(MembersChangeEvent event) {
try {
List<Member> members = serverMemberManager.allMembersWithoutSelf();
refresh(members);
} catch (NacosException e) {
Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client ", event, e.getMessage());
}
}
}

View File

@ -16,10 +16,12 @@
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.ServerPushRequest;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import org.apache.commons.lang3.builder.ToStringBuilder;
import java.util.concurrent.Future;
import java.util.Map;
/**
* Connection.
@ -96,15 +98,14 @@ public abstract class Connection {
*
* @param request request.
*/
public abstract Future<Boolean> sendRequestWithFuture(ServerPushRequest request) throws Exception;
public abstract PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception;
/**
* Send response to this client that associated to this connection.
*
* @param request request.
*/
public abstract void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack)
throws Exception;
public abstract void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack) throws Exception;
/**
* Close this connection, if this connection is not active yet.
@ -131,5 +132,20 @@ public abstract class Connection {
return metaInfo.connectionId;
}
/**
* check if this connection is sdk source.
*
* @return if this connection is sdk source.
*/
public boolean isSdkSource() {
Map<String, String> labels = metaInfo.labels;
String source = labels.get(RemoteConstants.LABEL_SOURCE);
return RemoteConstants.LABEL_SOURCE_SDK.equalsIgnoreCase(source);
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}

View File

@ -151,6 +151,9 @@ public class ConnectionManager {
List<String> expireCLients = new LinkedList<String>();
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
if (!client.isSdkSource()) {
continue;
}
long lastActiveTimestamp = entry.getValue().getLastActiveTimestamp();
if (client.heartBeatExpire() && currentStamp - lastActiveTimestamp > EXPIRE_MILLSECOND) {
expireCLients.add(client.getConnectionId());
@ -208,10 +211,14 @@ public class ConnectionManager {
this.loadClient = loadClient;
}
public int currentClients() {
public int currentClientsCount() {
return connetions.size();
}
public Map<String, Connection> currentClients() {
return connetions;
}
/**
* expel all connections.
*/

View File

@ -16,7 +16,11 @@
package com.alibaba.nacos.core.remote;
import org.apache.commons.lang3.builder.ToStringBuilder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* ConnectionMetaInfo.
@ -56,13 +60,21 @@ public class ConnectionMetaInfo {
*/
long lastActiveTime;
public ConnectionMetaInfo(String connectionId, String clientIp, String connectType, String version) {
protected Map<String, String> labels = new HashMap<String, String>();
public String getLabel(String labelKey) {
return labels.get(labelKey);
}
public ConnectionMetaInfo(String connectionId, String clientIp, String connectType, String version,
Map<String, String> labels) {
this.connectionId = connectionId;
this.clientIp = clientIp;
this.connectType = connectType;
this.version = version;
this.createTime = new Date();
this.lastActiveTime = System.currentTimeMillis();
this.labels.putAll(labels);
}
/**
@ -172,4 +184,9 @@ public class ConnectionMetaInfo {
public void setVersion(String version) {
this.version = version;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}

View File

@ -0,0 +1,146 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import java.util.concurrent.TimeoutException;
/**
* default push future.
*
* @author liuzunfei
* @version $Id: DefaultPushFuture.java, v 0.1 2020年08月12日 7:10 PM liuzunfei Exp $
*/
public class DefaultPushFuture implements PushFuture {
private long timeStamp;
private volatile boolean isDone = false;
private boolean isSuccess;
private PushCallBack pushCallBack;
private Exception exception;
private String requestId;
/**
* Getter method for property <tt>pushCallBack</tt>.
*
* @return property value of pushCallBack
*/
public PushCallBack getPushCallBack() {
return pushCallBack;
}
/**
* Getter method for property <tt>timeStamp</tt>.
*
* @return property value of timeStamp
*/
public long getTimeStamp() {
return timeStamp;
}
public DefaultPushFuture() {
}
public DefaultPushFuture(String requestId) {
this(requestId, null);
}
public DefaultPushFuture(String requestId, PushCallBack pushCallBack) {
this.timeStamp = System.currentTimeMillis();
this.pushCallBack = pushCallBack;
this.requestId = requestId;
}
public void setSuccessResult() {
isDone = true;
isSuccess = true;
synchronized (this) {
notifyAll();
}
if (pushCallBack != null) {
if (isSuccess) {
pushCallBack.onSuccess();
}
}
}
public void setFailResult(Exception e) {
isDone = true;
isSuccess = false;
synchronized (this) {
notifyAll();
}
if (pushCallBack != null) {
if (isSuccess) {
pushCallBack.onFail(e);
}
}
}
public String getRequestId() {
return this.requestId;
}
@Override
public boolean isDone() {
return isDone;
}
@Override
public boolean get() throws TimeoutException, InterruptedException {
synchronized (this) {
while (!isDone) {
wait();
}
}
return isSuccess;
}
@Override
public boolean get(long timeout) throws TimeoutException, InterruptedException {
if (timeout < 0) {
synchronized (this) {
while (!isDone) {
wait();
}
}
} else if (timeout > 0) {
long end = System.currentTimeMillis() + timeout;
long waitTime = timeout;
synchronized (this) {
while (!isDone && waitTime > 0) {
wait(waitTime);
waitTime = end - System.currentTimeMillis();
}
}
}
if (isDone) {
return isSuccess;
} else {
throw new TimeoutException();
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import com.alipay.sofa.jraft.error.RemotingException;
import java.util.concurrent.TimeoutException;
/**
* push future.
*
* @author liuzunfei
* @version $Id: PushFuture.java, v 0.1 2020年08月12日 7:04 PM liuzunfei Exp $
*/
public interface PushFuture {
/**
* @return
*/
boolean isDone();
/**
* @return
* @throws TimeoutException
* @throws InterruptedException
*/
boolean get() throws TimeoutException, InterruptedException;
/**
* @param timeout
* @return
* @throws TimeoutException
* @throws RemotingException
* @throws InterruptedException
*/
boolean get(long timeout) throws TimeoutException, InterruptedException;
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import org.springframework.stereotype.Component;
/**
* RemoteConnectionEventListener.
* @author liuzunfei
* @version $Id: RemoteConnectionEventListener.java, v 0.1 2020年08月10日 1:04 AM liuzunfei Exp $
*/
@Component
public class RemoteConnectionEventListener extends ClientConnectionEventListener {
@Override
public void clientConnected(Connection connect) {
}
@Override
public void clientDisConnected(Connection connect) {
RpcAckCallbackSynchronizer.clearContext(connect.getConnectionId());
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.hessian.clhm.ConcurrentLinkedHashMap;
import com.alipay.hessian.clhm.EvictionListener;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
/**
* serber push ack synchronier.
*
* @author liuzunfei
* @version $Id: RpcAckCallbackSynchronizer.java, v 0.1 2020年07月29日 7:56 PM liuzunfei Exp $
*/
public class RpcAckCallbackSynchronizer {
private static final long TIMEOUT = 60000L;
static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
private static final Map<String, Map<String, DefaultPushFuture>> CALLBACK_CONTEXT2 = new ConcurrentLinkedHashMap.Builder<String, Map<String, DefaultPushFuture>>()
.maximumWeightedCapacity(30000).listener(new EvictionListener<String, Map<String, DefaultPushFuture>>() {
@Override
public void onEviction(String s, Map<String, DefaultPushFuture> pushCallBack) {
pushCallBack.entrySet().forEach(new Consumer<Map.Entry<String, DefaultPushFuture>>() {
@Override
public void accept(Map.Entry<String, DefaultPushFuture> stringDefaultPushFutureEntry) {
stringDefaultPushFutureEntry.getValue().setFailResult(new TimeoutException());
}
});
}
}).build();
private static final Map<String, DefaultPushFuture> CALLBACK_CONTEXT = new ConcurrentLinkedHashMap.Builder<String, DefaultPushFuture>()
.maximumWeightedCapacity(30000).listener(new EvictionListener<String, DefaultPushFuture>() {
@Override
public void onEviction(String s, DefaultPushFuture pushCallBack) {
if (System.currentTimeMillis() - pushCallBack.getTimeStamp() > TIMEOUT) {
Loggers.CORE.warn("time out on eviction:" + pushCallBack.getRequestId());
if (pushCallBack.getPushCallBack() != null) {
pushCallBack.getPushCallBack().onTimeout();
}
} else {
pushCallBack.getPushCallBack().onFail(new RuntimeException("callback pool overlimit"));
}
}
}).build();
// static {
// executor.scheduleWithFixedDelay(new Runnable() {
// @Override
// public void run() {
// Set<String> timeOutCalls = new HashSet<>();
// long now = System.currentTimeMillis();
// for (Map.Entry<String, DefaultPushFuture> enrty : CALLBACK_CONTEXT.entrySet()) {
// if (now - enrty.getValue().getTimeStamp() > TIMEOUT) {
// timeOutCalls.add(enrty.getKey());
// }
// }
// for (String ackId : timeOutCalls) {
// DefaultPushFuture remove = CALLBACK_CONTEXT.remove(ackId);
// if (remove != null) {
// Loggers.CORE.warn("time out on scheduler:" + ackId);
// if (remove.getPushCallBack() != null) {
// remove.getPushCallBack().onTimeout();
// }
// }
// }
// }
// }, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS);
// }
/**
* notify ackid.
*/
public static void ackNotify(String connectionId, String requestId, boolean success, Exception e) {
if (!CALLBACK_CONTEXT2.containsKey(connectionId)) {
return;
}
Map<String, DefaultPushFuture> stringDefaultPushFutureMap = CALLBACK_CONTEXT2.get(connectionId);
if (stringDefaultPushFutureMap.containsKey(requestId)) {
return;
}
DefaultPushFuture currentCallback = stringDefaultPushFutureMap.get(requestId);
if (currentCallback == null) {
return;
}
if (success) {
currentCallback.setSuccessResult();
} else {
currentCallback.setFailResult(e);
}
}
/**
* notify ackid.
*/
public static void syncCallback(String connectionId, String requestId, DefaultPushFuture defaultPushFuture)
throws Exception {
DefaultPushFuture pushCallBackPrev = CALLBACK_CONTEXT.putIfAbsent(requestId, defaultPushFuture);
if (pushCallBackPrev != null) {
throw new RuntimeException("callback conflict.");
}
}
/**
* clear context of connectionId.
*
* @param connetionId connetionId
*/
public static void clearContext(String connetionId) {
}
/**
* clear context of connectionId. TODO
*
* @param connetionId connetionId
*/
public static void clearFuture(String connetionId, String requestId) {
}
}

View File

@ -23,8 +23,6 @@ import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
/**
* push response to clients.
*
@ -53,7 +51,8 @@ public class RpcPushService {
connectionManager.unregister(connectionId);
return true;
} catch (Exception e) {
Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId,
Loggers.RPC_DIGEST
.error("error to send push response to connectionId ={},push response={}", connectionId,
request, e);
return false;
}
@ -68,7 +67,7 @@ public class RpcPushService {
* @param connectionId connectionId.
* @param request request.
*/
public Future<Boolean> pushWithFuture(String connectionId, ServerPushRequest request) {
public PushFuture pushWithFuture(String connectionId, ServerPushRequest request) {
Connection connection = connectionManager.getConnection(connectionId);
if (connection != null) {
@ -77,7 +76,8 @@ public class RpcPushService {
} catch (ConnectionAlreadyClosedException e) {
connectionManager.unregister(connectionId);
} catch (Exception e) {
Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId,
Loggers.RPC_DIGEST
.error("error to send push response to connectionId ={},push response={}", connectionId,
request, e);
}
}
@ -99,7 +99,8 @@ public class RpcPushService {
} catch (ConnectionAlreadyClosedException e) {
connectionManager.unregister(connectionId);
} catch (Exception e) {
Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId,
Loggers.RPC_DIGEST
.error("error to send push response to connectionId ={},push response={}", connectionId,
request, e);
}
}
@ -119,7 +120,8 @@ public class RpcPushService {
} catch (ConnectionAlreadyClosedException e) {
connectionManager.unregister(connectionId);
} catch (Exception e) {
Loggers.RPC.error("error to send push response to connectionId ={},push response={}", connectionId,
Loggers.RPC_DIGEST
.error("error to send push response to connectionId ={},push response={}", connectionId,
request, e);
}
}

View File

@ -16,8 +16,13 @@
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
/**
* abstrat rpc server .
*
@ -32,7 +37,41 @@ public abstract class RpcServer {
/**
* Start sever.
*/
public abstract void start() throws Exception;
@PostConstruct
public void start() throws Exception {
Loggers.RPC.info("Nacos {} Rpc server starting at port {}", getConnectionType(),
(ApplicationUtils.getPort() + rpcPortOffset()));
startServer();
Loggers.RPC.info("Nacos {} Rpc server started at port {}", getConnectionType(),
(ApplicationUtils.getPort() + rpcPortOffset()));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Loggers.RPC.info("Nacos {} Rpc server stopping", getConnectionType());
try {
RpcServer.this.stopServer();
Loggers.RPC.info("Nacos {} Rpc server stopped successfully...", getConnectionType());
} catch (Exception e) {
Loggers.RPC.error("Nacos {} Rpc server stopped fail...", getConnectionType(), e);
}
}
});
}
/**
* get connection type.
*
* @return
*/
public abstract ConnectionType getConnectionType();
/**
* Start sever.
*/
public abstract void startServer() throws Exception;
/**
* the increase offset of nacos server port for rpc server port.
@ -44,17 +83,23 @@ public abstract class RpcServer {
/**
* Stop Server.
*/
public abstract void stop() throws Exception;
public void setMaxClientCount(int maxClient) {
this.connectionManager.coordinateMaxClientsSmoth(maxClient);
public void stopServer() throws Exception {
Loggers.RPC.info("Nacos clear all rpc clients...");
connectionManager.expelAll();
try {
//wait clients to switch server.
Thread.sleep(2000L);
} catch (InterruptedException e) {
//Do nothing.
}
shundownServer();
}
public void reloadClient(int loadCount) {
this.connectionManager.loadClientsSmoth(loadCount);
}
/**
* the increase offset of nacos server port for rpc server port.
*
* @return
*/
public abstract void shundownServer();
public int currentClients() {
return this.connectionManager.currentClients();
}
}

View File

@ -1,235 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.hessian.clhm.ConcurrentLinkedHashMap;
import com.alipay.hessian.clhm.EvictionListener;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* serber push ack synchronier.
*
* @author liuzunfei
* @version $Id: GrpcAckSynchronizer.java, v 0.1 2020年07月29日 7:56 PM liuzunfei Exp $
*/
public class GrpcAckSynchronizer {
private static final Map<String, AckWaitor> ACK_WAITORS = new HashMap<String, AckWaitor>();
private static final long TIMEOUT = 60000L;
static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
private static final Map<String, PushCallBackWraper> CALLBACK_CONTEXT = new ConcurrentLinkedHashMap.Builder<String, PushCallBackWraper>()
.maximumWeightedCapacity(30000).listener(new EvictionListener<String, PushCallBackWraper>() {
@Override
public void onEviction(String s, PushCallBackWraper pushCallBack) {
if (System.currentTimeMillis() - pushCallBack.getTimeStamp() > TIMEOUT && pushCallBack
.tryDeActive()) {
Loggers.CORE.warn("time out on eviction:" + pushCallBack.ackId);
pushCallBack.getPushCallBack().onTimeout();
} else {
pushCallBack.getPushCallBack().onFail(new RuntimeException("callback pool overlimit"));
}
}
}).build();
static {
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
Set<String> timeOutCalls = new HashSet<>();
long now = System.currentTimeMillis();
for (Map.Entry<String, PushCallBackWraper> enrty : CALLBACK_CONTEXT.entrySet()) {
if (now - enrty.getValue().getTimeStamp() > TIMEOUT) {
timeOutCalls.add(enrty.getKey());
}
}
for (String ackId : timeOutCalls) {
PushCallBackWraper remove = CALLBACK_CONTEXT.remove(ackId);
if (remove != null && remove.tryDeActive()) {
Loggers.CORE.warn("time out on scheduler:" + ackId);
remove.pushCallBack.onTimeout();
}
}
}
}, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS);
}
/**
* notify ackid.
*
* @param ackId ackId.
*/
public static void ackNotify(String ackId, boolean success) {
PushCallBackWraper currentCallback = CALLBACK_CONTEXT.remove(ackId);
if (currentCallback != null && currentCallback.tryDeActive()) {
if (success) {
currentCallback.pushCallBack.onSuccess();
} else {
currentCallback.pushCallBack.onFail(new RuntimeException("client return fail"));
}
}
AckWaitor waiter = ACK_WAITORS.remove(ackId);
if (waiter != null) {
synchronized (waiter) {
waiter.setSuccess(success);
waiter.notify();
}
}
}
/**
* notify ackid.
*
* @param ackId ackId.
*/
public static void release(String ackId) {
ACK_WAITORS.remove(ackId);
}
/**
* notify ackid.
*
* @param ackId ackId.
*/
public static boolean waitAck(String ackId, long timeout) throws Exception {
AckWaitor waiter = ACK_WAITORS.get(ackId);
if (waiter != null) {
throw new RuntimeException("ackid conflict");
} else {
AckWaitor lock = new AckWaitor();
AckWaitor prev = ACK_WAITORS.putIfAbsent(ackId, lock);
if (prev == null) {
synchronized (lock) {
lock.wait(timeout);
return lock.success;
}
} else {
throw new RuntimeException("ackid conflict.");
}
}
}
/**
* notify ackid.
*
* @param ackId ackId.
*/
public static void syncCallbackOnAck(String ackId, PushCallBack pushCallBack) throws Exception {
PushCallBackWraper pushCallBackPrev = CALLBACK_CONTEXT
.putIfAbsent(ackId, new PushCallBackWraper(pushCallBack, ackId));
if (pushCallBackPrev != null) {
throw new RuntimeException("callback conflict.");
}
}
static class AckWaitor {
boolean success;
/**
* Getter method for property <tt>success</tt>.
*
* @return property value of success
*/
public boolean isSuccess() {
return success;
}
/**
* Setter method for property <tt>success</tt>.
*
* @param success value to be assigned to property success
*/
public void setSuccess(boolean success) {
this.success = success;
}
}
static class PushCallBackWraper {
long timeStamp;
PushCallBack pushCallBack;
String ackId;
private AtomicBoolean active = new AtomicBoolean(true);
public PushCallBackWraper(PushCallBack pushCallBack, String ackId) {
this.pushCallBack = pushCallBack;
this.ackId = ackId;
this.timeStamp = System.currentTimeMillis();
}
public boolean tryDeActive() {
return active.compareAndSet(true, false);
}
/**
* Getter method for property <tt>timeStamp</tt>.
*
* @return property value of timeStamp
*/
public long getTimeStamp() {
return timeStamp;
}
/**
* Getter method for property <tt>pushCallBack</tt>.
*
* @return property value of pushCallBack
*/
public PushCallBack getPushCallBack() {
return pushCallBack;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PushCallBackWraper that = (PushCallBackWraper) o;
return Objects.equals(ackId, that.ackId);
}
@Override
public int hashCode() {
return Objects.hash(ackId);
}
}
}

View File

@ -21,16 +21,13 @@ import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
import com.alibaba.nacos.core.remote.DefaultPushFuture;
import com.alibaba.nacos.core.remote.PushFuture;
import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
import com.alibaba.nacos.core.utils.Loggers;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* grpc connection.
*
@ -39,11 +36,6 @@ import java.util.concurrent.TimeUnit;
*/
public class GrpcConnection extends Connection {
static ThreadPoolExecutor pushWorkers = new ThreadPoolExecutor(10, 50, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(50000));
private static final long MAX_TIMEOUTS = 5000L;
private StreamObserver streamObserver;
public GrpcConnection(ConnectionMetaInfo metaInfo, StreamObserver streamObserver) {
@ -57,54 +49,18 @@ public class GrpcConnection extends Connection {
}
@Override
public boolean sendRequest(ServerPushRequest request, long timeout) throws Exception {
public boolean sendRequest(ServerPushRequest request, long timeoutMills) throws Exception {
DefaultPushFuture pushFuture = (DefaultPushFuture) sendRequestWithFuture(request);
try {
Loggers.RPC_DIGEST.info("Grpc sendRequest :" + request);
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId);
streamObserver.onNext(GrpcUtils.convert(request, requestId));
try {
return GrpcAckSynchronizer.waitAck(requestId, timeout);
} catch (Exception e) {
//Do nothingreturn fail.
return false;
} finally {
GrpcAckSynchronizer.release(requestId);
}
} catch (Exception e) {
if (e instanceof StatusRuntimeException) {
//return true where client is not active yet.
return true;
}
throw e;
}
}
private void sendRequestWithCallback(ServerPushRequest request, PushCallBack callBack) {
try {
Loggers.RPC_DIGEST.info("Grpc sendRequestWithCallback :" + request);
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId);
streamObserver.onNext(GrpcUtils.convert(request, requestId));
GrpcAckSynchronizer.syncCallbackOnAck(requestId, callBack);
} catch (Exception e) {
if (e instanceof StatusRuntimeException) {
//return true where client is not active yet.
callBack.onSuccess();
return;
}
callBack.onFail(e);
return pushFuture.get(timeoutMills);
} finally {
RpcAckCallbackSynchronizer.clearFuture(getConnectionId(), pushFuture.getRequestId());
}
}
@Override
public void sendRequestNoAck(ServerPushRequest request) throws Exception {
try {
Loggers.RPC_DIGEST.info("Grpc sendRequestNoAck :" + request);
streamObserver.onNext(GrpcUtils.convert(request, ""));
} catch (Exception e) {
if (e instanceof StatusRuntimeException) {
@ -115,36 +71,27 @@ public class GrpcConnection extends Connection {
}
@Override
public Future<Boolean> sendRequestWithFuture(ServerPushRequest request) throws Exception {
Loggers.RPC_DIGEST.info("Grpc sendRequestWithFuture :" + request);
return pushWorkers.submit(new PushCallable(request, MAX_TIMEOUTS));
public PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception {
return sendRequestInner(request, null);
}
@Override
public void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack) throws Exception {
Loggers.RPC_DIGEST.info("Grpc sendRequestWithCallBack :" + request);
sendRequestWithCallback(request, callBack);
sendRequestInner(request, callBack);
}
private DefaultPushFuture sendRequestInner(ServerPushRequest request, PushCallBack callBack) throws Exception {
Loggers.RPC_DIGEST.info("Grpc sendRequest :" + request);
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId);
sendRequestNoAck(request);
DefaultPushFuture defaultPushFuture = new DefaultPushFuture(requestId, callBack);
RpcAckCallbackSynchronizer.syncCallback(getConnectionId(), requestId, defaultPushFuture);
return defaultPushFuture;
}
@Override
public void closeGrapcefully() {
}
class PushCallable implements Callable<Boolean> {
private ServerPushRequest request;
private long timeoutMills;
public PushCallable(ServerPushRequest request, long timeoutMills) {
this.request = request;
this.timeoutMills = timeoutMills;
}
@Override
public Boolean call() throws Exception {
return sendRequest(request, timeoutMills);
}
}
}

View File

@ -29,6 +29,7 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.utils.Loggers;
@ -62,9 +63,11 @@ public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase {
return;
} else if (RequestTypeConstants.PUSH_ACK.equals(type)) {
// server push ack response.
String connectionId = grpcRequest.getMetadata().getConnectionId();
PushAckRequest request = JacksonUtils
.toObj(grpcRequest.getBody().getValue().toStringUtf8(), PushAckRequest.class);
GrpcAckSynchronizer.ackNotify(request.getRequestId(), request.isSuccess());
RpcAckCallbackSynchronizer
.ackNotify(connectionId, request.getRequestId(), request.isSuccess(), request.getException());
responseObserver.onNext(GrpcUtils.convert(new ServerCheckResponse()));
responseObserver.onCompleted();
return;
@ -83,6 +86,7 @@ public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase {
responseObserver.onCompleted();
return;
}
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handle(request, requestMeta);
responseObserver.onNext(GrpcUtils.convert(response));
responseObserver.onCompleted();

View File

@ -16,17 +16,28 @@
package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.remote.RpcServer;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
import io.grpc.Attributes;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.SocketAddress;
import java.util.UUID;
/**
* Grpc implementation as a rpc server.
@ -50,32 +61,35 @@ public class GrpcServer extends RpcServer {
@Autowired
private RequestHandlerRegistry requestHandlerRegistry;
@Autowired
private ConnectionManager connectionManager;
int grpcServerPort = ApplicationUtils.getPort() + rpcPortOffset();
private void init() {
}
@PostConstruct
@Override
public void start() throws Exception {
public ConnectionType getConnectionType() {
return ConnectionType.GRPC;
}
@Override
public void startServer() throws Exception {
init();
server = ServerBuilder.forPort(grpcServerPort).addService(streamRequestHander).addService(requestHander)
.build();
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
System.out.println("transportReady:" + transportAttrs);
Attributes test = transportAttrs.toBuilder().set(key, UUID.randomUUID().toString()).build();
return test;
}
@Override
public void transportTerminated(Attributes transportAttrs) {
System.out.println("transportTerminated:" + transportAttrs);
super.transportTerminated(transportAttrs);
}
}).intercept(new ConnetionIntereptor()).build();
server.start();
Loggers.RPC.info("Nacos gRPC server start successfully at port :" + grpcServerPort);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Loggers.RPC.info("Nacos gRPC server stopping...");
GrpcServer.this.stop();
Loggers.RPC.info("Nacos gRPC server stopped successfully...");
}
});
}
@Override
@ -84,17 +98,24 @@ public class GrpcServer extends RpcServer {
}
@Override
public void stop() {
public void shundownServer() {
if (server != null) {
Loggers.RPC.info("Nacos clear all rpc clients...");
connectionManager.expelAll();
try {
//wait clients to switch server.
Thread.sleep(2000L);
} catch (InterruptedException e) {
//Do nothing.
}
server.shutdown();
}
}
static final Attributes.Key key = Attributes.Key.create("conn_id");
static class ConnetionIntereptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
Context ctx = Context.current();
// System.out.println(build);
System.out.println(call.getAttributes().get(key).toString());
return Contexts.interceptCall(Context.current(), call, headers, next);
}
}
}

View File

@ -25,6 +25,7 @@ import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -43,12 +44,15 @@ public class GrpcStreamRequestHanderImpl extends RequestStreamGrpc.RequestStream
@Override
public void requestStream(GrpcRequest request, StreamObserver<GrpcResponse> responseObserver) {
Context current = Context.current();
GrpcMetadata metadata = request.getMetadata();
String clientIp = metadata.getClientIp();
String connectionId = metadata.getConnectionId();
String version = metadata.getVersion();
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionId, clientIp, ConnectionType.GRPC.getType(),
version);
version, metadata.getLabelsMap());
Connection connection = new GrpcConnection(metaInfo, responseObserver);
if (connectionManager.isOverLimit()) {
//Not register to the connection manager if current server is over limit.
@ -59,7 +63,9 @@ public class GrpcStreamRequestHanderImpl extends RequestStreamGrpc.RequestStream
}
} else {
connectionManager.register(connectionId, connection);
}
}
}

View File

@ -24,15 +24,14 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.rsocket.RsocketUtils;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
import com.alibaba.nacos.core.remote.PushFuture;
import com.alibaba.nacos.core.utils.Loggers;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@ -72,23 +71,12 @@ public class RsocketConnection extends Connection {
}
@Override
public Future<Boolean> sendRequestWithFuture(ServerPushRequest request) throws Exception {
public PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception {
Loggers.RPC_DIGEST.info("Rsocket sendRequestWithFuture :" + request);
final Mono<Payload> payloadMono = clientSocket
.requestResponse(RsocketUtils.convertRequestToPayload(request, new RequestMeta()));
Future<Boolean> future = new Future<Boolean>() {
private volatile boolean cancel = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return cancel = true;
}
@Override
public boolean isCancelled() {
return cancel;
}
PushFuture defaultPushFuture = new PushFuture() {
@Override
public boolean isDone() {
@ -96,43 +84,53 @@ public class RsocketConnection extends Connection {
}
@Override
public Boolean get() throws InterruptedException, ExecutionException {
public boolean get() throws TimeoutException, InterruptedException {
return payloadMono.block() == null;
}
@Override
public Boolean get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return payloadMono.block(Duration.ofMillis(unit.toMillis(timeout))) == null;
public boolean get(long timeout) throws TimeoutException, InterruptedException {
return payloadMono.block(Duration.ofMillis(timeout)) == null;
}
};
return future;
return defaultPushFuture;
}
@Override
public void sendRequestWithCallBack(ServerPushRequest request, PushCallBack callBack) throws Exception {
Loggers.RPC_DIGEST.info("Rsocket sendRequestWithCallBack :" + request);
System.out.println(new Date() + "1");
Mono<Payload> payloadMono = clientSocket
.requestResponse(RsocketUtils.convertRequestToPayload(request, new RequestMeta()));
payloadMono.subscribe(new Consumer<Payload>() {
@Override
public void accept(Payload payload) {
Response response = RsocketUtils.parseResponseFromPayload(payload);
System.out.println(new Date().toString() + response);
if (response.isSuccess()) {
callBack.onSuccess();
} else {
callBack.onFail(new NacosException(response.getErrorCode(), "request fail"));
callBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
callBack.onFail(new Exception(throwable));
}
});
try {
System.out.println(new Date() + "2");
payloadMono.timeout(Duration.ofMillis(callBack.getTimeout()));
System.out.println(new Date() + "3");
} catch (Exception e) {
System.out.println("Timeout:" + e.getMessage());
callBack.onTimeout();
}
}
@Override

View File

@ -37,6 +37,7 @@ import com.alibaba.nacos.core.utils.Loggers;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@ -59,6 +60,8 @@ public class RsocketRpcServer extends RpcServer {
private RSocketServer rSocketServer;
CloseableChannel closeChannel;
@Autowired
private RequestHandlerRegistry requestHandlerRegistry;
@ -70,11 +73,15 @@ public class RsocketRpcServer extends RpcServer {
return PORT_OFFSET;
}
@PostConstruct
@Override
public void start() throws Exception {
public void shundownServer() {
}
@Override
public void startServer() throws Exception {
RSocketServer rSocketServerInner = RSocketServer.create();
rSocketServerInner.acceptor(((setup, sendingSocket) -> {
closeChannel = rSocketServerInner.acceptor(((setup, sendingSocket) -> {
Loggers.RPC.info("Receive connection rsocket:" + setup.getDataUtf8());
RsocketUtils.PlainRequest palinrequest = null;
try {
@ -92,7 +99,7 @@ public class RsocketRpcServer extends RpcServer {
.toObj(palinrequest.getBody(), ConnectionSetupRequest.class);
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionSetupRequest.getConnectionId(),
connectionSetupRequest.getClientIp(), ConnectionType.RSOCKET.getType(),
connectionSetupRequest.getClientVersion());
connectionSetupRequest.getClientVersion(), connectionSetupRequest.getLabels());
Connection connection = new RsocketConnection(metaInfo, sendingSocket);
connectionManager.register(connection.getConnectionId(), connection);
@ -148,12 +155,18 @@ public class RsocketRpcServer extends RpcServer {
})).bind(TcpServerTransport.create("0.0.0.0", (ApplicationUtils.getPort() + PORT_OFFSET))).block();
rSocketServer = rSocketServerInner;
Loggers.RPC.info("Nacos Rsocket server start on port :" + (ApplicationUtils.getPort() + PORT_OFFSET));
}
@Override
public void stop() throws Exception {
public ConnectionType getConnectionType() {
return ConnectionType.RSOCKET;
}
@Override
public void stopServer() throws Exception {
if (this.closeChannel != null && !closeChannel.isDisposed()) {
this.closeChannel.dispose();
}
}
}

View File

@ -20,8 +20,7 @@ import com.alibaba.nacos.api.remote.request.ServerPushRequest;
import com.alibaba.nacos.api.remote.response.PushCallBack;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
import java.util.concurrent.Future;
import com.alibaba.nacos.core.remote.PushFuture;
/**
* Cluster connection.
@ -49,7 +48,7 @@ public class ClusterConnection extends Connection {
}
@Override
public Future<Boolean> sendRequestWithFuture(ServerPushRequest request) throws Exception {
public PushFuture sendRequestWithFuture(ServerPushRequest request) throws Exception {
return null;
}

View File

@ -33,6 +33,7 @@ import com.alibaba.nacos.naming.remote.RemotingConnectionHolder;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
/**
@ -79,7 +80,7 @@ public class ForwardInstanceRequestHandler extends RequestHandler<ForwardInstanc
private void addRemotingConnectionIfAbsent(RequestMeta sourceRequestMeta) {
if (null == remotingConnectionHolder.getRemotingConnection(sourceRequestMeta.getConnectionId())) {
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(sourceRequestMeta.getConnectionId(),
sourceRequestMeta.getClientIp(), "cluster", sourceRequestMeta.getClientVersion());
sourceRequestMeta.getClientIp(), "cluster", sourceRequestMeta.getClientVersion(), new HashMap<>());
remotingConnectionHolder.clientConnected(new ClusterConnection(metaInfo));
}
}