optimize dumpService warning on starting up ; async request suppport in cluster rpc client proxy. (#3808)
* optimize dumpService warning on starting up . * async request suppport in cluster rpc client proxy. * async request suppport in cluster rpc client proxy.
This commit is contained in:
parent
721791a41e
commit
fac4879e92
@ -31,7 +31,7 @@ public class RemoteConstants {
|
|||||||
|
|
||||||
public static final String LABEL_SOURCE_SDK = "sdk";
|
public static final String LABEL_SOURCE_SDK = "sdk";
|
||||||
|
|
||||||
public static final String LABEL_SOURCE_NODE = "node";
|
public static final String LABEL_SOURCE_CLUSTER = "cluster";
|
||||||
|
|
||||||
public static final String LABEL_MODULE = "module";
|
public static final String LABEL_MODULE = "module";
|
||||||
|
|
||||||
|
@ -0,0 +1,86 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.nacos.api.remote;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
|
import com.alibaba.nacos.api.remote.request.Request;
|
||||||
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* connection interface,define basic operation.
|
||||||
|
*
|
||||||
|
* @author liuzunfei
|
||||||
|
* @version $Id: Requester.java, v 0.1 2020年09月11日 4:05 PM liuzunfei Exp $
|
||||||
|
*/
|
||||||
|
public interface Requester {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* send request. default time out 3 seconds.
|
||||||
|
*
|
||||||
|
* @param request request.
|
||||||
|
* @param requestMeta requestMeta.
|
||||||
|
* @return response.
|
||||||
|
* @throws NacosException exception throw.
|
||||||
|
*/
|
||||||
|
public Response request(Request request, RequestMeta requestMeta) throws NacosException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* send request.
|
||||||
|
*
|
||||||
|
* @param request request.
|
||||||
|
* @param requestMeta requestMeta.
|
||||||
|
* @param timeoutMills mills of timeouts.
|
||||||
|
* @return response response returned.
|
||||||
|
* @throws NacosException exception throw.
|
||||||
|
*/
|
||||||
|
public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* send request.
|
||||||
|
*
|
||||||
|
* @param request request.
|
||||||
|
* @param requestMeta meta of request.
|
||||||
|
* @return request future.
|
||||||
|
* @throws NacosException exception throw.
|
||||||
|
*/
|
||||||
|
public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* send aync request. = * @param request request.
|
||||||
|
*
|
||||||
|
* @param requestMeta meta of request.
|
||||||
|
* @param requestCallBack callback of request.
|
||||||
|
* @throws NacosException exception throw.
|
||||||
|
*/
|
||||||
|
public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
|
||||||
|
throws NacosException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get connection labels.
|
||||||
|
*
|
||||||
|
* @return labels.
|
||||||
|
*/
|
||||||
|
public Map<String, String> getLabels();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* close connection.
|
||||||
|
*/
|
||||||
|
public void close();
|
||||||
|
}
|
@ -54,9 +54,9 @@ public class ConfigTest {
|
|||||||
@Before
|
@Before
|
||||||
public void before() throws Exception {
|
public void before() throws Exception {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848");
|
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
|
||||||
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
|
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
|
||||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848");
|
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848");
|
||||||
//"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
|
//"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
|
||||||
//"11.239.114.187:8848");
|
//"11.239.114.187:8848");
|
||||||
configService = NacosFactory.createConfigService(properties);
|
configService = NacosFactory.createConfigService(properties);
|
||||||
@ -66,8 +66,8 @@ public class ConfigTest {
|
|||||||
@Test
|
@Test
|
||||||
public void test222() throws Exception {
|
public void test222() throws Exception {
|
||||||
Map<String, String> labels = new HashMap<String, String>();
|
Map<String, String> labels = new HashMap<String, String>();
|
||||||
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE);
|
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);
|
||||||
|
|
||||||
RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels);
|
RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels);
|
||||||
client.init(new ServerListFactory() {
|
client.init(new ServerListFactory() {
|
||||||
@Override
|
@Override
|
||||||
@ -181,21 +181,27 @@ public class ConfigTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test2() throws Exception {
|
public void test2() throws Exception {
|
||||||
|
final String dataId = "xiaochun.xxc";
|
||||||
|
final String group = "xiaochun.xxc";
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
|
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848");
|
||||||
//"
|
//"
|
||||||
List<ConfigService> configServiceList = new ArrayList<ConfigService>();
|
List<ConfigService> configServiceList = new ArrayList<ConfigService>();
|
||||||
for (int i = 0; i < 300; i++) {
|
for (int i = 0; i < 300; i++) {
|
||||||
|
|
||||||
ConfigService configService = NacosFactory.createConfigService(properties);
|
ConfigService configService = NacosFactory.createConfigService(properties);
|
||||||
configService.addListener("test", "test", new AbstractListener() {
|
|
||||||
|
|
||||||
|
Listener listener = new AbstractListener() {
|
||||||
@Override
|
@Override
|
||||||
public void receiveConfigInfo(String configInfo) {
|
public void receiveConfigInfo(String configInfo) {
|
||||||
System.out.println("listener2:" + configInfo);
|
System.out.println(
|
||||||
|
"receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(configInfo)));
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
configServiceList.add(configService);
|
|
||||||
|
configService.addListener(dataId, group, listener);
|
||||||
|
|
||||||
System.out.println(configServiceList.size());
|
System.out.println(configServiceList.size());
|
||||||
}
|
}
|
||||||
System.out.println("2");
|
System.out.println("2");
|
||||||
@ -208,13 +214,10 @@ public class ConfigTest {
|
|||||||
int times = 10000;
|
int times = 10000;
|
||||||
while (times > 0) {
|
while (times > 0) {
|
||||||
try {
|
try {
|
||||||
System.out.println("3");
|
boolean result = configService.publishConfig(dataId, group, "" + System.currentTimeMillis());
|
||||||
|
|
||||||
boolean result = configService
|
|
||||||
.publishConfig("test", "test", "value" + System.currentTimeMillis());
|
|
||||||
|
|
||||||
times--;
|
times--;
|
||||||
Thread.sleep(3000L);
|
Thread.sleep(1000L);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
|
||||||
@ -223,7 +226,7 @@ public class ConfigTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
//th.start();
|
th.start();
|
||||||
|
|
||||||
Thread.sleep(1000000L);
|
Thread.sleep(1000000L);
|
||||||
}
|
}
|
||||||
@ -244,12 +247,12 @@ public class ConfigTest {
|
|||||||
int times = 1000;
|
int times = 1000;
|
||||||
while (times > 0) {
|
while (times > 0) {
|
||||||
try {
|
try {
|
||||||
String content1 = "value" + System.currentTimeMillis();
|
String content1 = System.currentTimeMillis() + "";
|
||||||
System.out.println("publish content:" + content1);
|
//System.out.println("publish content:" + content1);
|
||||||
configService.publishConfig(dataId, group, content1);
|
configService.publishConfig(dataId, group, content1);
|
||||||
|
|
||||||
times--;
|
times--;
|
||||||
Thread.sleep(2000L);
|
Thread.sleep(1000L);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
@ -266,8 +269,9 @@ public class ConfigTest {
|
|||||||
Listener listener = new AbstractListener() {
|
Listener listener = new AbstractListener() {
|
||||||
@Override
|
@Override
|
||||||
public void receiveConfigInfo(String configInfo) {
|
public void receiveConfigInfo(String configInfo) {
|
||||||
System.out.println("receiveConfigInfo1 content:" + configInfo + "," + System.currentTimeMillis());
|
System.out.println(
|
||||||
|
"receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(configInfo)));
|
||||||
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ public interface HttpHeaderConsts {
|
|||||||
String ACCEPT_CHARSET = "Accept-Charset";
|
String ACCEPT_CHARSET = "Accept-Charset";
|
||||||
String ACCEPT_ENCODING = "Accept-Encoding";
|
String ACCEPT_ENCODING = "Accept-Encoding";
|
||||||
String CONTENT_ENCODING = "Content-Encoding";
|
String CONTENT_ENCODING = "Content-Encoding";
|
||||||
String CONNECTION = "Connection";
|
String CONNECTION = "Requester";
|
||||||
String REQUEST_ID = "RequestId";
|
String REQUEST_ID = "RequestId";
|
||||||
String REQUEST_MODULE = "Request-Module";
|
String REQUEST_MODULE = "Request-Module";
|
||||||
|
|
||||||
|
@ -16,12 +16,7 @@
|
|||||||
|
|
||||||
package com.alibaba.nacos.common.remote.client;
|
package com.alibaba.nacos.common.remote.client;
|
||||||
|
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
import com.alibaba.nacos.api.remote.Requester;
|
||||||
import com.alibaba.nacos.api.remote.RequestCallBack;
|
|
||||||
import com.alibaba.nacos.api.remote.RequestFuture;
|
|
||||||
import com.alibaba.nacos.api.remote.request.Request;
|
|
||||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -33,7 +28,7 @@ import java.util.Map;
|
|||||||
* @version $Id: Connection.java, v 0.1 2020年08月09日 1:32 PM liuzunfei Exp $
|
* @version $Id: Connection.java, v 0.1 2020年08月09日 1:32 PM liuzunfei Exp $
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
|
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
|
||||||
public abstract class Connection {
|
public abstract class Connection implements Requester {
|
||||||
|
|
||||||
private boolean abandon = false;
|
private boolean abandon = false;
|
||||||
|
|
||||||
@ -45,18 +40,6 @@ public abstract class Connection {
|
|||||||
this.serverInfo = serverInfo;
|
this.serverInfo = serverInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLabel(String labelKey) {
|
|
||||||
return labels.get(labelKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void putLabel(String labelKey, String labelValue) {
|
|
||||||
labels.put(labelKey, labelValue);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void putLabels(Map<String, String> labels) {
|
|
||||||
labels.putAll(labels);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Getter method for property <tt>abandon</tt>.
|
* Getter method for property <tt>abandon</tt>.
|
||||||
*
|
*
|
||||||
@ -67,8 +50,7 @@ public abstract class Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Setter method for property <tt>abandon</tt>.
|
* Setter method for property <tt>abandon</tt>. connection event will be ignored if connection is abandoned.
|
||||||
* connection event will be ignored if connection is abandoned.
|
|
||||||
*
|
*
|
||||||
* @param abandon value to be assigned to property abandon
|
* @param abandon value to be assigned to property abandon
|
||||||
*/
|
*/
|
||||||
@ -77,49 +59,28 @@ public abstract class Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* send request.
|
* Getter method for property <tt>labels</tt>.
|
||||||
* default time out 3 seconds.
|
|
||||||
* @param request request.
|
|
||||||
* @param requestMeta requestMeta.
|
|
||||||
* @return response.
|
|
||||||
* @throws NacosException exception throw.
|
|
||||||
*/
|
|
||||||
public abstract Response request(Request request, RequestMeta requestMeta) throws NacosException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* send request.
|
|
||||||
*
|
*
|
||||||
* @param request request.
|
* @return property value of labels
|
||||||
* @param requestMeta requestMeta.
|
|
||||||
* @param timeoutMills mills of timeouts.
|
|
||||||
* @return response response returned.
|
|
||||||
* @throws NacosException exception throw.
|
|
||||||
*/
|
*/
|
||||||
public abstract Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException;
|
@Override
|
||||||
|
public Map<String, String> getLabels() {
|
||||||
|
return labels;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* send request.
|
* Setter method for property <tt>labels</tt>.
|
||||||
*
|
*
|
||||||
* @param request request.
|
* @param labels value to be assigned to property labels
|
||||||
* @param requestMeta meta of request.
|
|
||||||
* @return request future.
|
|
||||||
* @throws NacosException exception throw.
|
|
||||||
*/
|
*/
|
||||||
public abstract RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException;
|
public void putLabels(Map<String, String> labels) {
|
||||||
|
this.labels = labels;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* send aync request.
|
* Setter method for property <tt>labels</tt>.
|
||||||
= * @param request request.
|
|
||||||
* @param requestMeta meta of request.
|
|
||||||
* @param requestCallBack callback of request.
|
|
||||||
* @throws NacosException exception throw.
|
|
||||||
*/
|
*/
|
||||||
public abstract void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
|
public void putLabel(String labelName, String labelValue) {
|
||||||
throws NacosException;
|
this.labels.put(labelName, labelValue);
|
||||||
|
}
|
||||||
/**
|
|
||||||
* close connection.
|
|
||||||
*/
|
|
||||||
public abstract void close();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -346,6 +346,9 @@ public abstract class RpcClient implements Closeable {
|
|||||||
switchingFlag.compareAndSet(false, true);
|
switchingFlag.compareAndSet(false, true);
|
||||||
// loop until start client success.
|
// loop until start client success.
|
||||||
boolean switchSuccess = false;
|
boolean switchSuccess = false;
|
||||||
|
|
||||||
|
int reConnectTimes = 0;
|
||||||
|
Exception lastException = null;
|
||||||
while (!switchSuccess && !isShutdwon()) {
|
while (!switchSuccess && !isShutdwon()) {
|
||||||
|
|
||||||
//1.get a new server
|
//1.get a new server
|
||||||
@ -353,7 +356,6 @@ public abstract class RpcClient implements Closeable {
|
|||||||
//2.create a new channel to new server
|
//2.create a new channel to new server
|
||||||
try {
|
try {
|
||||||
serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
|
serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
|
||||||
System.out.println(RpcClient.this.name + "trying to connect server:" + serverInfo);
|
|
||||||
|
|
||||||
Connection connectNew = connectToServer(serverInfo);
|
Connection connectNew = connectToServer(serverInfo);
|
||||||
if (connectNew != null) {
|
if (connectNew != null) {
|
||||||
@ -368,20 +370,27 @@ public abstract class RpcClient implements Closeable {
|
|||||||
boolean s = eventLinkedBlockingQueue
|
boolean s = eventLinkedBlockingQueue
|
||||||
.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
|
.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
|
||||||
return;
|
return;
|
||||||
} else {
|
|
||||||
System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isShutdwon()) {
|
if (isShutdwon()) {
|
||||||
closeConnection(connectNew);
|
closeConnection(connectNew);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastException = null;
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo
|
lastException = e;
|
||||||
+ " ,error message is " + e.getMessage());
|
|
||||||
} finally {
|
} finally {
|
||||||
recommendServer.set(null);
|
recommendServer.set(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reConnectTimes++;
|
||||||
|
|
||||||
|
if (reConnectTimes % 30 == 0) {
|
||||||
|
System.out.println(
|
||||||
|
RpcClient.this.name + "-fail to connect server,after trying " + reConnectTimes
|
||||||
|
+ " times, last tryed server is " + serverInfo);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
//sleep 100 millsecond to switch next server.
|
//sleep 100 millsecond to switch next server.
|
||||||
@ -431,10 +440,6 @@ public abstract class RpcClient implements Closeable {
|
|||||||
*/
|
*/
|
||||||
public abstract int rpcPortOffset();
|
public abstract int rpcPortOffset();
|
||||||
|
|
||||||
protected void clearContextOnResetRequest() {
|
|
||||||
// Default do nothing.
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get current server.
|
* get current server.
|
||||||
*
|
*
|
||||||
@ -477,7 +482,6 @@ public abstract class RpcClient implements Closeable {
|
|||||||
if (response != null) {
|
if (response != null) {
|
||||||
if (response instanceof ConnectionUnregisterResponse) {
|
if (response instanceof ConnectionUnregisterResponse) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
clearContextOnResetRequest();
|
|
||||||
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
|
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
|
||||||
switchServerAsync();
|
switchServerAsync();
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,10 @@ import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
|
|||||||
import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse;
|
import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse;
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
|
import com.alibaba.nacos.auth.annotation.Secured;
|
||||||
|
import com.alibaba.nacos.auth.common.ActionTypes;
|
||||||
import com.alibaba.nacos.common.utils.MapUtils;
|
import com.alibaba.nacos.common.utils.MapUtils;
|
||||||
|
import com.alibaba.nacos.config.server.auth.ConfigResourceParser;
|
||||||
import com.alibaba.nacos.config.server.model.ConfigInfo;
|
import com.alibaba.nacos.config.server.model.ConfigInfo;
|
||||||
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
|
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
|
||||||
import com.alibaba.nacos.config.server.service.AggrWhitelist;
|
import com.alibaba.nacos.config.server.service.AggrWhitelist;
|
||||||
@ -53,6 +56,7 @@ public class ConfigPublishRequestHandler extends RequestHandler<ConfigPublishReq
|
|||||||
private PersistService persistService;
|
private PersistService persistService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
|
||||||
public ConfigPubishResponse handle(ConfigPublishRequest myRequest, RequestMeta meta) throws NacosException {
|
public ConfigPubishResponse handle(ConfigPublishRequest myRequest, RequestMeta meta) throws NacosException {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -39,6 +39,7 @@ import java.util.HashSet;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -47,11 +48,12 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @author liuzunfei
|
* @author liuzunfei
|
||||||
* @version $Id: ConfigChangeNotifier.java, v 0.1 2020年07月20日 3:00 PM liuzunfei Exp $
|
* @version $Id: ConfigChangeNotifier.java, v 0.1 2020年07月20日 3:00 PM liuzunfei Exp $
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component(value = "rpcConfigChangeNotifier")
|
||||||
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
||||||
|
|
||||||
private static final ScheduledExecutorService ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR = ExecutorFactory.Managed
|
private static final ScheduledExecutorService ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR = ExecutorFactory.Managed
|
||||||
.newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class), 100,
|
.newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class),
|
||||||
|
Runtime.getRuntime().availableProcessors() * 2,
|
||||||
new NameThreadFactory("com.alibaba.nacos.config.server.remote.ConfigChangeNotifier"));
|
new NameThreadFactory("com.alibaba.nacos.config.server.remote.ConfigChangeNotifier"));
|
||||||
|
|
||||||
public RpcConfigChangeNotifier() {
|
public RpcConfigChangeNotifier() {
|
||||||
@ -101,7 +103,8 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Loggers.RPC.info("push {} clients ,groupKey={}", clients == null ? 0 : notifyCount, groupKey);
|
Loggers.RPC.info("push {} clients ,groupKey={},queue size={}", clients == null ? 0 : notifyCount, groupKey,
|
||||||
|
((ScheduledThreadPoolExecutor) ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR).getQueue().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -151,7 +154,7 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
rpcPushService.pushWithCallback(clientId, notifyRequet, new AbstractPushCallBack(500L) {
|
rpcPushService.pushWithCallback(clientId, notifyRequet, new AbstractPushCallBack(3000L) {
|
||||||
int retryTimes = tryTimes;
|
int retryTimes = tryTimes;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -163,14 +166,13 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFail(Throwable e) {
|
public void onFail(Throwable e) {
|
||||||
Loggers.CORE.error("On failt ", e);
|
|
||||||
Loggers.CORE.warn("push fail.dataId={},group={},tenant={},clientId={},tryTimes={}",
|
Loggers.CORE.warn("push fail.dataId={},group={},tenant={},clientId={},tryTimes={}",
|
||||||
notifyRequet.getDataId(), notifyRequet.getGroup(), notifyRequet.getTenant(), clientId,
|
notifyRequet.getDataId(), notifyRequet.getGroup(), notifyRequet.getTenant(), clientId,
|
||||||
retryTimes);
|
retryTimes);
|
||||||
|
|
||||||
push(RpcPushTask.this);
|
push(RpcPushTask.this);
|
||||||
}
|
}
|
||||||
|
|
||||||
}, ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR);
|
}, ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR);
|
||||||
|
|
||||||
tryTimes++;
|
tryTimes++;
|
||||||
|
@ -20,6 +20,7 @@ import com.alibaba.nacos.config.server.configuration.ConditionOnExternalStorage;
|
|||||||
import com.alibaba.nacos.config.server.service.repository.PersistService;
|
import com.alibaba.nacos.config.server.service.repository.PersistService;
|
||||||
import com.alibaba.nacos.core.cluster.ServerMemberManager;
|
import com.alibaba.nacos.core.cluster.ServerMemberManager;
|
||||||
import org.springframework.context.annotation.Conditional;
|
import org.springframework.context.annotation.Conditional;
|
||||||
|
import org.springframework.context.annotation.DependsOn;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
@ -31,6 +32,7 @@ import javax.annotation.PostConstruct;
|
|||||||
*/
|
*/
|
||||||
@Conditional(ConditionOnExternalStorage.class)
|
@Conditional(ConditionOnExternalStorage.class)
|
||||||
@Component
|
@Component
|
||||||
|
@DependsOn({"rpcConfigChangeNotifier"})
|
||||||
public class ExternalDumpService extends DumpService {
|
public class ExternalDumpService extends DumpService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -18,6 +18,7 @@ package com.alibaba.nacos.core.cluster.remote;
|
|||||||
|
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
import com.alibaba.nacos.api.remote.RemoteConstants;
|
import com.alibaba.nacos.api.remote.RemoteConstants;
|
||||||
|
import com.alibaba.nacos.api.remote.RequestCallBack;
|
||||||
import com.alibaba.nacos.api.remote.request.Request;
|
import com.alibaba.nacos.api.remote.request.Request;
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||||
@ -111,7 +112,7 @@ public class ClusterRpcClientProxy extends MemberChangeListener {
|
|||||||
|
|
||||||
private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
|
private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
|
||||||
Map<String, String> labels = new HashMap<String, String>(2);
|
Map<String, String> labels = new HashMap<String, String>(2);
|
||||||
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE);
|
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);
|
||||||
String memberClientKey = memberClientKey(member);
|
String memberClientKey = memberClientKey(member);
|
||||||
RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
|
RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
|
||||||
if (!client.getConnectionType().equals(type)) {
|
if (!client.getConnectionType().equals(type)) {
|
||||||
@ -154,15 +155,44 @@ public class ClusterRpcClientProxy extends MemberChangeListener {
|
|||||||
* @throws NacosException exception may throws.
|
* @throws NacosException exception may throws.
|
||||||
*/
|
*/
|
||||||
public Response sendRequest(Member member, Request request) throws NacosException {
|
public Response sendRequest(Member member, Request request) throws NacosException {
|
||||||
|
return sendRequest(member, request, 3000L);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* send request to member.
|
||||||
|
*
|
||||||
|
* @param member member of server.
|
||||||
|
* @param request request.
|
||||||
|
* @return Response response.
|
||||||
|
* @throws NacosException exception may throws.
|
||||||
|
*/
|
||||||
|
public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
|
||||||
RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
|
RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
Response response = client.request(request);
|
Response response = client.request(request, timeoutMills);
|
||||||
return response;
|
return response;
|
||||||
} else {
|
} else {
|
||||||
throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
|
throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* aync send request to member with callback.
|
||||||
|
*
|
||||||
|
* @param member member of server.
|
||||||
|
* @param request request.
|
||||||
|
* @param callBack RequestCallBack.
|
||||||
|
* @throws NacosException exception may throws.
|
||||||
|
*/
|
||||||
|
public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException {
|
||||||
|
RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
|
||||||
|
if (client != null) {
|
||||||
|
client.asyncRequest(request, callBack);
|
||||||
|
} else {
|
||||||
|
throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* send request to member.
|
* send request to member.
|
||||||
*
|
*
|
||||||
|
@ -16,12 +16,8 @@
|
|||||||
|
|
||||||
package com.alibaba.nacos.core.remote;
|
package com.alibaba.nacos.core.remote;
|
||||||
|
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
|
||||||
import com.alibaba.nacos.api.remote.RemoteConstants;
|
import com.alibaba.nacos.api.remote.RemoteConstants;
|
||||||
import com.alibaba.nacos.api.remote.RequestCallBack;
|
import com.alibaba.nacos.api.remote.Requester;
|
||||||
import com.alibaba.nacos.api.remote.RequestFuture;
|
|
||||||
import com.alibaba.nacos.api.remote.request.Request;
|
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
|
||||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -33,7 +29,7 @@ import java.util.Map;
|
|||||||
* @version $Id: Connection.java, v 0.1 2020年07月13日 7:08 PM liuzunfei Exp $
|
* @version $Id: Connection.java, v 0.1 2020年07月13日 7:08 PM liuzunfei Exp $
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
|
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
|
||||||
public abstract class Connection {
|
public abstract class Connection implements Requester {
|
||||||
|
|
||||||
private final ConnectionMetaInfo metaInfo;
|
private final ConnectionMetaInfo metaInfo;
|
||||||
|
|
||||||
@ -41,48 +37,6 @@ public abstract class Connection {
|
|||||||
this.metaInfo = metaInfo;
|
this.metaInfo = metaInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Send response to this client that associated to this connection.
|
|
||||||
*
|
|
||||||
* @param request request.
|
|
||||||
* @param timeoutMills timeoutMills.
|
|
||||||
* @return Response resonse.
|
|
||||||
* @throws NacosException exception may throw.
|
|
||||||
*/
|
|
||||||
public abstract Response sendRequest(Request request, long timeoutMills) throws NacosException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send response to this client that associated to this connection.
|
|
||||||
*
|
|
||||||
* @param request request.
|
|
||||||
* @throws NacosException exception may throw.
|
|
||||||
*/
|
|
||||||
public abstract void sendRequestNoAck(Request request) throws NacosException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send response to this client that associated to this connection.
|
|
||||||
*
|
|
||||||
* @param request request.
|
|
||||||
* @return future of request.
|
|
||||||
* @throws NacosException exception may throw.
|
|
||||||
*/
|
|
||||||
public abstract RequestFuture sendRequestWithFuture(Request request) throws NacosException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send response to this client that associated to this connection.
|
|
||||||
*
|
|
||||||
* @param request request.
|
|
||||||
* @param callBack call back.
|
|
||||||
* @throws NacosException exception may throw.
|
|
||||||
*/
|
|
||||||
public abstract void sendRequestWithCallBack(Request request, RequestCallBack callBack)
|
|
||||||
throws NacosException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close this connection, if this connection is not active yet.
|
|
||||||
*/
|
|
||||||
public abstract void closeGrapcefully();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update last Active Time to now.
|
* Update last Active Time to now.
|
||||||
*/
|
*/
|
||||||
@ -99,6 +53,11 @@ public abstract class Connection {
|
|||||||
return metaInfo.lastActiveTime;
|
return metaInfo.lastActiveTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get connection Id.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public String getConnectionId() {
|
public String getConnectionId() {
|
||||||
return metaInfo.connectionId;
|
return metaInfo.connectionId;
|
||||||
}
|
}
|
||||||
|
@ -19,8 +19,11 @@ package com.alibaba.nacos.core.remote;
|
|||||||
import com.alibaba.nacos.api.common.Constants;
|
import com.alibaba.nacos.api.common.Constants;
|
||||||
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
|
import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
|
||||||
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
|
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
|
||||||
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
|
import com.alibaba.nacos.api.utils.NetUtils;
|
||||||
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
|
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
|
||||||
import com.alibaba.nacos.common.utils.StringUtils;
|
import com.alibaba.nacos.common.utils.StringUtils;
|
||||||
|
import com.alibaba.nacos.common.utils.VersionUtils;
|
||||||
import com.alibaba.nacos.core.monitor.MetricsMonitor;
|
import com.alibaba.nacos.core.monitor.MetricsMonitor;
|
||||||
import com.alibaba.nacos.core.utils.Loggers;
|
import com.alibaba.nacos.core.utils.Loggers;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -55,8 +58,6 @@ public class ConnectionManager {
|
|||||||
|
|
||||||
String redirectAddress = null;
|
String redirectAddress = null;
|
||||||
|
|
||||||
private static final long EXPIRE_MILLSECOND = 10000L;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
|
private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
|
||||||
|
|
||||||
@ -94,7 +95,7 @@ public class ConnectionManager {
|
|||||||
public void unregister(String connectionId) {
|
public void unregister(String connectionId) {
|
||||||
Connection remove = this.connetions.remove(connectionId);
|
Connection remove = this.connetions.remove(connectionId);
|
||||||
if (remove != null) {
|
if (remove != null) {
|
||||||
remove.closeGrapcefully();
|
remove.close();
|
||||||
Loggers.RPC.info(" connection unregistered successfully,connectionid = {} ", connectionId);
|
Loggers.RPC.info(" connection unregistered successfully,connectionid = {} ", connectionId);
|
||||||
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
|
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
|
||||||
}
|
}
|
||||||
@ -187,7 +188,7 @@ public class ConnectionManager {
|
|||||||
connectResetRequest.setServerIp(split[0]);
|
connectResetRequest.setServerIp(split[0]);
|
||||||
connectResetRequest.setServerPort(split[1]);
|
connectResetRequest.setServerPort(split[1]);
|
||||||
}
|
}
|
||||||
connection.sendRequestNoAck(connectResetRequest);
|
connection.request(connectResetRequest, buildMeta());
|
||||||
Loggers.RPC
|
Loggers.RPC
|
||||||
.info("expel connection ,send switch server response connectionid = {},connectResetRequest={} ",
|
.info("expel connection ,send switch server response connectionid = {},connectResetRequest={} ",
|
||||||
expeledClientId, connectResetRequest);
|
expeledClientId, connectResetRequest);
|
||||||
@ -214,6 +215,13 @@ public class ConnectionManager {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RequestMeta buildMeta() {
|
||||||
|
RequestMeta meta = new RequestMeta();
|
||||||
|
meta.setClientVersion(VersionUtils.getFullClientVersion());
|
||||||
|
meta.setClientIp(NetUtils.localIP());
|
||||||
|
return meta;
|
||||||
|
}
|
||||||
|
|
||||||
public void coordinateMaxClientsSmoth(int maxClient) {
|
public void coordinateMaxClientsSmoth(int maxClient) {
|
||||||
this.maxClient = maxClient;
|
this.maxClient = maxClient;
|
||||||
}
|
}
|
||||||
@ -240,7 +248,7 @@ public class ConnectionManager {
|
|||||||
connectResetRequest.setServerPort(split[1]);
|
connectResetRequest.setServerPort(split[1]);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
connection.sendRequestNoAck(connectResetRequest);
|
connection.request(connectResetRequest, buildMeta());
|
||||||
} catch (ConnectionAlreadyClosedException e) {
|
} catch (ConnectionAlreadyClosedException e) {
|
||||||
unregister(connectionId);
|
unregister(connectionId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -297,7 +305,7 @@ public class ConnectionManager {
|
|||||||
for (Map.Entry<String, Connection> entry : connetions.entrySet()) {
|
for (Map.Entry<String, Connection> entry : connetions.entrySet()) {
|
||||||
Connection client = entry.getValue();
|
Connection client = entry.getValue();
|
||||||
try {
|
try {
|
||||||
client.sendRequestNoAck(new ConnectResetRequest());
|
client.request(new ConnectResetRequest(), buildMeta());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
//Do Nothing.
|
//Do Nothing.
|
||||||
}
|
}
|
||||||
|
@ -18,10 +18,13 @@ package com.alibaba.nacos.core.remote;
|
|||||||
|
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
|
import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
|
||||||
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
import com.alibaba.nacos.api.remote.request.ServerPushRequest;
|
import com.alibaba.nacos.api.remote.request.ServerPushRequest;
|
||||||
import com.alibaba.nacos.api.remote.response.PushCallBack;
|
import com.alibaba.nacos.api.remote.response.PushCallBack;
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
|
import com.alibaba.nacos.api.utils.NetUtils;
|
||||||
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
|
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
|
||||||
|
import com.alibaba.nacos.common.utils.VersionUtils;
|
||||||
import com.alibaba.nacos.core.utils.Loggers;
|
import com.alibaba.nacos.core.utils.Loggers;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@ -52,27 +55,29 @@ public class RpcPushService {
|
|||||||
Connection connection = connectionManager.getConnection(connectionId);
|
Connection connection = connectionManager.getConnection(connectionId);
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
try {
|
try {
|
||||||
connection.sendRequestWithCallBack(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
|
connection
|
||||||
|
.asyncRequest(request, buildMeta(), new AbstractRequestCallBack(requestCallBack.getTimeout()) {
|
||||||
@Override
|
|
||||||
public Executor getExcutor() {
|
@Override
|
||||||
return executor;
|
public Executor getExcutor() {
|
||||||
}
|
return executor;
|
||||||
|
}
|
||||||
@Override
|
|
||||||
public void onResponse(Response response) {
|
@Override
|
||||||
if (response.isSuccess()) {
|
public void onResponse(Response response) {
|
||||||
requestCallBack.onSuccess();
|
if (response.isSuccess()) {
|
||||||
} else {
|
requestCallBack.onSuccess();
|
||||||
requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
|
} else {
|
||||||
}
|
requestCallBack
|
||||||
}
|
.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
|
||||||
|
}
|
||||||
@Override
|
}
|
||||||
public void onException(Throwable e) {
|
|
||||||
requestCallBack.onFail(e);
|
@Override
|
||||||
}
|
public void onException(Throwable e) {
|
||||||
});
|
requestCallBack.onFail(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (ConnectionAlreadyClosedException e) {
|
} catch (ConnectionAlreadyClosedException e) {
|
||||||
connectionManager.unregister(connectionId);
|
connectionManager.unregister(connectionId);
|
||||||
requestCallBack.onSuccess();
|
requestCallBack.onSuccess();
|
||||||
@ -96,7 +101,7 @@ public class RpcPushService {
|
|||||||
Connection connection = connectionManager.getConnection(connectionId);
|
Connection connection = connectionManager.getConnection(connectionId);
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
try {
|
try {
|
||||||
connection.sendRequestNoAck(request);
|
connection.request(request, buildMeta());
|
||||||
} catch (ConnectionAlreadyClosedException e) {
|
} catch (ConnectionAlreadyClosedException e) {
|
||||||
connectionManager.unregister(connectionId);
|
connectionManager.unregister(connectionId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -107,4 +112,11 @@ public class RpcPushService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RequestMeta buildMeta() {
|
||||||
|
RequestMeta meta = new RequestMeta();
|
||||||
|
meta.setClientVersion(VersionUtils.getFullClientVersion());
|
||||||
|
meta.setClientIp(NetUtils.localIP());
|
||||||
|
return meta;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -21,13 +21,17 @@ import com.alibaba.nacos.api.grpc.auto.Metadata;
|
|||||||
import com.alibaba.nacos.api.grpc.auto.Payload;
|
import com.alibaba.nacos.api.grpc.auto.Payload;
|
||||||
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
|
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
|
||||||
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
|
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
|
||||||
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
|
import com.alibaba.nacos.api.utils.NetUtils;
|
||||||
import com.alibaba.nacos.common.remote.ConnectionType;
|
import com.alibaba.nacos.common.remote.ConnectionType;
|
||||||
import com.alibaba.nacos.common.remote.GrpcUtils;
|
import com.alibaba.nacos.common.remote.GrpcUtils;
|
||||||
|
import com.alibaba.nacos.common.utils.VersionUtils;
|
||||||
import com.alibaba.nacos.core.remote.Connection;
|
import com.alibaba.nacos.core.remote.Connection;
|
||||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||||
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
|
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
|
||||||
import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
|
import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
|
||||||
|
import com.alibaba.nacos.core.utils.Loggers;
|
||||||
import io.grpc.Context;
|
import io.grpc.Context;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -38,6 +42,7 @@ import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* grpc bi stream request .
|
* grpc bi stream request .
|
||||||
|
*
|
||||||
* @author liuzunfei
|
* @author liuzunfei
|
||||||
* @version $Id: GrpcBiStreamRequest.java, v 0.1 2020年09月01日 10:41 PM liuzunfei Exp $
|
* @version $Id: GrpcBiStreamRequest.java, v 0.1 2020年09月01日 10:41 PM liuzunfei Exp $
|
||||||
*/
|
*/
|
||||||
@ -64,13 +69,13 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
|
|||||||
String version = metadata.getClientVersion();
|
String version = metadata.getClientVersion();
|
||||||
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionId, clientIp,
|
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionId, clientIp,
|
||||||
ConnectionType.GRPC.getType(), version, metadata.getLabelsMap());
|
ConnectionType.GRPC.getType(), version, metadata.getLabelsMap());
|
||||||
|
|
||||||
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
|
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
|
||||||
if (connectionManager.isOverLimit()) {
|
if (connectionManager.isOverLimit()) {
|
||||||
//Not register to the connection manager if current server is over limit.
|
//Not register to the connection manager if current server is over limit.
|
||||||
try {
|
try {
|
||||||
connection.sendRequestNoAck(new ConnectResetRequest());
|
connection.request(new ConnectResetRequest(), buildMeta());
|
||||||
connection.closeGrapcefully();
|
connection.close();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
//Do nothing.
|
//Do nothing.
|
||||||
}
|
}
|
||||||
@ -78,7 +83,9 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
|
|||||||
connectionManager.register(connectionId, connection);
|
connectionManager.register(connectionId, connection);
|
||||||
}
|
}
|
||||||
} else if (plainRequest.getBody() instanceof Response) {
|
} else if (plainRequest.getBody() instanceof Response) {
|
||||||
|
|
||||||
Response response = (Response) plainRequest.getBody();
|
Response response = (Response) plainRequest.getBody();
|
||||||
|
Loggers.RPC_DIGEST.debug(String.format("[%s] response receive :%s ", "grpc", response.toString()));
|
||||||
RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
|
RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,4 +104,12 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
|
|||||||
|
|
||||||
return streamObserver;
|
return streamObserver;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RequestMeta buildMeta() {
|
||||||
|
RequestMeta meta = new RequestMeta();
|
||||||
|
meta.setClientVersion(VersionUtils.getFullClientVersion());
|
||||||
|
meta.setClientIp(NetUtils.localIP());
|
||||||
|
return meta;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import com.alibaba.nacos.api.remote.DefaultRequestFuture;
|
|||||||
import com.alibaba.nacos.api.remote.RequestCallBack;
|
import com.alibaba.nacos.api.remote.RequestCallBack;
|
||||||
import com.alibaba.nacos.api.remote.RequestFuture;
|
import com.alibaba.nacos.api.remote.RequestFuture;
|
||||||
import com.alibaba.nacos.api.remote.request.Request;
|
import com.alibaba.nacos.api.remote.request.Request;
|
||||||
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
import com.alibaba.nacos.api.utils.NetUtils;
|
import com.alibaba.nacos.api.utils.NetUtils;
|
||||||
import com.alibaba.nacos.common.remote.GrpcUtils;
|
import com.alibaba.nacos.common.remote.GrpcUtils;
|
||||||
@ -35,6 +36,8 @@ import io.grpc.StatusRuntimeException;
|
|||||||
import io.grpc.netty.shaded.io.netty.channel.Channel;
|
import io.grpc.netty.shaded.io.netty.channel.Channel;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* grpc connection.
|
* grpc connection.
|
||||||
*
|
*
|
||||||
@ -53,22 +56,9 @@ public class GrpcConnection extends Connection {
|
|||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void sendRequestNoAck(Request request, RequestMeta meta) throws NacosException {
|
||||||
public Response sendRequest(Request request, long timeoutMills) throws NacosException {
|
|
||||||
DefaultRequestFuture pushFuture = (DefaultRequestFuture) sendRequestWithFuture(request);
|
|
||||||
try {
|
try {
|
||||||
return pushFuture.get(timeoutMills);
|
streamObserver.onNext(GrpcUtils.convert(request, meta));
|
||||||
} catch (Exception e) {
|
|
||||||
throw new NacosException(NacosException.SERVER_ERROR, e);
|
|
||||||
} finally {
|
|
||||||
RpcAckCallbackSynchronizer.clearFuture(getConnectionId(), pushFuture.getRequestId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sendRequestNoAck(Request request) throws NacosException {
|
|
||||||
try {
|
|
||||||
streamObserver.onNext(GrpcUtils.convert(request, buildMeta(request.getClass().getName())));
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (e instanceof StatusRuntimeException) {
|
if (e instanceof StatusRuntimeException) {
|
||||||
throw new ConnectionAlreadyClosedException(e);
|
throw new ConnectionAlreadyClosedException(e);
|
||||||
@ -83,21 +73,12 @@ public class GrpcConnection extends Connection {
|
|||||||
return meta;
|
return meta;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private DefaultRequestFuture sendRequestInner(Request request, RequestMeta meta, RequestCallBack callBack)
|
||||||
public RequestFuture sendRequestWithFuture(Request request) throws NacosException {
|
throws NacosException {
|
||||||
return sendRequestInner(request, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sendRequestWithCallBack(Request request, RequestCallBack callBack) throws NacosException {
|
|
||||||
sendRequestInner(request, callBack);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException {
|
|
||||||
Loggers.RPC_DIGEST.debug(String.format("[%s] send request : %s", "grpc", request.toString()));
|
Loggers.RPC_DIGEST.debug(String.format("[%s] send request : %s", "grpc", request.toString()));
|
||||||
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
|
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
|
||||||
request.setRequestId(requestId);
|
request.setRequestId(requestId);
|
||||||
sendRequestNoAck(request);
|
sendRequestNoAck(request, meta);
|
||||||
DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(this.getConnectionId(), requestId, callBack,
|
DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(this.getConnectionId(), requestId, callBack,
|
||||||
new DefaultRequestFuture.TimeoutInnerTrigger() {
|
new DefaultRequestFuture.TimeoutInnerTrigger() {
|
||||||
@Override
|
@Override
|
||||||
@ -110,12 +91,44 @@ public class GrpcConnection extends Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeGrapcefully() {
|
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
|
||||||
|
return request(request, requestMeta, 3000L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException {
|
||||||
|
DefaultRequestFuture pushFuture = (DefaultRequestFuture) sendRequestInner(request, requestMeta, null);
|
||||||
|
try {
|
||||||
|
return pushFuture.get(timeoutMills);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new NacosException(NacosException.SERVER_ERROR, e);
|
||||||
|
} finally {
|
||||||
|
RpcAckCallbackSynchronizer.clearFuture(getConnectionId(), pushFuture.getRequestId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException {
|
||||||
|
return sendRequestInner(request, requestMeta, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
|
||||||
|
throws NacosException {
|
||||||
|
sendRequestInner(request, requestMeta, requestCallBack);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getLabels() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
try {
|
try {
|
||||||
streamObserver.onCompleted();
|
streamObserver.onCompleted();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Loggers.RPC.debug(String.format("[%s] connection close exception : %s", "grpc", e.getMessage()));
|
Loggers.RPC.debug(String.format("[%s] connection close exception : %s", "grpc", e.getMessage()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -23,9 +23,7 @@ import com.alibaba.nacos.api.remote.RpcScheduledExecutor;
|
|||||||
import com.alibaba.nacos.api.remote.request.Request;
|
import com.alibaba.nacos.api.remote.request.Request;
|
||||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
import com.alibaba.nacos.api.utils.NetUtils;
|
|
||||||
import com.alibaba.nacos.common.remote.RsocketUtils;
|
import com.alibaba.nacos.common.remote.RsocketUtils;
|
||||||
import com.alibaba.nacos.common.utils.VersionUtils;
|
|
||||||
import com.alibaba.nacos.core.remote.Connection;
|
import com.alibaba.nacos.core.remote.Connection;
|
||||||
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
|
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
|
||||||
import com.alibaba.nacos.core.utils.Loggers;
|
import com.alibaba.nacos.core.utils.Loggers;
|
||||||
@ -34,6 +32,7 @@ import io.rsocket.RSocket;
|
|||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
@ -58,14 +57,30 @@ public class RsocketConnection extends Connection {
|
|||||||
this.clientSocket = clientSocket;
|
this.clientSocket = clientSocket;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
|
||||||
public Response sendRequest(Request request, long timeoutMills) throws NacosException {
|
final CompletableFuture<T> promise = new CompletableFuture<T>();
|
||||||
|
RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object call() throws Exception {
|
||||||
|
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);
|
||||||
|
return promise.completeExceptionally(ex);
|
||||||
|
}
|
||||||
|
}, timeouts, MILLISECONDS);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
|
||||||
|
return request(request, requestMeta, 3000L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException {
|
||||||
Loggers.RPC_DIGEST.debug(String.format("[%s] send request : %s", "rsocket", request));
|
Loggers.RPC_DIGEST.debug(String.format("[%s] send request : %s", "rsocket", request));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Mono<Payload> payloadMono = clientSocket
|
Mono<Payload> payloadMono = clientSocket
|
||||||
.requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta()));
|
.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
|
||||||
Payload block = payloadMono.block(Duration.ofMillis(timeoutMills));
|
Payload block = payloadMono.block(Duration.ofMillis(timeoutMills));
|
||||||
return RsocketUtils.parseResponseFromPayload(block);
|
return RsocketUtils.parseResponseFromPayload(block);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -74,23 +89,10 @@ public class RsocketConnection extends Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendRequestNoAck(Request request) throws NacosException {
|
public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException {
|
||||||
Loggers.RPC_DIGEST.debug(String.format("[%s] send no ack request : %s", "rsocket", request));
|
|
||||||
clientSocket.fireAndForget(RsocketUtils.convertRequestToPayload(request, buildMeta())).block();
|
|
||||||
}
|
|
||||||
|
|
||||||
private RequestMeta buildMeta() {
|
|
||||||
RequestMeta meta = new RequestMeta();
|
|
||||||
meta.setClientVersion(VersionUtils.getFullClientVersion());
|
|
||||||
meta.setClientIp(NetUtils.localIP());
|
|
||||||
return meta;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RequestFuture sendRequestWithFuture(Request request) throws NacosException {
|
|
||||||
Loggers.RPC_DIGEST.debug(String.format("[%s] send future request : %s", "rsocket", request));
|
Loggers.RPC_DIGEST.debug(String.format("[%s] send future request : %s", "rsocket", request));
|
||||||
final Mono<Payload> payloadMono = clientSocket
|
final Mono<Payload> payloadMono = clientSocket
|
||||||
.requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta()));
|
.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
|
||||||
final CompletableFuture<Payload> payloadCompletableFuture = payloadMono.toFuture();
|
final CompletableFuture<Payload> payloadCompletableFuture = payloadMono.toFuture();
|
||||||
|
|
||||||
RequestFuture defaultPushFuture = new RequestFuture() {
|
RequestFuture defaultPushFuture = new RequestFuture() {
|
||||||
@ -116,14 +118,14 @@ public class RsocketConnection extends Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendRequestWithCallBack(Request request, RequestCallBack requestCallBack) throws NacosException {
|
public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
|
||||||
|
throws NacosException {
|
||||||
Loggers.RPC_DIGEST.debug(String.format("[%s] send callback request : %s", "rsocket", request));
|
Loggers.RPC_DIGEST.debug(String.format("[%s] send callback request : %s", "rsocket", request));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Mono<Payload> response = clientSocket
|
Mono<Payload> response = clientSocket
|
||||||
.requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta()));
|
.requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta));
|
||||||
|
|
||||||
response.toFuture().acceptEither(failAfter(requestCallBack.getTimeout()), new Consumer<Payload>() {
|
response.toFuture().acceptEither(failAfter(requestCallBack.getTimeout()), new Consumer<Payload>() {
|
||||||
@Override
|
@Override
|
||||||
public void accept(Payload payload) {
|
public void accept(Payload payload) {
|
||||||
@ -139,20 +141,13 @@ public class RsocketConnection extends Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> CompletableFuture<T> failAfter(final long timeouts) {
|
@Override
|
||||||
final CompletableFuture<T> promise = new CompletableFuture<T>();
|
public Map<String, String> getLabels() {
|
||||||
RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable<Object>() {
|
return null;
|
||||||
@Override
|
|
||||||
public Object call() throws Exception {
|
|
||||||
final TimeoutException ex = new TimeoutException("Timeout after " + timeouts);
|
|
||||||
return promise.completeExceptionally(ex);
|
|
||||||
}
|
|
||||||
}, timeouts, MILLISECONDS);
|
|
||||||
return promise;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeGrapcefully() {
|
public void close() {
|
||||||
if (clientSocket != null && !clientSocket.isDisposed()) {
|
if (clientSocket != null && !clientSocket.isDisposed()) {
|
||||||
clientSocket.dispose();
|
clientSocket.dispose();
|
||||||
}
|
}
|
||||||
|
@ -22,8 +22,10 @@ import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
|
|||||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
|
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
|
import com.alibaba.nacos.api.utils.NetUtils;
|
||||||
import com.alibaba.nacos.common.remote.ConnectionType;
|
import com.alibaba.nacos.common.remote.ConnectionType;
|
||||||
import com.alibaba.nacos.common.remote.RsocketUtils;
|
import com.alibaba.nacos.common.remote.RsocketUtils;
|
||||||
|
import com.alibaba.nacos.common.utils.VersionUtils;
|
||||||
import com.alibaba.nacos.core.remote.BaseRpcServer;
|
import com.alibaba.nacos.core.remote.BaseRpcServer;
|
||||||
import com.alibaba.nacos.core.remote.Connection;
|
import com.alibaba.nacos.core.remote.Connection;
|
||||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||||
@ -106,8 +108,8 @@ public class RsocketRpcServer extends BaseRpcServer {
|
|||||||
if (connectionManager.isOverLimit()) {
|
if (connectionManager.isOverLimit()) {
|
||||||
//Not register to the connection manager if current server is over limit.
|
//Not register to the connection manager if current server is over limit.
|
||||||
try {
|
try {
|
||||||
connection.sendRequestNoAck(new ConnectResetRequest());
|
connection.request(new ConnectResetRequest(), buildMeta());
|
||||||
connection.closeGrapcefully();
|
connection.close();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
//Do nothing.
|
//Do nothing.
|
||||||
}
|
}
|
||||||
@ -146,7 +148,7 @@ public class RsocketRpcServer extends BaseRpcServer {
|
|||||||
connectionManager.unregister(connectionId);
|
connectionManager.unregister(connectionId);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
RSocketProxy rSocketProxy = new NacosRsocket(sendingSocket, connectionid);
|
RSocketProxy rSocketProxy = new NacosRsocket(sendingSocket, connectionid);
|
||||||
|
|
||||||
return Mono.just(rSocketProxy);
|
return Mono.just(rSocketProxy);
|
||||||
@ -218,4 +220,11 @@ public class RsocketRpcServer extends BaseRpcServer {
|
|||||||
this.closeChannel.dispose();
|
this.closeChannel.dispose();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RequestMeta buildMeta() {
|
||||||
|
RequestMeta meta = new RequestMeta();
|
||||||
|
meta.setClientVersion(VersionUtils.getFullClientVersion());
|
||||||
|
meta.setClientIp(NetUtils.localIP());
|
||||||
|
return meta;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,10 +20,13 @@ import com.alibaba.nacos.api.exception.NacosException;
|
|||||||
import com.alibaba.nacos.api.remote.RequestCallBack;
|
import com.alibaba.nacos.api.remote.RequestCallBack;
|
||||||
import com.alibaba.nacos.api.remote.RequestFuture;
|
import com.alibaba.nacos.api.remote.RequestFuture;
|
||||||
import com.alibaba.nacos.api.remote.request.Request;
|
import com.alibaba.nacos.api.remote.request.Request;
|
||||||
|
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||||
import com.alibaba.nacos.api.remote.response.Response;
|
import com.alibaba.nacos.api.remote.response.Response;
|
||||||
import com.alibaba.nacos.core.remote.Connection;
|
import com.alibaba.nacos.core.remote.Connection;
|
||||||
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
|
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cluster connection.
|
* Cluster connection.
|
||||||
*
|
*
|
||||||
@ -36,25 +39,33 @@ public class ClusterConnection extends Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Response sendRequest(Request request, long timeoutMills) throws NacosException {
|
public Response request(Request request, RequestMeta requestMeta) throws NacosException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendRequestNoAck(Request request) throws NacosException {
|
public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException {
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RequestFuture sendRequestWithFuture(Request request) throws NacosException {
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendRequestWithCallBack(Request request, RequestCallBack callBack) throws NacosException {
|
public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException {
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeGrapcefully() {
|
public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack)
|
||||||
|
throws NacosException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getLabels() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
2
pom.xml
2
pom.xml
@ -150,7 +150,7 @@
|
|||||||
<jackson-databind.version>2.10.4</jackson-databind.version>
|
<jackson-databind.version>2.10.4</jackson-databind.version>
|
||||||
<jackson-core-asl.version>1.9.13</jackson-core-asl.version>
|
<jackson-core-asl.version>1.9.13</jackson-core-asl.version>
|
||||||
<jjwt.version>0.11.2</jjwt.version>
|
<jjwt.version>0.11.2</jjwt.version>
|
||||||
<netty-all.version>4.1.42.Final</netty-all.version>
|
<netty-all.version>4.1.51.Final</netty-all.version>
|
||||||
<!--<netty-common.version>4.1.31.Final</netty-common.version>-->
|
<!--<netty-common.version>4.1.31.Final</netty-common.version>-->
|
||||||
<mina-core.version>2.0.0-RC1</mina-core.version>
|
<mina-core.version>2.0.0-RC1</mina-core.version>
|
||||||
<guava.version>24.1.1-jre</guava.version>
|
<guava.version>24.1.1-jre</guava.version>
|
||||||
|
Loading…
Reference in New Issue
Block a user