From 682a46d54160d16d57faa198135009a2b0e6ffb7 Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Thu, 3 Sep 2020 20:29:34 +0800 Subject: [PATCH] loader balance basic support: adjust loader count once and up limit ; switch single (#3752) --- .../client/config/impl/ClientWorker.java | 10 +- .../com/alibaba/nacos/client/ConfigTest.java | 10 +- .../controller/ServerLoaderController.java | 44 ++----- .../nacos/core/remote/ConnectionManager.java | 29 +++- .../core/remote/rsocket/RsocketRpcServer.java | 20 ++- pom.xml | 124 +++++++++--------- 6 files changed, 132 insertions(+), 105 deletions(-) diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index ccd7141d9..c63f692fa 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -78,6 +78,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -225,6 +226,7 @@ public class ClientWorker implements Closeable { /** * remove config. + * * @param tenant * @param dataId * @param group @@ -238,6 +240,7 @@ public class ClientWorker implements Closeable { /** * publish config. + * * @param dataId * @param group * @param tenant @@ -508,6 +511,8 @@ public class ClientWorker implements Closeable { private boolean isHealthServer = true; + private String uuid = UUID.randomUUID().toString(); + private long timeout; private ConfigTransportClient agent; @@ -761,7 +766,8 @@ public class ClientWorker implements Closeable { Map newlabels = new HashMap(labels); newlabels.put("taskId", taskId); - RpcClient rpcClient = RpcClientFactory.createClient("config-" + taskId, getConectiontype(), newlabels); + RpcClient rpcClient = RpcClientFactory + .createClient("config-" + taskId + "-" + uuid, getConectiontype(), newlabels); if (rpcClient.isWaitInited()) { initHandlerRpcClient(rpcClient); rpcClient.start(); @@ -836,7 +842,7 @@ public class ClientWorker implements Closeable { * @return */ private ConfigBatchListenRequest buildConfigRequest(List caches) { - + ConfigBatchListenRequest configChangeListenRequest = new ConfigBatchListenRequest(); for (CacheData cacheData : caches) { configChangeListenRequest.addConfigListenContext(cacheData.group, cacheData.dataId, cacheData.tenant, diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index 817a2e356..95c9f254d 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -51,8 +51,8 @@ public class ConfigTest { public void before() throws Exception { Properties properties = new Properties(); //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848"); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); - //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848"); + //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848"); //"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848"); //"11.239.114.187:8848"); configService = NacosFactory.createConfigService(properties); @@ -128,10 +128,10 @@ public class ConfigTest { @Test public void test2() throws Exception { Properties properties = new Properties(); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848"); //" List configServiceList = new ArrayList(); - for (int i = 0; i < 501; i++) { + for (int i = 0; i < 300; i++) { ConfigService configService = NacosFactory.createConfigService(properties); configService.addListener("test", "test", new AbstractListener() { @@ -208,7 +208,7 @@ public class ConfigTest { }); - //th.start(); + th.start(); Listener listener = new AbstractListener() { @Override diff --git a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java index 3b4ffb810..9cd8eedd8 100644 --- a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java +++ b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java @@ -22,8 +22,6 @@ import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.request.ServerLoaderInfoRequest; import com.alibaba.nacos.api.remote.response.ServerLoaderInfoResponse; import com.alibaba.nacos.common.executor.ExecutorFactory; -import com.alibaba.nacos.common.utils.StringUtils; -import com.alibaba.nacos.config.server.utils.JSONUtils; import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MemberUtils; @@ -33,14 +31,12 @@ import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -95,37 +91,24 @@ public class ServerLoaderController { * @return state json. */ @GetMapping("/reload") - public ResponseEntity reloadClients(@RequestParam Integer count, + public ResponseEntity reloadCount(@RequestParam Integer count, @RequestParam(value = "redirectAddress", required = false) String redirectAddress) { Map responseMap = new HashMap<>(3); - connectionManager.loadClientsSmoth(count, redirectAddress); + connectionManager.loadCount(count, redirectAddress); return ResponseEntity.ok().body("success"); } /** - * Get current clients count with specifiec labels. + * Get server state of current server. * * @return state json. */ - @GetMapping("/current") - public ResponseEntity currentCount(@RequestParam(value = "filters", required = false) String filters) { - Map filterLabels = new HashMap<>(3); - try { - if (StringUtils.isNotBlank(filters)) { - HashMap filterMap = (HashMap) JSONUtils - .deserializeObject(filters, HashMap.class); - int count = connectionManager.currentClientsCount(filterMap); - return ResponseEntity.ok().body(count); - } else { - int count = connectionManager.currentClientsCount(); - return ResponseEntity.ok().body(count); - } - - } catch (IOException e) { - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); - - } - + @GetMapping("/reloadsingle") + public ResponseEntity reloadSingle(@RequestParam String connectionId, + @RequestParam(value = "redirectAddress", required = false) String redirectAddress) { + Map responseMap = new HashMap<>(3); + connectionManager.loadSingle(connectionId, redirectAddress); + return ResponseEntity.ok().body("success"); } /** @@ -133,7 +116,7 @@ public class ServerLoaderController { * * @return state json. */ - @GetMapping("/all") + @GetMapping("/current") public ResponseEntity currentClients() { Map responseMap = new HashMap<>(3); Map stringConnectionMap = connectionManager.currentClients(); @@ -146,14 +129,14 @@ public class ServerLoaderController { * * @return state json. */ - @GetMapping("/clustercon") + @GetMapping("/clustermetric") public ResponseEntity clusterLoader() { Map responseMap = new HashMap<>(3); - return ResponseEntity.ok().body(getConInfo()); + return ResponseEntity.ok().body(getMetrics()); } - private List getConInfo() { + private List getMetrics() { ScheduledExecutorService executorService = ExecutorFactory .newScheduledExecutorService(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @@ -174,7 +157,6 @@ public class ServerLoaderController { if (MemberUtils.isSupportedLongCon(member)) { count++; ServerLoaderInfoRequest serverLoaderInfoRequest = new ServerLoaderInfoRequest(); - completionService.submit(new RpcTask(serverLoaderInfoRequest, member)); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index edaf86aa6..bccde9238 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -220,11 +220,38 @@ public class ConnectionManager { this.maxClient = maxClient; } - public void loadClientsSmoth(int loadClient, String redirectAddress) { + public void loadCount(int loadClient, String redirectAddress) { this.loadClient = loadClient; this.redirectAddress = redirectAddress; } + /** + * send load request to spefic connetionId. + * + * @param connectionId + * @param redirectAddress + */ + public void loadSingle(String connectionId, String redirectAddress) { + Connection connection = getConnection(connectionId); + + if (connection != null) { + ConnectResetRequest connectResetRequest = new ConnectResetRequest(); + if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(":")) { + String[] split = redirectAddress.split(":"); + connectResetRequest.setServerIp(split[0]); + connectResetRequest.setServerPort(split[1]); + } + try { + connection.sendRequestNoAck(connectResetRequest); + } catch (ConnectionAlreadyClosedException e) { + unregister(connectionId); + } catch (Exception e) { + Loggers.RPC.error("error occurs when expel connetion :", connectionId, e); + } + } + + } + /** * get all client count. * diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java index 06b568e7f..693689b48 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java @@ -17,6 +17,7 @@ package com.alibaba.nacos.core.remote.rsocket; import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.PlainBodyResponse; @@ -107,7 +108,18 @@ public class RsocketRpcServer extends RpcServer { palinrequest.getMetadata().getClientVersion(), palinrequest.getMetadata().getLabels()); Connection connection = new RsocketConnection(metaInfo, sendingSocket); - connectionManager.register(connection.getConnectionId(), connection); + if (connectionManager.isOverLimit()) { + //Not register to the connection manager if current server is over limit. + try { + connection.sendRequestNoAck(new ConnectResetRequest()); + connection.closeGrapcefully(); + } catch (Exception e) { + //Do nothing. + } + + } else { + connectionManager.register(connection.getConnectionId(), connection); + } sendingSocket.onClose().subscribe(new Subscriber() { String connectionId; @@ -169,7 +181,7 @@ public class RsocketRpcServer extends RpcServer { try { RsocketUtils.PlainRequest requestType = RsocketUtils.parsePlainRequestFromPayload(payload); Loggers.RPC_DIGEST.debug(String.format("[%s] request receive : %s", "rsocket", requestType.toString())); - + RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(requestType.getType()); if (requestHandler != null) { RequestMeta requestMeta = requestType.getMetadata(); @@ -177,7 +189,7 @@ public class RsocketRpcServer extends RpcServer { try { Response response = requestHandler.handle(requestType.getBody(), requestMeta); return Mono.just(RsocketUtils.convertResponseToPayload(response)); - + } catch (NacosException e) { Loggers.RPC_DIGEST.debug(String .format("[%s] fail to handle request, error message : %s ", "rsocket", e.getMessage(), @@ -186,7 +198,7 @@ public class RsocketRpcServer extends RpcServer { .convertResponseToPayload(new PlainBodyResponse("exception:" + e.getMessage()))); } } - + Loggers.RPC_DIGEST.debug(String .format("[%s] no handler for request type : %s :", "rsocket", requestType.getType())); return Mono.just(RsocketUtils.convertResponseToPayload(new PlainBodyResponse("No Handler"))); diff --git a/pom.xml b/pom.xml index cc7fdf8a8..29d23bcca 100644 --- a/pom.xml +++ b/pom.xml @@ -1120,71 +1120,71 @@ - - - - sona - https://oss.sonatype.org/content/repositories/snapshots/ - - - sona - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + central + http://mvnrepo.alibaba-inc.com/mvn/repository + + true + + + false + + + + snapshots + http://mvnrepo.alibaba-inc.com/mvn/repository + + false + + + true + + + + + + central + http://mvnrepo.alibaba-inc.com/mvn/repository + + true + + + false + + + + snapshots + http://mvnrepo.alibaba-inc.com/mvn/repository + + false + + + true + + + + + + releases + http://mvnrepo.alibaba-inc.com/mvn/releases + + + snapshots + http://mvnrepo.alibaba-inc.com/mvn/snapshots + +