From 3bde28294d596eef2b43d4af7a7714bcddabed85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E7=BF=8A=20SionYang?= <263976490@qq.com> Date: Sat, 18 Jul 2020 16:07:21 +0800 Subject: [PATCH] [ISSUE #1097] Naming support un/subscribe service by grpc. (#3373) * For #1097, server support subscribe service. * For #1097, client support subscribe service. * For #1097, server and client support unsubscribe service. --- .../naming/remote/NamingRemoteConstants.java | 5 +- .../remote/request/InstanceRequest.java | 6 - .../request/SubscribeServiceRequest.java | 61 +++++++++ .../remote/response/InstanceResponse.java | 12 +- .../response/NotifySubscriberResponse.java | 70 ++++++++++ ...esponse.java => QueryServiceResponse.java} | 34 +++-- .../response/SubscribeServiceResponse.java | 54 ++++++++ .../nacos/api/remote/ResponseRegistry.java | 8 +- .../client/naming/NacosNamingService.java | 72 +++++++--- .../nacos/client/naming/core/HostReactor.java | 32 ++++- .../net/{ => gprc}/NamingGrpcClientProxy.java | 62 ++++++++- .../net/gprc/NamingPushResponseHandler.java | 42 ++++++ .../remote/ServerPushResponseHandler.java | 9 +- .../nacos/common/utils/ConcurrentHashSet.java | 8 +- .../nacos/core/remote/AsyncListenContext.java | 13 ++ .../remote/DataChangeListenerNotifier.java | 33 +++-- .../naming/core/ServiceInfoGenerator.java | 123 ++++++++++++++++++ .../nacos/naming/core/SubscribeManager.java | 10 +- .../nacos/naming/push/PushService.java | 11 +- .../nacos/naming/push/RemotePushService.java | 100 ++++++++++++++ .../SubscribeServiceRequestHandler.java | 58 --------- .../handler/InstanceRequestHandler.java | 3 +- .../handler/ServiceQueryRequestHandler.java | 72 ++-------- .../SubscribeServiceRequestHandler.java | 94 +++++++++++++ 24 files changed, 790 insertions(+), 202 deletions(-) create mode 100644 api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java create mode 100644 api/src/main/java/com/alibaba/nacos/api/naming/remote/response/NotifySubscriberResponse.java rename api/src/main/java/com/alibaba/nacos/api/naming/remote/response/{ServiceQueryResponse.java => QueryServiceResponse.java} (63%) create mode 100644 api/src/main/java/com/alibaba/nacos/api/naming/remote/response/SubscribeServiceResponse.java rename client/src/main/java/com/alibaba/nacos/client/naming/net/{ => gprc}/NamingGrpcClientProxy.java (63%) create mode 100644 client/src/main/java/com/alibaba/nacos/client/naming/net/gprc/NamingPushResponseHandler.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/core/ServiceInfoGenerator.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/push/RemotePushService.java delete mode 100644 naming/src/main/java/com/alibaba/nacos/naming/remote/SubscribeServiceRequestHandler.java create mode 100644 naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java index 30d10798d..a27514fd8 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/NamingRemoteConstants.java @@ -28,8 +28,9 @@ public class NamingRemoteConstants { public static final String DE_REGISTER_INSTANCE = "deregisterInstance"; - public static final String SUBSCRIBE_SERVICE = "subscribeService"; - public static final String QUERY_SERVICE = "queryService"; + public static final String SUBSCRIBE_SERVICE = "subscribeService"; + + public static final String NOTIFY_SUBSCRIBER = "notifySubscriber"; } diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java index 493511818..ee075e7d8 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/InstanceRequest.java @@ -32,12 +32,6 @@ public class InstanceRequest extends NamingCommonRequest { public InstanceRequest() { } - public InstanceRequest(String namespace, String serviceName, String type, Instance instance) { - super(namespace, serviceName, null); - this.type = type; - this.instance = instance; - } - public InstanceRequest(String namespace, String serviceName, String groupName, String type, Instance instance) { super(namespace, serviceName, groupName); this.type = type; diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java new file mode 100644 index 000000000..46b9738f1 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/request/SubscribeServiceRequest.java @@ -0,0 +1,61 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.naming.remote.request; + +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; + +/** + * Nacos naming subscribe service request. + * + * @author xiweng.yy + */ +public class SubscribeServiceRequest extends NamingCommonRequest { + + private boolean subscribe; + + private String clusters; + + public SubscribeServiceRequest() { + } + + public SubscribeServiceRequest(String namespace, String serviceName, String clusters, boolean subscribe) { + super(namespace, serviceName, null); + this.clusters = clusters; + this.subscribe = subscribe; + } + + @Override + public String getType() { + return NamingRemoteConstants.SUBSCRIBE_SERVICE; + } + + public String getClusters() { + return clusters; + } + + public void setClusters(String clusters) { + this.clusters = clusters; + } + + public boolean isSubscribe() { + return subscribe; + } + + public void setSubscribe(boolean subscribe) { + this.subscribe = subscribe; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/InstanceResponse.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/InstanceResponse.java index 5f834a46e..88c84f78d 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/InstanceResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/InstanceResponse.java @@ -30,13 +30,17 @@ public class InstanceResponse extends Response { public InstanceResponse() { } + public InstanceResponse(String type) { + this.type = type; + } + + public void setType(String type) { + this.type = type; + } + @Override public String getType() { return this.type; } - public InstanceResponse(String type) { - this.type = type; - } - } diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/NotifySubscriberResponse.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/NotifySubscriberResponse.java new file mode 100644 index 000000000..fdd2266cf --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/NotifySubscriberResponse.java @@ -0,0 +1,70 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.naming.remote.response; + +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.remote.response.ResponseCode; + +/** + * Notify subscriber response. + * + * @author xiweng.yy + */ +public class NotifySubscriberResponse extends Response { + + private ServiceInfo serviceInfo; + + public NotifySubscriberResponse() { + } + + private NotifySubscriberResponse(ServiceInfo serviceInfo, String message) { + this.serviceInfo = serviceInfo; + setMessage(message); + } + + public static NotifySubscriberResponse buildSuccessResponse(ServiceInfo serviceInfo) { + return new NotifySubscriberResponse(serviceInfo, "success"); + } + + /** + * Build fail response. + * + * @param message error message + * @return faile response + */ + public static NotifySubscriberResponse buildFailResponse(String message) { + NotifySubscriberResponse result = new NotifySubscriberResponse(); + result.setErrorCode(ResponseCode.FAIL.getCode()); + result.setMessage(message); + return result; + } + + @Override + public String getType() { + return NamingRemoteConstants.NOTIFY_SUBSCRIBER; + } + + public ServiceInfo getServiceInfo() { + return serviceInfo; + } + + public void setServiceInfo(ServiceInfo serviceInfo) { + this.serviceInfo = serviceInfo; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/ServiceQueryResponse.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/QueryServiceResponse.java similarity index 63% rename from api/src/main/java/com/alibaba/nacos/api/naming/remote/response/ServiceQueryResponse.java rename to api/src/main/java/com/alibaba/nacos/api/naming/remote/response/QueryServiceResponse.java index a485006a7..65023d686 100644 --- a/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/ServiceQueryResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/QueryServiceResponse.java @@ -26,11 +26,11 @@ import com.alibaba.nacos.api.remote.response.ResponseCode; * * @author xiweng.yy */ -public class ServiceQueryResponse extends Response { +public class QueryServiceResponse extends Response { private ServiceInfo serviceInfo; - public ServiceQueryResponse() { + public QueryServiceResponse() { } @Override @@ -38,21 +38,31 @@ public class ServiceQueryResponse extends Response { return NamingRemoteConstants.QUERY_SERVICE; } - public ServiceQueryResponse(ServiceInfo serviceInfo) { + private QueryServiceResponse(ServiceInfo serviceInfo) { this.serviceInfo = serviceInfo; } - public static ServiceQueryResponse buildSuccessResponse(ServiceInfo serviceInfo) { - ServiceQueryResponse serviceQueryResponse = new ServiceQueryResponse(); - serviceQueryResponse.setServiceInfo(serviceInfo); - return serviceQueryResponse; + /** + * Build Success response. + * + * @param serviceInfo service info + * @return service query response + */ + public static QueryServiceResponse buildSuccessResponse(ServiceInfo serviceInfo) { + return new QueryServiceResponse(serviceInfo); } - public static ServiceQueryResponse buildFailResponse(String message) { - ServiceQueryResponse serviceQueryResponse = new ServiceQueryResponse(); - serviceQueryResponse.setResultCode(ResponseCode.FAIL.getCode()); - serviceQueryResponse.setMessage(message); - return serviceQueryResponse; + /** + * Build fail response. + * + * @param message message + * @return service query response + */ + public static QueryServiceResponse buildFailResponse(String message) { + QueryServiceResponse queryServiceResponse = new QueryServiceResponse(); + queryServiceResponse.setResultCode(ResponseCode.FAIL.getCode()); + queryServiceResponse.setMessage(message); + return queryServiceResponse; } public ServiceInfo getServiceInfo() { diff --git a/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/SubscribeServiceResponse.java b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/SubscribeServiceResponse.java new file mode 100644 index 000000000..123ec4379 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/naming/remote/response/SubscribeServiceResponse.java @@ -0,0 +1,54 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.naming.remote.response; + +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; +import com.alibaba.nacos.api.remote.response.Response; + +/** + * Nacos naming subscribe service response. + * + * @author xiweng.yy + */ +public class SubscribeServiceResponse extends Response { + + private ServiceInfo serviceInfo; + + public SubscribeServiceResponse() { + } + + public SubscribeServiceResponse(int resultCode, String message, ServiceInfo serviceInfo) { + super(); + setResultCode(resultCode); + setMessage(message); + this.serviceInfo = serviceInfo; + } + + @Override + public String getType() { + return NamingRemoteConstants.SUBSCRIBE_SERVICE; + } + + public ServiceInfo getServiceInfo() { + return serviceInfo; + } + + public void setServiceInfo(ServiceInfo serviceInfo) { + this.serviceInfo = serviceInfo; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/ResponseRegistry.java b/api/src/main/java/com/alibaba/nacos/api/remote/ResponseRegistry.java index 22ab7fd4f..b2520a430 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/ResponseRegistry.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/ResponseRegistry.java @@ -24,7 +24,9 @@ import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse; import com.alibaba.nacos.api.config.remote.response.ConfigResponseTypeConstants; import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; import com.alibaba.nacos.api.naming.remote.response.InstanceResponse; -import com.alibaba.nacos.api.naming.remote.response.ServiceQueryResponse; +import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse; +import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse; +import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse; import com.alibaba.nacos.api.remote.response.ConnectResetResponse; import com.alibaba.nacos.api.remote.response.HeartBeatResponse; import com.alibaba.nacos.api.remote.response.ResponseTypeConstants; @@ -60,7 +62,9 @@ public class ResponseRegistry { //naming response registry REGISTRY_RESPONSES.put(NamingRemoteConstants.REGISTER_INSTANCE, InstanceResponse.class); REGISTRY_RESPONSES.put(NamingRemoteConstants.DE_REGISTER_INSTANCE, InstanceResponse.class); - REGISTRY_RESPONSES.put(NamingRemoteConstants.QUERY_SERVICE, ServiceQueryResponse.class); + REGISTRY_RESPONSES.put(NamingRemoteConstants.QUERY_SERVICE, QueryServiceResponse.class); + REGISTRY_RESPONSES.put(NamingRemoteConstants.SUBSCRIBE_SERVICE, SubscribeServiceResponse.class); + REGISTRY_RESPONSES.put(NamingRemoteConstants.NOTIFY_SUBSCRIBER, NotifySubscriberResponse.class); } public static Class getClassByType(String type) { diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java index 19ec45d4d..4dc43c0bd 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java @@ -26,15 +26,16 @@ import com.alibaba.nacos.api.naming.pojo.ListView; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.api.selector.AbstractSelector; -import com.alibaba.nacos.client.naming.beat.BeatInfo; import com.alibaba.nacos.client.naming.beat.BeatReactor; import com.alibaba.nacos.client.naming.core.Balancer; import com.alibaba.nacos.client.naming.core.EventDispatcher; import com.alibaba.nacos.client.naming.core.HostReactor; import com.alibaba.nacos.client.naming.net.NamingProxy; +import com.alibaba.nacos.client.naming.net.gprc.NamingGrpcClientProxy; import com.alibaba.nacos.client.naming.utils.CollectionUtils; import com.alibaba.nacos.client.naming.utils.InitUtils; import com.alibaba.nacos.client.naming.utils.UtilAndComs; +import com.alibaba.nacos.client.remote.ServerListFactory; import com.alibaba.nacos.client.utils.ValidatorUtils; import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.StringUtils; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; /** * Nacos Naming Service. @@ -73,6 +75,8 @@ public class NacosNamingService implements NamingService { private NamingProxy serverProxy; + private NamingGrpcClientProxy grpcClientProxy; + public NacosNamingService(String serverList) throws NacosException { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList); @@ -97,6 +101,24 @@ public class NacosNamingService implements NamingService { this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties)); + this.grpcClientProxy = new NamingGrpcClientProxy(namespace, hostReactor); + grpcClientProxy.start(new ServerListFactory() { + + private final AtomicInteger index = new AtomicInteger(); + + private final String[] serverLists = serverList.split(","); + + @Override + public String genNextServer() { + int nextIndex = index.getAndIncrement() % serverLists.length; + return serverLists[nextIndex]; + } + + @Override + public String getCurrentServer() { + return serverLists[index.get() % serverLists.length]; + } + }); } private int initClientBeatThreadCount(Properties properties) { @@ -192,12 +214,13 @@ public class NacosNamingService implements NamingService { @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { - String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); - if (instance.isEphemeral()) { - BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); - beatReactor.addBeatInfo(groupedServiceName, beatInfo); - } - serverProxy.registerService(groupedServiceName, groupName, instance); + // String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); + // if (instance.isEphemeral()) { + // BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); + // beatReactor.addBeatInfo(groupedServiceName, beatInfo); + // } + // serverProxy.registerService(groupedServiceName, groupName, instance); + grpcClientProxy.registerService(serviceName, groupName, instance); } @Override @@ -233,11 +256,12 @@ public class NacosNamingService implements NamingService { @Override public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException { - if (instance.isEphemeral()) { - beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), - instance.getPort()); - } - serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance); + // if (instance.isEphemeral()) { + // beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), + // instance.getPort()); + // } + // serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance); + grpcClientProxy.deregisterService(serviceName, groupName, instance); } @Override @@ -287,9 +311,11 @@ public class NacosNamingService implements NamingService { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { - serviceInfo = hostReactor - .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), - StringUtils.join(clusters, ",")); + // serviceInfo = hostReactor + // .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), + // StringUtils.join(clusters, ",")); + serviceInfo = grpcClientProxy.queryInstancesOfService(NamingUtils.getGroupedName(serviceName, groupName), + StringUtils.join(clusters, ","), 0, false); } List list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { @@ -442,8 +468,11 @@ public class NacosNamingService implements NamingService { @Override public void subscribe(String serviceName, String groupName, List clusters, EventListener listener) throws NacosException { - eventDispatcher.addListener(hostReactor - .getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")), + // eventDispatcher.addListener(hostReactor + // .getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")), + // StringUtils.join(clusters, ","), listener); + eventDispatcher.addListener(grpcClientProxy + .subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener); } @@ -465,9 +494,12 @@ public class NacosNamingService implements NamingService { @Override public void unsubscribe(String serviceName, String groupName, List clusters, EventListener listener) throws NacosException { - eventDispatcher - .removeListener(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), - listener); + String fullServiceName = NamingUtils.getGroupedName(serviceName, groupName); + String clustersString = StringUtils.join(clusters, ","); + eventDispatcher.removeListener(fullServiceName, clustersString, listener); + if (!eventDispatcher.isSubscribed(fullServiceName, clustersString)) { + grpcClientProxy.unsubscribe(fullServiceName, clustersString); + } } @Override diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java b/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java index 6b28b8bf3..759e0e0cd 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java @@ -125,6 +125,17 @@ public class HostReactor implements Closeable { */ public ServiceInfo processServiceJson(String json) { ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); + serviceInfo.setJsonFromServer(json); + return processServiceJson(serviceInfo); + } + + /** + * Process service info. + * + * @param serviceInfo new service info + * @return service info + */ + public ServiceInfo processServiceJson(ServiceInfo serviceInfo) { ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (serviceInfo.getHosts() == null || !serviceInfo.validate()) { //empty or error push, just ignore @@ -204,8 +215,6 @@ public class HostReactor implements Closeable { + JacksonUtils.toJson(modHosts)); } - serviceInfo.setJsonFromServer(json); - if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { eventDispatcher.serviceChanged(serviceInfo); DiskCache.write(serviceInfo, cacheDir); @@ -217,10 +226,13 @@ public class HostReactor implements Closeable { + JacksonUtils.toJson(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); eventDispatcher.serviceChanged(serviceInfo); - serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } + if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) { + serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo)); + } + MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { @@ -271,10 +283,10 @@ public class HostReactor implements Closeable { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); - - updatingMap.put(serviceName, new Object()); + + updatingService(serviceName); updateServiceNow(serviceName, clusters); - updatingMap.remove(serviceName); + finishUpdating(serviceName); } else if (updatingMap.containsKey(serviceName)) { @@ -367,6 +379,14 @@ public class HostReactor implements Closeable { NAMING_LOGGER.info("{} do shutdown stop", className); } + public void updatingService(String serviceName) { + updatingMap.put(serviceName, new Object()); + } + + public void finishUpdating(String serviceName) { + updatingMap.remove(serviceName); + } + public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingGrpcClientProxy.java b/client/src/main/java/com/alibaba/nacos/client/naming/net/gprc/NamingGrpcClientProxy.java similarity index 63% rename from client/src/main/java/com/alibaba/nacos/client/naming/net/NamingGrpcClientProxy.java rename to client/src/main/java/com/alibaba/nacos/client/naming/net/gprc/NamingGrpcClientProxy.java index 7e333cb2b..d8fe9c115 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingGrpcClientProxy.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/net/gprc/NamingGrpcClientProxy.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.alibaba.nacos.client.naming.net; +package com.alibaba.nacos.client.naming.net.gprc; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.pojo.Instance; @@ -22,9 +22,12 @@ import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; import com.alibaba.nacos.api.naming.remote.request.InstanceRequest; import com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest; -import com.alibaba.nacos.api.naming.remote.response.ServiceQueryResponse; +import com.alibaba.nacos.api.naming.remote.request.SubscribeServiceRequest; +import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse; +import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.client.naming.core.HostReactor; import com.alibaba.nacos.client.remote.RpcClient; import com.alibaba.nacos.client.remote.RpcClientFactory; import com.alibaba.nacos.client.remote.ServerListFactory; @@ -40,16 +43,26 @@ public class NamingGrpcClientProxy { private final String namespaceId; + private HostReactor hostReactor; + private RpcClient rpcClient; - public NamingGrpcClientProxy(String namespaceId) { + public NamingGrpcClientProxy(String namespaceId, HostReactor hostReactor) { this.namespaceId = namespaceId; - rpcClient = RpcClientFactory.getClient("naming"); + this.hostReactor = hostReactor; + this.rpcClient = RpcClientFactory.getClient("naming"); } + /** + * Start Grpc client proxy. + * + * @param serverListFactory server list factory + * @throws NacosException nacos exception + */ public void start(ServerListFactory serverListFactory) throws NacosException { rpcClient.init(serverListFactory); rpcClient.start(); + rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(hostReactor)); } /** @@ -73,14 +86,15 @@ public class NamingGrpcClientProxy { * deregister instance from a service. * * @param serviceName name of service + * @param groupName group name * @param instance instance * @throws NacosException nacos exception */ - public void deregisterService(String serviceName, Instance instance) throws NacosException { + public void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER .info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName, instance); - InstanceRequest request = new InstanceRequest(namespaceId, serviceName, + InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.DE_REGISTER_INSTANCE, instance); requestToServer(request, Response.class); } @@ -101,10 +115,44 @@ public class NamingGrpcClientProxy { request.setCluster(clusters); request.setHealthyOnly(healthyOnly); request.setUdpPort(udpPort); - ServiceQueryResponse response = requestToServer(request, ServiceQueryResponse.class); + QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class); return response.getServiceInfo(); } + /** + * Subscribe service. + * + * @param serviceName full service name with group + * @param clusters clusters, current only support subscribe all clusters, maybe deprecated + * @return current service info of subscribe service + * @throws NacosException nacos exception + */ + public ServiceInfo subscribe(String serviceName, String clusters) throws NacosException { + ServiceInfo serviceInfo = new ServiceInfo(serviceName, clusters); + if (hostReactor.getServiceInfoMap().containsKey(serviceInfo.getKey())) { + return hostReactor.getServiceInfoMap().get(serviceInfo.getKey()); + } + hostReactor.updatingService(serviceName); + SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, clusters, true); + SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class); + ServiceInfo result = response.getServiceInfo(); + hostReactor.getServiceInfoMap().put(result.getKey(), result); + hostReactor.finishUpdating(serviceName); + return result; + } + + /** + * Unsubscribe service. + * + * @param serviceName full service name with group + * @param clusters clusters, current only support subscribe all clusters, maybe deprecated + * @throws NacosException nacos exception + */ + public void unsubscribe(String serviceName, String clusters) throws NacosException { + SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, clusters, false); + requestToServer(request, SubscribeServiceResponse.class); + } + private T requestToServer(Request request, Class responseClass) throws NacosException { try { Response response = rpcClient.request(request); diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/net/gprc/NamingPushResponseHandler.java b/client/src/main/java/com/alibaba/nacos/client/naming/net/gprc/NamingPushResponseHandler.java new file mode 100644 index 000000000..878b56732 --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/naming/net/gprc/NamingPushResponseHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.net.gprc; + +import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.client.naming.core.HostReactor; +import com.alibaba.nacos.client.remote.ServerPushResponseHandler; + +/** + * Naming push response handler. + * + * @author xiweng.yy + */ +public class NamingPushResponseHandler implements ServerPushResponseHandler { + + private final HostReactor hostReactor; + + public NamingPushResponseHandler(HostReactor hostReactor) { + this.hostReactor = hostReactor; + } + + @Override + public void responseReply(Response response) { + NotifySubscriberResponse notifyResponse = (NotifySubscriberResponse) response; + hostReactor.processServiceJson(notifyResponse.getServiceInfo()); + } +} diff --git a/client/src/main/java/com/alibaba/nacos/client/remote/ServerPushResponseHandler.java b/client/src/main/java/com/alibaba/nacos/client/remote/ServerPushResponseHandler.java index 2ffe08177..ba8ba7eeb 100644 --- a/client/src/main/java/com/alibaba/nacos/client/remote/ServerPushResponseHandler.java +++ b/client/src/main/java/com/alibaba/nacos/client/remote/ServerPushResponseHandler.java @@ -20,15 +20,16 @@ import com.alibaba.nacos.api.remote.response.Response; /** * ServerPushResponseHandler. + * * @author liuzunfei * @version $Id: ServerPushResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $ */ -public abstract interface ServerPushResponseHandler { +public interface ServerPushResponseHandler { /** - * handle logic when response ceceive. - * @param response. + * Handle logic when response received. + * @param response response */ - public abstract void responseReply(Response response); + void responseReply(Response response); } diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ConcurrentHashSet.java b/common/src/main/java/com/alibaba/nacos/common/utils/ConcurrentHashSet.java index f6617aa2e..4465795ef 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ConcurrentHashSet.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ConcurrentHashSet.java @@ -17,6 +17,7 @@ package com.alibaba.nacos.common.utils; import java.util.AbstractSet; +import java.util.HashSet; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -44,9 +45,14 @@ public class ConcurrentHashSet extends AbstractSet { return map.containsKey(o); } + /** + * The original implement

map.keySet().iterator()

need jdk8, so it can work. + * + * @return iterator + */ @Override public Iterator iterator() { - return map.keySet().iterator(); + return new HashSet(map.keySet()).iterator(); } @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/AsyncListenContext.java b/core/src/main/java/com/alibaba/nacos/core/remote/AsyncListenContext.java index 46ec2dd54..8e6aeaa5d 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/AsyncListenContext.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/AsyncListenContext.java @@ -95,4 +95,17 @@ public class AsyncListenContext { return null; } + /** + * Judge whether contain listener for item. + * + * @param requestType request type + * @param listenKey listen key + * @return true if has contained, otherwise false + */ + public boolean containListener(String requestType, String listenKey) { + if (!listenContexts.containsKey(requestType)) { + return false; + } + return listenContexts.get(requestType).containsKey(listenKey); + } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/DataChangeListenerNotifier.java b/core/src/main/java/com/alibaba/nacos/core/remote/DataChangeListenerNotifier.java index 0fe03fef6..91e1cdcd2 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/DataChangeListenerNotifier.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/DataChangeListenerNotifier.java @@ -22,6 +22,7 @@ import com.alibaba.nacos.common.utils.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Collection; import java.util.Set; /** @@ -55,17 +56,29 @@ public class DataChangeListenerNotifier { public void configDataChanged(String groupKey, Response notifyResponse) { Set listeners = asyncListenContext.getListeners(NacosRemoteConstants.LISTEN_CONTEXT_CONFIG, groupKey); - if (!CollectionUtils.isEmpty(listeners)) { - for (String connectionId : listeners) { - Connection connection = connectionManager.getConnection(connectionId); - if (connection != null) { - connection.sendResponse(notifyResponse); - } + sendNotifyResponse(listeners, notifyResponse); + } + + /** + * Push service info to subscribe. + * + * @param serviceKey service key + * @param notifyResponse {@link com.alibaba.nacos.api.naming.pojo.ServiceInfo} + */ + public void serviceInfoChanged(String serviceKey, Response notifyResponse) { + Set listeners = asyncListenContext.getListeners(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey); + sendNotifyResponse(listeners, notifyResponse); + } + + private void sendNotifyResponse(Collection listeners, Response notifyResponse) { + if (CollectionUtils.isEmpty(listeners)) { + return; + } + for (String each : listeners) { + Connection connection = connectionManager.getConnection(each); + if (null != connection) { + connection.sendResponse(notifyResponse); } } } - - public void serviceIndoChanged(String serviceKey, Response notifyResponse) { - //TODO - } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceInfoGenerator.java b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceInfoGenerator.java new file mode 100644 index 000000000..f10d9d1b8 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceInfoGenerator.java @@ -0,0 +1,123 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.SwitchDomain; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Service information generator. + * + * @author xiweng.yy + */ +@Component +public class ServiceInfoGenerator { + + private final ServiceManager serviceManager; + + private final SwitchDomain switchDomain; + + public ServiceInfoGenerator(ServiceManager serviceManager, SwitchDomain switchDomain) { + this.serviceManager = serviceManager; + this.switchDomain = switchDomain; + } + + public ServiceInfo generateEmptyServiceInfo(String serviceName, String clusters) { + return new ServiceInfo(serviceName, clusters); + } + + /** + * Generate {@link ServiceInfo} for service and clusters. + * + * @param namespaceId namespace id of service + * @param serviceName service name + * @param clusters clusters of instances + * @param healthyOnly only healthy instances + * @param clientIp source client ip + * @return service information + * @throws NacosException when service is disabled + */ + public ServiceInfo generateServiceInfo(String namespaceId, String serviceName, String clusters, boolean healthyOnly, + String clientIp) throws NacosException { + if (!serviceManager.containService(namespaceId, serviceName)) { + return generateEmptyServiceInfo(serviceName, clusters); + } + Service service = serviceManager.getService(namespaceId, serviceName); + if (!service.getEnabled()) { + throw new NacosException(NacosException.SERVER_ERROR, + String.format("Service %s : %s is disable now", namespaceId, serviceName)); + } + return generateServiceInfo(service, clusters, healthyOnly, clientIp); + } + + /** + * Generate {@link ServiceInfo} for service and clusters. + * + * @param service service + * @param clusters clusters of instances + * @param healthyOnly only healthy instances + * @param clientIp source client ip + * @return service information + */ + public ServiceInfo generateServiceInfo(Service service, String clusters, boolean healthyOnly, String clientIp) { + // TODO the origin logic in {@link InstanceController#doSrvIpxt will try to add push. + ServiceInfo result = new ServiceInfo(service.getName(), clusters); + List instances = getInstanceFromService(service, clusters, healthyOnly, clientIp); + result.addAllHosts(instances); + result.setName(service.getName()); + result.setCacheMillis(switchDomain.getDefaultCacheMillis()); + result.setLastRefTime(System.currentTimeMillis()); + result.setChecksum(service.getChecksum()); + result.setClusters(clusters); + // TODO there are some parameters do not include in service info, but added to return in origin logic + return result; + } + + private List getInstanceFromService(Service service, String clusters, boolean healthyOnly, + String clientIp) { + List result = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ","))); + if (service.getSelector() != null && StringUtils.isNotBlank(clientIp)) { + result = service.getSelector().select(clientIp, result); + } + return result.isEmpty() ? result : healthyOnly ? doProtectThreshold(service, result) : result; + } + + private List doProtectThreshold(Service service, List instances) { + Map> healthyInstancesMap = new HashMap<>(2); + healthyInstancesMap.put(Boolean.TRUE, new LinkedList<>()); + healthyInstancesMap.put(Boolean.FALSE, new LinkedList<>()); + for (Instance each : instances) { + healthyInstancesMap.get(each.isHealthy()).add(each); + } + if ((float) healthyInstancesMap.get(Boolean.TRUE).size() / instances.size() <= service.getProtectThreshold()) { + Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", service.getName()); + healthyInstancesMap.get(Boolean.TRUE).addAll(healthyInstancesMap.get(Boolean.FALSE)); + healthyInstancesMap.get(Boolean.FALSE).clear(); + } + return healthyInstancesMap.get(Boolean.TRUE); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java index 1cd1651c1..aa7667332 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/SubscribeManager.java @@ -27,6 +27,7 @@ import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscribers; import com.alibaba.nacos.naming.push.PushService; +import com.alibaba.nacos.naming.push.RemotePushService; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -35,6 +36,7 @@ import java.net.HttpURLConnection; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -56,11 +58,17 @@ public class SubscribeManager { @Autowired private PushService pushService; + @Autowired + private RemotePushService remotePushService; + @Autowired private ServerMemberManager memberManager; private List getSubscribersFuzzy(String serviceName, String namespaceId) { - return pushService.getClientsFuzzy(serviceName, namespaceId); + List result = new LinkedList<>(); + result.addAll(pushService.getClientsFuzzy(serviceName, namespaceId)); + result.addAll(remotePushService.getSubscribes(namespaceId, serviceName)); + return result; } private List getSubscribers(String serviceName, String namespaceId) { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java b/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java index a7d1375c5..f3bf34ea7 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java @@ -121,7 +121,10 @@ public class PushService implements ApplicationContextAware, ApplicationListener Service service = event.getService(); String serviceName = service.getName(); String namespaceId = service.getNamespaceId(); - + //merge some change events to reduce the push frequency: + if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) { + return; + } Future future = GlobalExecutor.scheduleUdpSender(() -> { try { Loggers.PUSH.info(serviceName + " is changed, add it to push queue."); @@ -371,12 +374,6 @@ public class PushService implements ApplicationContextAware, ApplicationListener * @param service service */ public void serviceChanged(Service service) { - // merge some change events to reduce the push frequency: - if (futureMap - .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) { - return; - } - this.applicationContext.publishEvent(new ServiceChangeEvent(this, service)); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/RemotePushService.java b/naming/src/main/java/com/alibaba/nacos/naming/push/RemotePushService.java new file mode 100644 index 000000000..7cf8eed48 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/RemotePushService.java @@ -0,0 +1,100 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.push; + +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; +import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.core.remote.DataChangeListenerNotifier; +import com.alibaba.nacos.naming.core.Service; +import com.alibaba.nacos.naming.core.ServiceInfoGenerator; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import com.alibaba.nacos.naming.pojo.Subscriber; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Remote push services. + * + * @author xiweng.yy + */ +@Component +public class RemotePushService implements ApplicationListener { + + private final ServiceInfoGenerator serviceInfoGenerator; + + private final DataChangeListenerNotifier notifier; + + /** + * ServiceKey --> actual Subscriber. The Subscriber may be only subscribe part of cluster of service. + */ + private final ConcurrentMap> serviceSubscribesMap = new ConcurrentHashMap<>(); + + public RemotePushService(ServiceInfoGenerator serviceInfoGenerator, DataChangeListenerNotifier notifier) { + this.serviceInfoGenerator = serviceInfoGenerator; + this.notifier = notifier; + } + + /** + * Register subscribe For service. + * + * @param serviceKey service key + * @param subscriber subscriber + */ + public void registerSubscribeForService(String serviceKey, Subscriber subscriber) { + if (!serviceSubscribesMap.containsKey(serviceKey)) { + serviceSubscribesMap.put(serviceKey, new ConcurrentHashSet<>()); + } + serviceSubscribesMap.get(serviceKey).add(subscriber); + } + + /** + * Remove subscribe For service. + * + * @param serviceKey service key + * @param subscriber subscriber + */ + public void removeSubscribeForService(String serviceKey, Subscriber subscriber) { + if (!serviceSubscribesMap.containsKey(serviceKey)) { + return; + } + serviceSubscribesMap.get(serviceKey).remove(subscriber); + } + + public Set getSubscribes(String namespaceId, String serviceName) { + return getSubscribes(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); + } + + public Set getSubscribes(String serviceKey) { + return serviceSubscribesMap.getOrDefault(serviceKey, new HashSet<>()); + } + + @Override + public void onApplicationEvent(ServiceChangeEvent serviceChangeEvent) { + Service service = serviceChangeEvent.getService(); + String serviceKey = UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()); + ServiceInfo serviceInfo = serviceInfoGenerator + .generateServiceInfo(service, StringUtils.EMPTY, false, StringUtils.EMPTY); + notifier.serviceInfoChanged(serviceKey, NotifySubscriberResponse.buildSuccessResponse(serviceInfo)); + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/SubscribeServiceRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/SubscribeServiceRequestHandler.java deleted file mode 100644 index d84a4cfab..000000000 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/SubscribeServiceRequestHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 1999-2020 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.naming.remote; - -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; -import com.alibaba.nacos.api.remote.request.Request; -import com.alibaba.nacos.api.remote.request.RequestMeta; -import com.alibaba.nacos.api.remote.response.Response; -import com.alibaba.nacos.core.remote.AsyncListenContext; -import com.alibaba.nacos.core.remote.RequestHandler; -import com.google.common.collect.Lists; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.List; - -/** - * Handler to handle subscribe service. - * - * @author liuzunfei - * @author xiweng.yy - */ -@Component -public class SubscribeServiceRequestHandler extends RequestHandler { - - @Autowired - AsyncListenContext asyncListenContext; - - @Override - public Request parseBodyString(String bodyString) { - return null; - } - - @Override - public Response handle(Request request, RequestMeta meta) throws NacosException { - return null; - } - - @Override - public List getRequestTypes() { - return Lists.newArrayList(NamingRemoteConstants.SUBSCRIBE_SERVICE); - } -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java index db24c596f..2de23353b 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/InstanceRequestHandler.java @@ -20,6 +20,7 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; import com.alibaba.nacos.api.naming.remote.request.InstanceRequest; import com.alibaba.nacos.api.naming.remote.response.InstanceResponse; +import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; @@ -56,7 +57,7 @@ public class InstanceRequestHandler extends RequestHandler { public Response handle(Request request, RequestMeta meta) throws NacosException { InstanceRequest instanceRequest = (InstanceRequest) request; String namespace = instanceRequest.getNamespace(); - String serviceName = instanceRequest.getServiceName(); + String serviceName = NamingUtils.getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName()); switch (instanceRequest.getType()) { case NamingRemoteConstants.REGISTER_INSTANCE: return registerInstance(namespace, serviceName, instanceRequest, meta); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java index 0b949daa8..7ddd1d63f 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java @@ -20,28 +20,17 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; import com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest; -import com.alibaba.nacos.api.naming.remote.response.ServiceQueryResponse; +import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.core.remote.RequestHandler; -import com.alibaba.nacos.naming.core.Instance; -import com.alibaba.nacos.naming.core.Service; -import com.alibaba.nacos.naming.core.ServiceManager; -import com.alibaba.nacos.naming.misc.Loggers; -import com.alibaba.nacos.naming.misc.SwitchDomain; +import com.alibaba.nacos.naming.core.ServiceInfoGenerator; import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedList; import java.util.List; -import java.util.Map; /** * Nacos query instances request handler. @@ -51,11 +40,11 @@ import java.util.Map; @Component public class ServiceQueryRequestHandler extends RequestHandler { - @Autowired - private ServiceManager serviceManager; + private final ServiceInfoGenerator serviceInfoGenerator; - @Autowired - private SwitchDomain switchDomain; + public ServiceQueryRequestHandler(ServiceInfoGenerator serviceInfoGenerator) { + this.serviceInfoGenerator = serviceInfoGenerator; + } @Override public ServiceQueryRequest parseBodyString(String bodyString) { @@ -67,50 +56,11 @@ public class ServiceQueryRequestHandler extends RequestHandler instances = getInstanceFromService(service, queryRequest, meta); - result.addAllHosts(instances); - result.setName(serviceName); - result.setCacheMillis(switchDomain.getDefaultCacheMillis()); - result.setLastRefTime(System.currentTimeMillis()); - result.setChecksum(service.getChecksum()); - result.setClusters(queryRequest.getCluster()); - // TODO there are some parameters do not include in service info, but added to return in origin logic - return new ServiceQueryResponse(result); - } - - private List getInstanceFromService(Service service, ServiceQueryRequest queryRequest, RequestMeta meta) { - List result = service.srvIPs(Arrays.asList(StringUtils.split(queryRequest.getCluster(), ","))); - if (service.getSelector() != null && StringUtils.isNotBlank(meta.getClientIp())) { - result = service.getSelector().select(meta.getClientIp(), result); - } - return result.isEmpty() ? result - : queryRequest.isHealthyOnly() ? doProtectThreshold(service, queryRequest, result) : result; - } - - private List doProtectThreshold(Service service, ServiceQueryRequest queryRequest, - List instances) { - Map> healthyInstancesMap = new HashMap<>(); - healthyInstancesMap.put(Boolean.TRUE, new LinkedList<>()); - healthyInstancesMap.put(Boolean.FALSE, new LinkedList<>()); - for (Instance each : instances) { - healthyInstancesMap.get(each.isHealthy()).add(each); - } - if ((float) healthyInstancesMap.get(Boolean.TRUE).size() / instances.size() <= service.getProtectThreshold()) { - Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", service.getName()); - healthyInstancesMap.get(Boolean.TRUE).addAll(healthyInstancesMap.get(Boolean.FALSE)); - healthyInstancesMap.get(Boolean.FALSE).clear(); - } - return healthyInstancesMap.get(Boolean.TRUE); + String cluster = null == queryRequest.getCluster() ? "" : queryRequest.getCluster(); + boolean healthyOnly = queryRequest.isHealthyOnly(); + ServiceInfo result = serviceInfoGenerator + .generateServiceInfo(namespaceId, serviceName, cluster, healthyOnly, meta.getClientIp()); + return QueryServiceResponse.buildSuccessResponse(result); } @Override diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java new file mode 100644 index 000000000..5e9383d23 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/SubscribeServiceRequestHandler.java @@ -0,0 +1,94 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.remote.handler; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; +import com.alibaba.nacos.api.naming.remote.request.SubscribeServiceRequest; +import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.remote.response.ResponseCode; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.core.remote.AsyncListenContext; +import com.alibaba.nacos.core.remote.NacosRemoteConstants; +import com.alibaba.nacos.core.remote.RequestHandler; +import com.alibaba.nacos.naming.core.ServiceInfoGenerator; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import com.alibaba.nacos.naming.pojo.Subscriber; +import com.alibaba.nacos.naming.push.RemotePushService; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * Handler to handle subscribe service. + * + * @author liuzunfei + * @author xiweng.yy + */ +@Component +public class SubscribeServiceRequestHandler extends RequestHandler { + + private final AsyncListenContext asyncListenContext; + + private final ServiceInfoGenerator serviceInfoGenerator; + + private final RemotePushService remotePushService; + + public SubscribeServiceRequestHandler(AsyncListenContext asyncListenContext, + ServiceInfoGenerator serviceInfoGenerator, RemotePushService remotePushService) { + this.asyncListenContext = asyncListenContext; + this.serviceInfoGenerator = serviceInfoGenerator; + this.remotePushService = remotePushService; + } + + @Override + public SubscribeServiceRequest parseBodyString(String bodyString) { + return JacksonUtils.toObj(bodyString, SubscribeServiceRequest.class); + } + + @Override + public Response handle(Request request, RequestMeta meta) throws NacosException { + SubscribeServiceRequest subscribeServiceRequest = (SubscribeServiceRequest) request; + String namespaceId = subscribeServiceRequest.getNamespace(); + String serviceName = subscribeServiceRequest.getServiceName(); + String serviceKey = UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName); + String connectionId = meta.getConnectionId(); + ServiceInfo serviceInfo = serviceInfoGenerator + .generateServiceInfo(namespaceId, serviceName, StringUtils.EMPTY, false, meta.getClientIp()); + Subscriber subscriber = new Subscriber(meta.getClientIp(), "", "unknown", meta.getClientIp(), namespaceId, + serviceName); + if (subscribeServiceRequest.isSubscribe()) { + asyncListenContext.addListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId); + remotePushService.registerSubscribeForService(serviceKey, subscriber); + } else { + asyncListenContext.removeListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId); + remotePushService.removeSubscribeForService(serviceKey, subscriber); + } + return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo); + } + + @Override + public List getRequestTypes() { + return Lists.newArrayList(NamingRemoteConstants.SUBSCRIBE_SERVICE); + } +}