loader balance basic support: adjust loader count once and up limit ; switch single (#3752)
This commit is contained in:
parent
93364734ea
commit
682a46d541
@ -78,6 +78,7 @@ import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -225,6 +226,7 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
/**
|
||||
* remove config.
|
||||
*
|
||||
* @param tenant
|
||||
* @param dataId
|
||||
* @param group
|
||||
@ -238,6 +240,7 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
/**
|
||||
* publish config.
|
||||
*
|
||||
* @param dataId
|
||||
* @param group
|
||||
* @param tenant
|
||||
@ -508,6 +511,8 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
private boolean isHealthServer = true;
|
||||
|
||||
private String uuid = UUID.randomUUID().toString();
|
||||
|
||||
private long timeout;
|
||||
|
||||
private ConfigTransportClient agent;
|
||||
@ -761,7 +766,8 @@ public class ClientWorker implements Closeable {
|
||||
Map<String, String> newlabels = new HashMap<String, String>(labels);
|
||||
newlabels.put("taskId", taskId);
|
||||
|
||||
RpcClient rpcClient = RpcClientFactory.createClient("config-" + taskId, getConectiontype(), newlabels);
|
||||
RpcClient rpcClient = RpcClientFactory
|
||||
.createClient("config-" + taskId + "-" + uuid, getConectiontype(), newlabels);
|
||||
if (rpcClient.isWaitInited()) {
|
||||
initHandlerRpcClient(rpcClient);
|
||||
rpcClient.start();
|
||||
@ -836,7 +842,7 @@ public class ClientWorker implements Closeable {
|
||||
* @return
|
||||
*/
|
||||
private ConfigBatchListenRequest buildConfigRequest(List<CacheData> caches) {
|
||||
|
||||
|
||||
ConfigBatchListenRequest configChangeListenRequest = new ConfigBatchListenRequest();
|
||||
for (CacheData cacheData : caches) {
|
||||
configChangeListenRequest.addConfigListenContext(cacheData.group, cacheData.dataId, cacheData.tenant,
|
||||
|
@ -51,8 +51,8 @@ public class ConfigTest {
|
||||
public void before() throws Exception {
|
||||
Properties properties = new Properties();
|
||||
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848");
|
||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
|
||||
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
|
||||
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
|
||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
|
||||
//"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
|
||||
//"11.239.114.187:8848");
|
||||
configService = NacosFactory.createConfigService(properties);
|
||||
@ -128,10 +128,10 @@ public class ConfigTest {
|
||||
@Test
|
||||
public void test2() throws Exception {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
|
||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
|
||||
//"
|
||||
List<ConfigService> configServiceList = new ArrayList<ConfigService>();
|
||||
for (int i = 0; i < 501; i++) {
|
||||
for (int i = 0; i < 300; i++) {
|
||||
|
||||
ConfigService configService = NacosFactory.createConfigService(properties);
|
||||
configService.addListener("test", "test", new AbstractListener() {
|
||||
@ -208,7 +208,7 @@ public class ConfigTest {
|
||||
|
||||
});
|
||||
|
||||
//th.start();
|
||||
th.start();
|
||||
|
||||
Listener listener = new AbstractListener() {
|
||||
@Override
|
||||
|
@ -22,8 +22,6 @@ import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||
import com.alibaba.nacos.api.remote.request.ServerLoaderInfoRequest;
|
||||
import com.alibaba.nacos.api.remote.response.ServerLoaderInfoResponse;
|
||||
import com.alibaba.nacos.common.executor.ExecutorFactory;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.config.server.utils.JSONUtils;
|
||||
import com.alibaba.nacos.config.server.utils.LogUtil;
|
||||
import com.alibaba.nacos.core.cluster.Member;
|
||||
import com.alibaba.nacos.core.cluster.MemberUtils;
|
||||
@ -33,14 +31,12 @@ import com.alibaba.nacos.core.remote.Connection;
|
||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||
import com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@ -95,37 +91,24 @@ public class ServerLoaderController {
|
||||
* @return state json.
|
||||
*/
|
||||
@GetMapping("/reload")
|
||||
public ResponseEntity reloadClients(@RequestParam Integer count,
|
||||
public ResponseEntity reloadCount(@RequestParam Integer count,
|
||||
@RequestParam(value = "redirectAddress", required = false) String redirectAddress) {
|
||||
Map<String, String> responseMap = new HashMap<>(3);
|
||||
connectionManager.loadClientsSmoth(count, redirectAddress);
|
||||
connectionManager.loadCount(count, redirectAddress);
|
||||
return ResponseEntity.ok().body("success");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current clients count with specifiec labels.
|
||||
* Get server state of current server.
|
||||
*
|
||||
* @return state json.
|
||||
*/
|
||||
@GetMapping("/current")
|
||||
public ResponseEntity currentCount(@RequestParam(value = "filters", required = false) String filters) {
|
||||
Map<String, String> filterLabels = new HashMap<>(3);
|
||||
try {
|
||||
if (StringUtils.isNotBlank(filters)) {
|
||||
HashMap<String, String> filterMap = (HashMap<String, String>) JSONUtils
|
||||
.deserializeObject(filters, HashMap.class);
|
||||
int count = connectionManager.currentClientsCount(filterMap);
|
||||
return ResponseEntity.ok().body(count);
|
||||
} else {
|
||||
int count = connectionManager.currentClientsCount();
|
||||
return ResponseEntity.ok().body(count);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
|
||||
|
||||
}
|
||||
|
||||
@GetMapping("/reloadsingle")
|
||||
public ResponseEntity reloadSingle(@RequestParam String connectionId,
|
||||
@RequestParam(value = "redirectAddress", required = false) String redirectAddress) {
|
||||
Map<String, String> responseMap = new HashMap<>(3);
|
||||
connectionManager.loadSingle(connectionId, redirectAddress);
|
||||
return ResponseEntity.ok().body("success");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -133,7 +116,7 @@ public class ServerLoaderController {
|
||||
*
|
||||
* @return state json.
|
||||
*/
|
||||
@GetMapping("/all")
|
||||
@GetMapping("/current")
|
||||
public ResponseEntity currentClients() {
|
||||
Map<String, String> responseMap = new HashMap<>(3);
|
||||
Map<String, Connection> stringConnectionMap = connectionManager.currentClients();
|
||||
@ -146,14 +129,14 @@ public class ServerLoaderController {
|
||||
*
|
||||
* @return state json.
|
||||
*/
|
||||
@GetMapping("/clustercon")
|
||||
@GetMapping("/clustermetric")
|
||||
public ResponseEntity clusterLoader() {
|
||||
|
||||
Map<String, String> responseMap = new HashMap<>(3);
|
||||
return ResponseEntity.ok().body(getConInfo());
|
||||
return ResponseEntity.ok().body(getMetrics());
|
||||
}
|
||||
|
||||
private List<ServerLoaderMetris> getConInfo() {
|
||||
private List<ServerLoaderMetris> getMetrics() {
|
||||
|
||||
ScheduledExecutorService executorService = ExecutorFactory
|
||||
.newScheduledExecutorService(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
|
||||
@ -174,7 +157,6 @@ public class ServerLoaderController {
|
||||
if (MemberUtils.isSupportedLongCon(member)) {
|
||||
count++;
|
||||
ServerLoaderInfoRequest serverLoaderInfoRequest = new ServerLoaderInfoRequest();
|
||||
|
||||
completionService.submit(new RpcTask<ServerLoaderInfoRequest>(serverLoaderInfoRequest, member));
|
||||
}
|
||||
}
|
||||
|
@ -220,11 +220,38 @@ public class ConnectionManager {
|
||||
this.maxClient = maxClient;
|
||||
}
|
||||
|
||||
public void loadClientsSmoth(int loadClient, String redirectAddress) {
|
||||
public void loadCount(int loadClient, String redirectAddress) {
|
||||
this.loadClient = loadClient;
|
||||
this.redirectAddress = redirectAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* send load request to spefic connetionId.
|
||||
*
|
||||
* @param connectionId
|
||||
* @param redirectAddress
|
||||
*/
|
||||
public void loadSingle(String connectionId, String redirectAddress) {
|
||||
Connection connection = getConnection(connectionId);
|
||||
|
||||
if (connection != null) {
|
||||
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
|
||||
if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(":")) {
|
||||
String[] split = redirectAddress.split(":");
|
||||
connectResetRequest.setServerIp(split[0]);
|
||||
connectResetRequest.setServerPort(split[1]);
|
||||
}
|
||||
try {
|
||||
connection.sendRequestNoAck(connectResetRequest);
|
||||
} catch (ConnectionAlreadyClosedException e) {
|
||||
unregister(connectionId);
|
||||
} catch (Exception e) {
|
||||
Loggers.RPC.error("error occurs when expel connetion :", connectionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* get all client count.
|
||||
*
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.core.remote.rsocket;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
|
||||
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
|
||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
|
||||
@ -107,7 +108,18 @@ public class RsocketRpcServer extends RpcServer {
|
||||
palinrequest.getMetadata().getClientVersion(), palinrequest.getMetadata().getLabels());
|
||||
Connection connection = new RsocketConnection(metaInfo, sendingSocket);
|
||||
|
||||
connectionManager.register(connection.getConnectionId(), connection);
|
||||
if (connectionManager.isOverLimit()) {
|
||||
//Not register to the connection manager if current server is over limit.
|
||||
try {
|
||||
connection.sendRequestNoAck(new ConnectResetRequest());
|
||||
connection.closeGrapcefully();
|
||||
} catch (Exception e) {
|
||||
//Do nothing.
|
||||
}
|
||||
|
||||
} else {
|
||||
connectionManager.register(connection.getConnectionId(), connection);
|
||||
}
|
||||
|
||||
sendingSocket.onClose().subscribe(new Subscriber<Void>() {
|
||||
String connectionId;
|
||||
@ -169,7 +181,7 @@ public class RsocketRpcServer extends RpcServer {
|
||||
try {
|
||||
RsocketUtils.PlainRequest requestType = RsocketUtils.parsePlainRequestFromPayload(payload);
|
||||
Loggers.RPC_DIGEST.debug(String.format("[%s] request receive : %s", "rsocket", requestType.toString()));
|
||||
|
||||
|
||||
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(requestType.getType());
|
||||
if (requestHandler != null) {
|
||||
RequestMeta requestMeta = requestType.getMetadata();
|
||||
@ -177,7 +189,7 @@ public class RsocketRpcServer extends RpcServer {
|
||||
try {
|
||||
Response response = requestHandler.handle(requestType.getBody(), requestMeta);
|
||||
return Mono.just(RsocketUtils.convertResponseToPayload(response));
|
||||
|
||||
|
||||
} catch (NacosException e) {
|
||||
Loggers.RPC_DIGEST.debug(String
|
||||
.format("[%s] fail to handle request, error message : %s ", "rsocket", e.getMessage(),
|
||||
@ -186,7 +198,7 @@ public class RsocketRpcServer extends RpcServer {
|
||||
.convertResponseToPayload(new PlainBodyResponse("exception:" + e.getMessage())));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loggers.RPC_DIGEST.debug(String
|
||||
.format("[%s] no handler for request type : %s :", "rsocket", requestType.getType()));
|
||||
return Mono.just(RsocketUtils.convertResponseToPayload(new PlainBodyResponse("No Handler")));
|
||||
|
124
pom.xml
124
pom.xml
@ -1120,71 +1120,71 @@
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<distributionManagement>
|
||||
<snapshotRepository>
|
||||
<!-- 这里的ID一定要在maven setting文件中存在于server下的ID -->
|
||||
<id>sona</id>
|
||||
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
|
||||
</snapshotRepository>
|
||||
<repository>
|
||||
<id>sona</id>
|
||||
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
|
||||
<!--<repositories>-->
|
||||
<!--<repository>-->
|
||||
<!--<id>central</id>-->
|
||||
<!--<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>-->
|
||||
<!--<releases>-->
|
||||
<!--<enabled>true</enabled>-->
|
||||
<!--</releases>-->
|
||||
<!--<snapshots>-->
|
||||
<!--<enabled>false</enabled>-->
|
||||
<!--</snapshots>-->
|
||||
<!--</repository>-->
|
||||
<!--<repository>-->
|
||||
<!--<id>snapshots</id>-->
|
||||
<!--<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>-->
|
||||
<!--<releases>-->
|
||||
<!--<enabled>false</enabled>-->
|
||||
<!--</releases>-->
|
||||
<!--<snapshots>-->
|
||||
<!--<enabled>true</enabled>-->
|
||||
<!--</snapshots>-->
|
||||
<!--</repository>-->
|
||||
<!--</repositories>-->
|
||||
<!--<pluginRepositories>-->
|
||||
<!--<pluginRepository>-->
|
||||
<!--<id>central</id>-->
|
||||
<!--<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>-->
|
||||
<!--<releases>-->
|
||||
<!--<enabled>true</enabled>-->
|
||||
<!--</releases>-->
|
||||
<!--<snapshots>-->
|
||||
<!--<enabled>false</enabled>-->
|
||||
<!--</snapshots>-->
|
||||
<!--</pluginRepository>-->
|
||||
<!--<pluginRepository>-->
|
||||
<!--<id>snapshots</id>-->
|
||||
<!--<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>-->
|
||||
<!--<releases>-->
|
||||
<!--<enabled>false</enabled>-->
|
||||
<!--</releases>-->
|
||||
<!--<snapshots>-->
|
||||
<!--<enabled>true</enabled>-->
|
||||
<!--</snapshots>-->
|
||||
<!--</pluginRepository>-->
|
||||
<!--</pluginRepositories>-->
|
||||
<!--<distributionManagement>-->
|
||||
<!--<repository>-->
|
||||
<!--<id>releases</id>-->
|
||||
<!--<url>http://mvnrepo.alibaba-inc.com/mvn/releases</url>-->
|
||||
<!--</repository>-->
|
||||
<!--<snapshotRepository>-->
|
||||
<!--<id>snapshots</id>-->
|
||||
<!--<url>http://mvnrepo.alibaba-inc.com/mvn/snapshots</url>-->
|
||||
<!--<!– 这里的ID一定要在maven setting文件中存在于server下的ID –>-->
|
||||
<!--<id>sona</id>-->
|
||||
<!--<url>https://oss.sonatype.org/content/repositories/snapshots/</url>-->
|
||||
<!--</snapshotRepository>-->
|
||||
<!--<repository>-->
|
||||
<!--<id>sona</id>-->
|
||||
<!--<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>-->
|
||||
<!--</repository>-->
|
||||
<!--</distributionManagement>-->
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>central</id>
|
||||
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>snapshots</id>
|
||||
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
|
||||
<releases>
|
||||
<enabled>false</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
<pluginRepositories>
|
||||
<pluginRepository>
|
||||
<id>central</id>
|
||||
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
</pluginRepository>
|
||||
<pluginRepository>
|
||||
<id>snapshots</id>
|
||||
<url>http://mvnrepo.alibaba-inc.com/mvn/repository</url>
|
||||
<releases>
|
||||
<enabled>false</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</pluginRepository>
|
||||
</pluginRepositories>
|
||||
<distributionManagement>
|
||||
<repository>
|
||||
<id>releases</id>
|
||||
<url>http://mvnrepo.alibaba-inc.com/mvn/releases</url>
|
||||
</repository>
|
||||
<snapshotRepository>
|
||||
<id>snapshots</id>
|
||||
<url>http://mvnrepo.alibaba-inc.com/mvn/snapshots</url>
|
||||
</snapshotRepository>
|
||||
</distributionManagement>
|
||||
</project>
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user