Remove ServiceManager usage part2.

This commit is contained in:
KomachiSion 2022-08-26 16:27:34 +08:00
parent f28efe62e1
commit 9b68e45c97
10 changed files with 4 additions and 368 deletions

View File

@ -1,204 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.Message;
import com.alibaba.nacos.naming.misc.ServerStatusSynchronizer;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.Synchronizer;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* The manager to globally refresh and operate server list.
*
* @author nkorange
* @since 1.0.0
* @deprecated 1.3.0 This object will be deleted sometime after version 1.3.0
*/
@Component("serverListManager")
public class ServerListManager extends MemberChangeListener {
private static final String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE;
private final SwitchDomain switchDomain;
private final ServerMemberManager memberManager;
private final Synchronizer synchronizer = new ServerStatusSynchronizer();
private volatile List<Member> servers;
public ServerListManager(final SwitchDomain switchDomain, final ServerMemberManager memberManager) {
this.switchDomain = switchDomain;
this.memberManager = memberManager;
NotifyCenter.registerSubscriber(this);
this.servers = new ArrayList<>(memberManager.allMembers());
}
@PostConstruct
public void init() {
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
}
/**
* Judge whether contain server in cluster.
*
* @param serverAddress server address
* @return true if contain, otherwise false
*/
public boolean contains(String serverAddress) {
for (Member server : getServers()) {
if (Objects.equals(serverAddress, server.getAddress())) {
return true;
}
}
return false;
}
public List<Member> getServers() {
return servers;
}
@Override
public void onEvent(MembersChangeEvent event) {
this.servers = new ArrayList<>(event.getMembers());
}
/**
* Compatible with older version logic, In version 1.2.1 and before
*
* @param configInfo site:ip:lastReportTime:weight
*/
public synchronized void onReceiveServerStatus(String configInfo) {
Loggers.SRV_LOG.info("receive config info: {}", configInfo);
String[] configs = configInfo.split("\r\n");
if (configs.length == 0) {
return;
}
for (String config : configs) {
// site:ip:lastReportTime:weight
String[] params = config.split("#");
if (params.length <= 3) {
Loggers.SRV_LOG.warn("received malformed distro map data: {}", config);
continue;
}
String[] info = InternetAddressUtil.splitIPPortStr(params[1]);
Member server = Optional.ofNullable(memberManager.find(params[1]))
.orElse(Member.builder().ip(info[0]).state(NodeState.UP).port(Integer.parseInt(info[1])).build());
// This metadata information exists from 1.3.0 onwards "version"
if (server.getExtendVal(MemberMetaDataConstants.VERSION) == null) {
// copy to trigger member change event
server = server.copy();
// received heartbeat from server of version before 1.3.0
if (!server.getState().equals(NodeState.UP)) {
Loggers.SRV_LOG.info("member {} state changed to UP", server);
}
server.setState(NodeState.UP);
}
server.setExtendVal(MemberMetaDataConstants.SITE_KEY, params[0]);
server.setExtendVal(MemberMetaDataConstants.WEIGHT, params.length == 4 ? Integer.parseInt(params[3]) : 1);
memberManager.update(server);
if (!contains(server.getAddress())) {
throw new IllegalArgumentException("server: " + server.getAddress() + " is not in serverlist");
}
}
}
private class ServerStatusReporter implements Runnable {
@Override
public void run() {
try {
if (EnvUtil.getPort() <= 0) {
return;
}
int weight = EnvUtil.getAvailableProcessors(0.5);
if (weight <= 0) {
weight = 1;
}
long curTime = System.currentTimeMillis();
String status =
LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight + "\r\n";
List<Member> allServers = getServers();
if (!contains(EnvUtil.getLocalAddress())) {
Loggers.SRV_LOG
.error("local ip is not in serverlist, ip: {}, serverlist: {}", EnvUtil.getLocalAddress(),
allServers);
return;
}
if (allServers.size() > 0 && !EnvUtil.getLocalAddress().contains(InternetAddressUtil.localHostIP())) {
for (Member server : allServers) {
if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
continue;
}
// This metadata information exists from 1.3.0 onwards "version"
if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
Loggers.SRV_LOG
.debug("[SERVER-STATUS] target {} has extend val {} = {}, use new api report status",
server.getAddress(), MemberMetaDataConstants.VERSION,
server.getExtendVal(MemberMetaDataConstants.VERSION));
continue;
}
Message msg = new Message();
msg.setData(status);
synchronizer.send(server.getAddress(), msg);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
} finally {
GlobalExecutor
.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
}
}
}
}

View File

@ -233,7 +233,7 @@ public abstract class BasePersistentServiceProcessor extends RequestProcessor4CP
}
/**
* This notify should only notify once during startup. See {@link com.alibaba.nacos.naming.core.ServiceManager#init()}
* This notify should only notify once during startup.
*/
private void notifierAllServiceMeta(RecordListener listener) throws NacosException {
for (byte[] each : kvStorage.allKeys()) {

View File

@ -29,8 +29,6 @@ public final class Constants {
public static final String INSTANCE_METADATA = "naming_instance_metadata";
public static final String OLD_NAMING_RAFT_GROUP = "naming";
public static final String NAMING_PERSISTENT_SERVICE_GROUP = "naming_persistent_service";
public static final String NAMING_PERSISTENT_SERVICE_GROUP_V2 = "naming_persistent_service_v2";
@ -97,11 +95,6 @@ public final class Constants {
*/
public static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);
/**
* The Milliseconds for push timeout.
*/
public static final long DEFAULT_PUSH_TIMEOUT_MILLS = TimeUnit.SECONDS.toNanos(3L);
/**
* The custom instance id key.
*/

View File

@ -23,7 +23,6 @@ import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.ServerStatusManager;
import com.alibaba.nacos.naming.constants.ClientConstants;
import com.alibaba.nacos.naming.core.DistroMapper;
@ -62,8 +61,6 @@ public class OperatorController {
private final SwitchManager switchManager;
private final ServerListManager serverListManager;
private final ServerMemberManager memberManager;
private final ServerStatusManager serverStatusManager;
@ -74,11 +71,10 @@ public class OperatorController {
private final ClientManager clientManager;
public OperatorController(SwitchManager switchManager, ServerListManager serverListManager,
ServerMemberManager memberManager, ServerStatusManager serverStatusManager, SwitchDomain switchDomain,
DistroMapper distroMapper, ClientManager clientManager) {
public OperatorController(SwitchManager switchManager, ServerMemberManager memberManager,
ServerStatusManager serverStatusManager, SwitchDomain switchDomain, DistroMapper distroMapper,
ClientManager clientManager) {
this.switchManager = switchManager;
this.serverListManager = serverListManager;
this.memberManager = memberManager;
this.serverStatusManager = serverStatusManager;
this.switchDomain = switchDomain;
@ -237,20 +233,6 @@ public class OperatorController {
return result;
}
/**
* This interface will be removed in a future release.
*
* @param serverStatus server status
* @return "ok"
* @deprecated 1.3.0 This function will be deleted sometime after version 1.3.0
*/
@Deprecated
@RequestMapping("/server/status")
public String serverStatus(@RequestParam String serverStatus) {
serverListManager.onReceiveServerStatus(serverStatus);
return "ok";
}
@PutMapping("/log")
public String setLogLevel(@RequestParam String logName, @RequestParam String logLevel) {
Loggers.setLogLevel(logName, logLevel);

View File

@ -55,10 +55,6 @@ public class GlobalExecutor {
.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
EnvUtil.getAvailableProcessors(2), new NameThreadFactory("com.alibaba.nacos.naming.timer"));
private static final ScheduledExecutorService SERVER_STATUS_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.status.worker"));
/**
* Service synchronization executor.
*
@ -137,10 +133,6 @@ public class GlobalExecutor {
return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}
public static void registerServerStatusReporter(Runnable runnable, long delay) {
SERVER_STATUS_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void registerServerStatusUpdater(Runnable runnable) {
NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, SERVER_STATUS_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
}

View File

@ -1,86 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.misc;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.naming.constants.FieldsConstants;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTP_PREFIX;
/**
* Report local server status to other server.
*
* @author nacos
* @deprecated 1.3.0 This object will be deleted sometime after version 1.3.0
*/
public class ServerStatusSynchronizer implements Synchronizer {
@Override
public void send(final String serverIp, Message msg) {
if (StringUtils.isEmpty(serverIp)) {
return;
}
final Map<String, String> params = new HashMap<String, String>(2);
params.put(FieldsConstants.SERVICE_STATUS, msg.getData());
String url = HTTP_PREFIX + serverIp + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";
if (InternetAddressUtil.containsPort(serverIp)) {
url = HTTP_PREFIX + serverIp + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ "/operator/server/status";
}
try {
HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",
serverIp);
}
}
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIp, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIp, e);
}
}
@Override
public Message get(String server, String key) {
return null;
}
}

View File

@ -17,7 +17,6 @@
package com.alibaba.nacos.naming;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.sys.env.EnvUtil;
@ -53,9 +52,6 @@ public abstract class BaseTest {
+ "\"port\":9870,\"weight\":2.0,\"healthy\":true,\"enabled\":true,\"ephemeral\":true"
+ ",\"clusterName\":\"clusterName\",\"serviceName\":\"serviceName\",\"metadata\":{}}]";
@Mock
public ServiceManager serviceManager;
@Rule
public ExpectedException expectedException = ExpectedException.none();

View File

@ -37,10 +37,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import static org.hamcrest.CoreMatchers.isA;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.when;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MockServletContext.class)
@ -58,18 +54,12 @@ public class ClusterControllerTest extends BaseTest {
mockInjectSwitchDomain();
mockInjectDistroMapper();
mockmvc = MockMvcBuilders.standaloneSetup(clusterController).build();
try {
doCallRealMethod().when(serviceManager).checkServiceIsNull(eq(null), anyString(), anyString());
} catch (NacosException e) {
e.printStackTrace();
}
}
@Test
public void testUpdate() throws Exception {
Service service = new Service(TEST_SERVICE_NAME);
service.setNamespaceId("test-namespace");
when(serviceManager.getService(Constants.DEFAULT_NAMESPACE_ID, TEST_SERVICE_NAME)).thenReturn(service);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders
.put(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster").param("clusterName", TEST_CLUSTER_NAME)
@ -101,7 +91,6 @@ public class ClusterControllerTest extends BaseTest {
Service service = new Service(TEST_SERVICE_NAME);
service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
when(serviceManager.getService(Constants.DEFAULT_NAMESPACE_ID, TEST_SERVICE_NAME)).thenReturn(service);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders
.put(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/cluster").param("clusterName", TEST_CLUSTER_NAME)

View File

@ -16,7 +16,6 @@
package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.BaseTest;
@ -24,13 +23,11 @@ import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.InstanceOperationInfo;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.mock.web.MockServletContext;
@ -47,9 +44,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static org.mockito.Mockito.when;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MockServletContext.class)
@ -84,8 +78,6 @@ public class InstanceControllerTest extends BaseTest {
ipList.add(instance);
service.updateIPs(ipList, false);
when(serviceManager.getService(Constants.DEFAULT_NAMESPACE_ID, TEST_SERVICE_NAME)).thenReturn(service);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders
.post(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance").param("serviceName", TEST_SERVICE_NAME)
.param("ip", "1.1.1.1").param("port", "9999");
@ -124,8 +116,6 @@ public class InstanceControllerTest extends BaseTest {
ipList.add(instance);
service.updateIPs(ipList, false);
when(serviceManager.getService(Constants.DEFAULT_NAMESPACE_ID, TEST_SERVICE_NAME)).thenReturn(service);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders
.get(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/list").param("serviceName", TEST_SERVICE_NAME)
.header(HttpHeaderConsts.USER_AGENT_HEADER, "Nacos-Server:v1");
@ -148,8 +138,6 @@ public class InstanceControllerTest extends BaseTest {
@Test
public void getNullServiceInstances() throws Exception {
when(serviceManager.getService(Constants.DEFAULT_NAMESPACE_ID, TEST_SERVICE_NAME)).thenReturn(null);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders
.get(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/list").param("serviceName", TEST_SERVICE_NAME)
.header(HttpHeaderConsts.USER_AGENT_HEADER, "Nacos-Server:v1");
@ -177,10 +165,6 @@ public class InstanceControllerTest extends BaseTest {
instanceList.add(instance);
instanceList.add(instance2);
when(serviceManager
.batchOperate(ArgumentMatchers.anyString(), ArgumentMatchers.any(InstanceOperationInfo.class),
ArgumentMatchers.any(Function.class))).thenReturn(instanceList);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders
.put(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/metadata/batch").param("namespace", "public")
.param("serviceName", TEST_SERVICE_NAME).param("instances",
@ -222,10 +206,6 @@ public class InstanceControllerTest extends BaseTest {
instanceList.add(instance);
instanceList.add(instance2);
when(serviceManager
.batchOperate(ArgumentMatchers.anyString(), ArgumentMatchers.any(InstanceOperationInfo.class),
ArgumentMatchers.any(Function.class))).thenReturn(instanceList);
MockHttpServletRequestBuilder builder = MockMvcRequestBuilders
.delete(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/metadata/batch").param("namespace", "public")
.param("serviceName", TEST_SERVICE_NAME).param("instances",

View File

@ -20,7 +20,6 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.ServerStatusManager;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
@ -59,9 +58,6 @@ public class OperatorControllerTest {
@Mock
private ServerStatusManager serverStatusManager;
@Mock
private ServiceManager serviceManager;
@Mock
private ClientManager clientManager;
@ -113,8 +109,6 @@ public class OperatorControllerTest {
@Test
public void testMetrics() {
Mockito.when(serverStatusManager.getServerStatus()).thenReturn(ServerStatus.UP);
Mockito.when(serviceManager.getResponsibleServiceCount()).thenReturn(1);
Mockito.when(serviceManager.getResponsibleInstanceCount()).thenReturn(1);
Collection<String> clients = new HashSet<>();
clients.add("1628132208793_127.0.0.1_8080");
clients.add("127.0.0.1:8081#true");