[ISSUE #1097] Naming support un/subscribe service by grpc. (#3373)

* For #1097, server support subscribe service.

* For #1097, client support subscribe service.

* For #1097, server and client support unsubscribe service.
This commit is contained in:
杨翊 SionYang 2020-07-18 16:07:21 +08:00 committed by GitHub
parent 4a702f8954
commit 3bde28294d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 790 additions and 202 deletions

View File

@ -28,8 +28,9 @@ public class NamingRemoteConstants {
public static final String DE_REGISTER_INSTANCE = "deregisterInstance";
public static final String SUBSCRIBE_SERVICE = "subscribeService";
public static final String QUERY_SERVICE = "queryService";
public static final String SUBSCRIBE_SERVICE = "subscribeService";
public static final String NOTIFY_SUBSCRIBER = "notifySubscriber";
}

View File

@ -32,12 +32,6 @@ public class InstanceRequest extends NamingCommonRequest {
public InstanceRequest() {
}
public InstanceRequest(String namespace, String serviceName, String type, Instance instance) {
super(namespace, serviceName, null);
this.type = type;
this.instance = instance;
}
public InstanceRequest(String namespace, String serviceName, String groupName, String type, Instance instance) {
super(namespace, serviceName, groupName);
this.type = type;

View File

@ -0,0 +1,61 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.naming.remote.request;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
/**
* Nacos naming subscribe service request.
*
* @author xiweng.yy
*/
public class SubscribeServiceRequest extends NamingCommonRequest {
private boolean subscribe;
private String clusters;
public SubscribeServiceRequest() {
}
public SubscribeServiceRequest(String namespace, String serviceName, String clusters, boolean subscribe) {
super(namespace, serviceName, null);
this.clusters = clusters;
this.subscribe = subscribe;
}
@Override
public String getType() {
return NamingRemoteConstants.SUBSCRIBE_SERVICE;
}
public String getClusters() {
return clusters;
}
public void setClusters(String clusters) {
this.clusters = clusters;
}
public boolean isSubscribe() {
return subscribe;
}
public void setSubscribe(boolean subscribe) {
this.subscribe = subscribe;
}
}

View File

@ -30,13 +30,17 @@ public class InstanceResponse extends Response {
public InstanceResponse() {
}
public InstanceResponse(String type) {
this.type = type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String getType() {
return this.type;
}
public InstanceResponse(String type) {
this.type = type;
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.naming.remote.response;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
/**
* Notify subscriber response.
*
* @author xiweng.yy
*/
public class NotifySubscriberResponse extends Response {
private ServiceInfo serviceInfo;
public NotifySubscriberResponse() {
}
private NotifySubscriberResponse(ServiceInfo serviceInfo, String message) {
this.serviceInfo = serviceInfo;
setMessage(message);
}
public static NotifySubscriberResponse buildSuccessResponse(ServiceInfo serviceInfo) {
return new NotifySubscriberResponse(serviceInfo, "success");
}
/**
* Build fail response.
*
* @param message error message
* @return faile response
*/
public static NotifySubscriberResponse buildFailResponse(String message) {
NotifySubscriberResponse result = new NotifySubscriberResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage(message);
return result;
}
@Override
public String getType() {
return NamingRemoteConstants.NOTIFY_SUBSCRIBER;
}
public ServiceInfo getServiceInfo() {
return serviceInfo;
}
public void setServiceInfo(ServiceInfo serviceInfo) {
this.serviceInfo = serviceInfo;
}
}

View File

@ -26,11 +26,11 @@ import com.alibaba.nacos.api.remote.response.ResponseCode;
*
* @author xiweng.yy
*/
public class ServiceQueryResponse extends Response {
public class QueryServiceResponse extends Response {
private ServiceInfo serviceInfo;
public ServiceQueryResponse() {
public QueryServiceResponse() {
}
@Override
@ -38,21 +38,31 @@ public class ServiceQueryResponse extends Response {
return NamingRemoteConstants.QUERY_SERVICE;
}
public ServiceQueryResponse(ServiceInfo serviceInfo) {
private QueryServiceResponse(ServiceInfo serviceInfo) {
this.serviceInfo = serviceInfo;
}
public static ServiceQueryResponse buildSuccessResponse(ServiceInfo serviceInfo) {
ServiceQueryResponse serviceQueryResponse = new ServiceQueryResponse();
serviceQueryResponse.setServiceInfo(serviceInfo);
return serviceQueryResponse;
/**
* Build Success response.
*
* @param serviceInfo service info
* @return service query response
*/
public static QueryServiceResponse buildSuccessResponse(ServiceInfo serviceInfo) {
return new QueryServiceResponse(serviceInfo);
}
public static ServiceQueryResponse buildFailResponse(String message) {
ServiceQueryResponse serviceQueryResponse = new ServiceQueryResponse();
serviceQueryResponse.setResultCode(ResponseCode.FAIL.getCode());
serviceQueryResponse.setMessage(message);
return serviceQueryResponse;
/**
* Build fail response.
*
* @param message message
* @return service query response
*/
public static QueryServiceResponse buildFailResponse(String message) {
QueryServiceResponse queryServiceResponse = new QueryServiceResponse();
queryServiceResponse.setResultCode(ResponseCode.FAIL.getCode());
queryServiceResponse.setMessage(message);
return queryServiceResponse;
}
public ServiceInfo getServiceInfo() {

View File

@ -0,0 +1,54 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.naming.remote.response;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.remote.response.Response;
/**
* Nacos naming subscribe service response.
*
* @author xiweng.yy
*/
public class SubscribeServiceResponse extends Response {
private ServiceInfo serviceInfo;
public SubscribeServiceResponse() {
}
public SubscribeServiceResponse(int resultCode, String message, ServiceInfo serviceInfo) {
super();
setResultCode(resultCode);
setMessage(message);
this.serviceInfo = serviceInfo;
}
@Override
public String getType() {
return NamingRemoteConstants.SUBSCRIBE_SERVICE;
}
public ServiceInfo getServiceInfo() {
return serviceInfo;
}
public void setServiceInfo(ServiceInfo serviceInfo) {
this.serviceInfo = serviceInfo;
}
}

View File

@ -24,7 +24,9 @@ import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigResponseTypeConstants;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.response.InstanceResponse;
import com.alibaba.nacos.api.naming.remote.response.ServiceQueryResponse;
import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse;
import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.HeartBeatResponse;
import com.alibaba.nacos.api.remote.response.ResponseTypeConstants;
@ -60,7 +62,9 @@ public class ResponseRegistry {
//naming response registry
REGISTRY_RESPONSES.put(NamingRemoteConstants.REGISTER_INSTANCE, InstanceResponse.class);
REGISTRY_RESPONSES.put(NamingRemoteConstants.DE_REGISTER_INSTANCE, InstanceResponse.class);
REGISTRY_RESPONSES.put(NamingRemoteConstants.QUERY_SERVICE, ServiceQueryResponse.class);
REGISTRY_RESPONSES.put(NamingRemoteConstants.QUERY_SERVICE, QueryServiceResponse.class);
REGISTRY_RESPONSES.put(NamingRemoteConstants.SUBSCRIBE_SERVICE, SubscribeServiceResponse.class);
REGISTRY_RESPONSES.put(NamingRemoteConstants.NOTIFY_SUBSCRIBER, NotifySubscriberResponse.class);
}
public static Class getClassByType(String type) {

View File

@ -26,15 +26,16 @@ import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.naming.beat.BeatInfo;
import com.alibaba.nacos.client.naming.beat.BeatReactor;
import com.alibaba.nacos.client.naming.core.Balancer;
import com.alibaba.nacos.client.naming.core.EventDispatcher;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.net.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.remote.ServerListFactory;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.StringUtils;
@ -43,6 +44,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Nacos Naming Service.
@ -73,6 +75,8 @@ public class NacosNamingService implements NamingService {
private NamingProxy serverProxy;
private NamingGrpcClientProxy grpcClientProxy;
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
@ -97,6 +101,24 @@ public class NacosNamingService implements NamingService {
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
isLoadCacheAtStart(properties), initPollingThreadCount(properties));
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, hostReactor);
grpcClientProxy.start(new ServerListFactory() {
private final AtomicInteger index = new AtomicInteger();
private final String[] serverLists = serverList.split(",");
@Override
public String genNextServer() {
int nextIndex = index.getAndIncrement() % serverLists.length;
return serverLists[nextIndex];
}
@Override
public String getCurrentServer() {
return serverLists[index.get() % serverLists.length];
}
});
}
private int initClientBeatThreadCount(Properties properties) {
@ -192,12 +214,13 @@ public class NacosNamingService implements NamingService {
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
// String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// if (instance.isEphemeral()) {
// BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// beatReactor.addBeatInfo(groupedServiceName, beatInfo);
// }
// serverProxy.registerService(groupedServiceName, groupName, instance);
grpcClientProxy.registerService(serviceName, groupName, instance);
}
@Override
@ -233,11 +256,12 @@ public class NacosNamingService implements NamingService {
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
instance.getPort());
}
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
// if (instance.isEphemeral()) {
// beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
// instance.getPort());
// }
// serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
grpcClientProxy.deregisterService(serviceName, groupName, instance);
}
@Override
@ -287,9 +311,11 @@ public class NacosNamingService implements NamingService {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
// serviceInfo = hostReactor
// .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
// StringUtils.join(clusters, ","));
serviceInfo = grpcClientProxy.queryInstancesOfService(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","), 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
@ -442,8 +468,11 @@ public class NacosNamingService implements NamingService {
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
eventDispatcher.addListener(hostReactor
.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")),
// eventDispatcher.addListener(hostReactor
// .getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")),
// StringUtils.join(clusters, ","), listener);
eventDispatcher.addListener(grpcClientProxy
.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")),
StringUtils.join(clusters, ","), listener);
}
@ -465,9 +494,12 @@ public class NacosNamingService implements NamingService {
@Override
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
eventDispatcher
.removeListener(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
listener);
String fullServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String clustersString = StringUtils.join(clusters, ",");
eventDispatcher.removeListener(fullServiceName, clustersString, listener);
if (!eventDispatcher.isSubscribed(fullServiceName, clustersString)) {
grpcClientProxy.unsubscribe(fullServiceName, clustersString);
}
}
@Override

View File

@ -125,6 +125,17 @@ public class HostReactor implements Closeable {
*/
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
serviceInfo.setJsonFromServer(json);
return processServiceJson(serviceInfo);
}
/**
* Process service info.
*
* @param serviceInfo new service info
* @return service info
*/
public ServiceInfo processServiceJson(ServiceInfo serviceInfo) {
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//empty or error push, just ignore
@ -204,8 +215,6 @@ public class HostReactor implements Closeable {
+ JacksonUtils.toJson(modHosts));
}
serviceInfo.setJsonFromServer(json);
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
@ -217,10 +226,13 @@ public class HostReactor implements Closeable {
+ JacksonUtils.toJson(serviceInfo.getHosts()));
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
eventDispatcher.serviceChanged(serviceInfo);
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
@ -271,10 +283,10 @@ public class HostReactor implements Closeable {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updatingService(serviceName);
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
finishUpdating(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
@ -367,6 +379,14 @@ public class HostReactor implements Closeable {
NAMING_LOGGER.info("{} do shutdown stop", className);
}
public void updatingService(String serviceName) {
updatingMap.put(serviceName, new Object());
}
public void finishUpdating(String serviceName) {
updatingMap.remove(serviceName);
}
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.net;
package com.alibaba.nacos.client.naming.net.gprc;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
@ -22,9 +22,12 @@ import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.InstanceRequest;
import com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest;
import com.alibaba.nacos.api.naming.remote.response.ServiceQueryResponse;
import com.alibaba.nacos.api.naming.remote.request.SubscribeServiceRequest;
import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse;
import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.remote.RpcClient;
import com.alibaba.nacos.client.remote.RpcClientFactory;
import com.alibaba.nacos.client.remote.ServerListFactory;
@ -40,16 +43,26 @@ public class NamingGrpcClientProxy {
private final String namespaceId;
private HostReactor hostReactor;
private RpcClient rpcClient;
public NamingGrpcClientProxy(String namespaceId) {
public NamingGrpcClientProxy(String namespaceId, HostReactor hostReactor) {
this.namespaceId = namespaceId;
rpcClient = RpcClientFactory.getClient("naming");
this.hostReactor = hostReactor;
this.rpcClient = RpcClientFactory.getClient("naming");
}
/**
* Start Grpc client proxy.
*
* @param serverListFactory server list factory
* @throws NacosException nacos exception
*/
public void start(ServerListFactory serverListFactory) throws NacosException {
rpcClient.init(serverListFactory);
rpcClient.start();
rpcClient.registerServerPushResponseHandler(new NamingPushResponseHandler(hostReactor));
}
/**
@ -73,14 +86,15 @@ public class NamingGrpcClientProxy {
* deregister instance from a service.
*
* @param serviceName name of service
* @param groupName group name
* @param instance instance
* @throws NacosException nacos exception
*/
public void deregisterService(String serviceName, Instance instance) throws NacosException {
public void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER
.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName,
instance);
InstanceRequest request = new InstanceRequest(namespaceId, serviceName,
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.DE_REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
}
@ -101,10 +115,44 @@ public class NamingGrpcClientProxy {
request.setCluster(clusters);
request.setHealthyOnly(healthyOnly);
request.setUdpPort(udpPort);
ServiceQueryResponse response = requestToServer(request, ServiceQueryResponse.class);
QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
return response.getServiceInfo();
}
/**
* Subscribe service.
*
* @param serviceName full service name with group
* @param clusters clusters, current only support subscribe all clusters, maybe deprecated
* @return current service info of subscribe service
* @throws NacosException nacos exception
*/
public ServiceInfo subscribe(String serviceName, String clusters) throws NacosException {
ServiceInfo serviceInfo = new ServiceInfo(serviceName, clusters);
if (hostReactor.getServiceInfoMap().containsKey(serviceInfo.getKey())) {
return hostReactor.getServiceInfoMap().get(serviceInfo.getKey());
}
hostReactor.updatingService(serviceName);
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, clusters, true);
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
ServiceInfo result = response.getServiceInfo();
hostReactor.getServiceInfoMap().put(result.getKey(), result);
hostReactor.finishUpdating(serviceName);
return result;
}
/**
* Unsubscribe service.
*
* @param serviceName full service name with group
* @param clusters clusters, current only support subscribe all clusters, maybe deprecated
* @throws NacosException nacos exception
*/
public void unsubscribe(String serviceName, String clusters) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, serviceName, clusters, false);
requestToServer(request, SubscribeServiceResponse.class);
}
private <T extends Response> T requestToServer(Request request, Class<T> responseClass) throws NacosException {
try {
Response response = rpcClient.request(request);

View File

@ -0,0 +1,42 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.net.gprc;
import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.core.HostReactor;
import com.alibaba.nacos.client.remote.ServerPushResponseHandler;
/**
* Naming push response handler.
*
* @author xiweng.yy
*/
public class NamingPushResponseHandler implements ServerPushResponseHandler<NotifySubscriberResponse> {
private final HostReactor hostReactor;
public NamingPushResponseHandler(HostReactor hostReactor) {
this.hostReactor = hostReactor;
}
@Override
public void responseReply(Response response) {
NotifySubscriberResponse notifyResponse = (NotifySubscriberResponse) response;
hostReactor.processServiceJson(notifyResponse.getServiceInfo());
}
}

View File

@ -20,15 +20,16 @@ import com.alibaba.nacos.api.remote.response.Response;
/**
* ServerPushResponseHandler.
*
* @author liuzunfei
* @version $Id: ServerPushResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $
*/
public abstract interface ServerPushResponseHandler<T> {
public interface ServerPushResponseHandler<T> {
/**
* handle logic when response ceceive.
* @param response.
* Handle logic when response received.
* @param response response
*/
public abstract void responseReply(Response response);
void responseReply(Response response);
}

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.common.utils;
import java.util.AbstractSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@ -44,9 +45,14 @@ public class ConcurrentHashSet<E> extends AbstractSet<E> {
return map.containsKey(o);
}
/**
* The original implement <p>map.keySet().iterator()</p> need jdk8, so it can work.
*
* @return iterator
*/
@Override
public Iterator<E> iterator() {
return map.keySet().iterator();
return new HashSet<E>(map.keySet()).iterator();
}
@Override

View File

@ -95,4 +95,17 @@ public class AsyncListenContext {
return null;
}
/**
* Judge whether contain listener for item.
*
* @param requestType request type
* @param listenKey listen key
* @return true if has contained, otherwise false
*/
public boolean containListener(String requestType, String listenKey) {
if (!listenContexts.containsKey(requestType)) {
return false;
}
return listenContexts.get(requestType).containsKey(listenKey);
}
}

View File

@ -22,6 +22,7 @@ import com.alibaba.nacos.common.utils.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Set;
/**
@ -55,17 +56,29 @@ public class DataChangeListenerNotifier {
public void configDataChanged(String groupKey, Response notifyResponse) {
Set<String> listeners = asyncListenContext.getListeners(NacosRemoteConstants.LISTEN_CONTEXT_CONFIG, groupKey);
if (!CollectionUtils.isEmpty(listeners)) {
for (String connectionId : listeners) {
Connection connection = connectionManager.getConnection(connectionId);
if (connection != null) {
connection.sendResponse(notifyResponse);
}
sendNotifyResponse(listeners, notifyResponse);
}
/**
* Push service info to subscribe.
*
* @param serviceKey service key
* @param notifyResponse {@link com.alibaba.nacos.api.naming.pojo.ServiceInfo}
*/
public void serviceInfoChanged(String serviceKey, Response notifyResponse) {
Set<String> listeners = asyncListenContext.getListeners(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey);
sendNotifyResponse(listeners, notifyResponse);
}
private void sendNotifyResponse(Collection<String> listeners, Response notifyResponse) {
if (CollectionUtils.isEmpty(listeners)) {
return;
}
for (String each : listeners) {
Connection connection = connectionManager.getConnection(each);
if (null != connection) {
connection.sendResponse(notifyResponse);
}
}
}
public void serviceIndoChanged(String serviceKey, Response notifyResponse) {
//TODO
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Service information generator.
*
* @author xiweng.yy
*/
@Component
public class ServiceInfoGenerator {
private final ServiceManager serviceManager;
private final SwitchDomain switchDomain;
public ServiceInfoGenerator(ServiceManager serviceManager, SwitchDomain switchDomain) {
this.serviceManager = serviceManager;
this.switchDomain = switchDomain;
}
public ServiceInfo generateEmptyServiceInfo(String serviceName, String clusters) {
return new ServiceInfo(serviceName, clusters);
}
/**
* Generate {@link ServiceInfo} for service and clusters.
*
* @param namespaceId namespace id of service
* @param serviceName service name
* @param clusters clusters of instances
* @param healthyOnly only healthy instances
* @param clientIp source client ip
* @return service information
* @throws NacosException when service is disabled
*/
public ServiceInfo generateServiceInfo(String namespaceId, String serviceName, String clusters, boolean healthyOnly,
String clientIp) throws NacosException {
if (!serviceManager.containService(namespaceId, serviceName)) {
return generateEmptyServiceInfo(serviceName, clusters);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (!service.getEnabled()) {
throw new NacosException(NacosException.SERVER_ERROR,
String.format("Service %s : %s is disable now", namespaceId, serviceName));
}
return generateServiceInfo(service, clusters, healthyOnly, clientIp);
}
/**
* Generate {@link ServiceInfo} for service and clusters.
*
* @param service service
* @param clusters clusters of instances
* @param healthyOnly only healthy instances
* @param clientIp source client ip
* @return service information
*/
public ServiceInfo generateServiceInfo(Service service, String clusters, boolean healthyOnly, String clientIp) {
// TODO the origin logic in {@link InstanceController#doSrvIpxt will try to add push.
ServiceInfo result = new ServiceInfo(service.getName(), clusters);
List<Instance> instances = getInstanceFromService(service, clusters, healthyOnly, clientIp);
result.addAllHosts(instances);
result.setName(service.getName());
result.setCacheMillis(switchDomain.getDefaultCacheMillis());
result.setLastRefTime(System.currentTimeMillis());
result.setChecksum(service.getChecksum());
result.setClusters(clusters);
// TODO there are some parameters do not include in service info, but added to return in origin logic
return result;
}
private List<Instance> getInstanceFromService(Service service, String clusters, boolean healthyOnly,
String clientIp) {
List<Instance> result = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
if (service.getSelector() != null && StringUtils.isNotBlank(clientIp)) {
result = service.getSelector().select(clientIp, result);
}
return result.isEmpty() ? result : healthyOnly ? doProtectThreshold(service, result) : result;
}
private List<Instance> doProtectThreshold(Service service, List<Instance> instances) {
Map<Boolean, List<Instance>> healthyInstancesMap = new HashMap<>(2);
healthyInstancesMap.put(Boolean.TRUE, new LinkedList<>());
healthyInstancesMap.put(Boolean.FALSE, new LinkedList<>());
for (Instance each : instances) {
healthyInstancesMap.get(each.isHealthy()).add(each);
}
if ((float) healthyInstancesMap.get(Boolean.TRUE).size() / instances.size() <= service.getProtectThreshold()) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", service.getName());
healthyInstancesMap.get(Boolean.TRUE).addAll(healthyInstancesMap.get(Boolean.FALSE));
healthyInstancesMap.get(Boolean.FALSE).clear();
}
return healthyInstancesMap.get(Boolean.TRUE);
}
}

View File

@ -27,6 +27,7 @@ import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.pojo.Subscribers;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.push.RemotePushService;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -35,6 +36,7 @@ import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -56,11 +58,17 @@ public class SubscribeManager {
@Autowired
private PushService pushService;
@Autowired
private RemotePushService remotePushService;
@Autowired
private ServerMemberManager memberManager;
private List<Subscriber> getSubscribersFuzzy(String serviceName, String namespaceId) {
return pushService.getClientsFuzzy(serviceName, namespaceId);
List<Subscriber> result = new LinkedList<>();
result.addAll(pushService.getClientsFuzzy(serviceName, namespaceId));
result.addAll(remotePushService.getSubscribes(namespaceId, serviceName));
return result;
}
private List<Subscriber> getSubscribers(String serviceName, String namespaceId) {

View File

@ -121,7 +121,10 @@ public class PushService implements ApplicationContextAware, ApplicationListener
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
//merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
return;
}
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
@ -371,12 +374,6 @@ public class PushService implements ApplicationContextAware, ApplicationListener
* @param service service
*/
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap
.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return;
}
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.DataChangeListenerNotifier;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceInfoGenerator;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Remote push services.
*
* @author xiweng.yy
*/
@Component
public class RemotePushService implements ApplicationListener<ServiceChangeEvent> {
private final ServiceInfoGenerator serviceInfoGenerator;
private final DataChangeListenerNotifier notifier;
/**
* ServiceKey --> actual Subscriber. The Subscriber may be only subscribe part of cluster of service.
*/
private final ConcurrentMap<String, Set<Subscriber>> serviceSubscribesMap = new ConcurrentHashMap<>();
public RemotePushService(ServiceInfoGenerator serviceInfoGenerator, DataChangeListenerNotifier notifier) {
this.serviceInfoGenerator = serviceInfoGenerator;
this.notifier = notifier;
}
/**
* Register subscribe For service.
*
* @param serviceKey service key
* @param subscriber subscriber
*/
public void registerSubscribeForService(String serviceKey, Subscriber subscriber) {
if (!serviceSubscribesMap.containsKey(serviceKey)) {
serviceSubscribesMap.put(serviceKey, new ConcurrentHashSet<>());
}
serviceSubscribesMap.get(serviceKey).add(subscriber);
}
/**
* Remove subscribe For service.
*
* @param serviceKey service key
* @param subscriber subscriber
*/
public void removeSubscribeForService(String serviceKey, Subscriber subscriber) {
if (!serviceSubscribesMap.containsKey(serviceKey)) {
return;
}
serviceSubscribesMap.get(serviceKey).remove(subscriber);
}
public Set<Subscriber> getSubscribes(String namespaceId, String serviceName) {
return getSubscribes(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
public Set<Subscriber> getSubscribes(String serviceKey) {
return serviceSubscribesMap.getOrDefault(serviceKey, new HashSet<>());
}
@Override
public void onApplicationEvent(ServiceChangeEvent serviceChangeEvent) {
Service service = serviceChangeEvent.getService();
String serviceKey = UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName());
ServiceInfo serviceInfo = serviceInfoGenerator
.generateServiceInfo(service, StringUtils.EMPTY, false, StringUtils.EMPTY);
notifier.serviceInfoChanged(serviceKey, NotifySubscriberResponse.buildSuccessResponse(serviceInfo));
}
}

View File

@ -1,58 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.core.remote.AsyncListenContext;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Handler to handle subscribe service.
*
* @author liuzunfei
* @author xiweng.yy
*/
@Component
public class SubscribeServiceRequestHandler extends RequestHandler {
@Autowired
AsyncListenContext asyncListenContext;
@Override
public Request parseBodyString(String bodyString) {
return null;
}
@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
return null;
}
@Override
public List<String> getRequestTypes() {
return Lists.newArrayList(NamingRemoteConstants.SUBSCRIBE_SERVICE);
}
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.InstanceRequest;
import com.alibaba.nacos.api.naming.remote.response.InstanceResponse;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
@ -56,7 +57,7 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
public Response handle(Request request, RequestMeta meta) throws NacosException {
InstanceRequest instanceRequest = (InstanceRequest) request;
String namespace = instanceRequest.getNamespace();
String serviceName = instanceRequest.getServiceName();
String serviceName = NamingUtils.getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName());
switch (instanceRequest.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(namespace, serviceName, instanceRequest, meta);

View File

@ -20,28 +20,17 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest;
import com.alibaba.nacos.api.naming.remote.response.ServiceQueryResponse;
import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.core.ServiceInfoGenerator;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Nacos query instances request handler.
@ -51,11 +40,11 @@ import java.util.Map;
@Component
public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest> {
@Autowired
private ServiceManager serviceManager;
private final ServiceInfoGenerator serviceInfoGenerator;
@Autowired
private SwitchDomain switchDomain;
public ServiceQueryRequestHandler(ServiceInfoGenerator serviceInfoGenerator) {
this.serviceInfoGenerator = serviceInfoGenerator;
}
@Override
public ServiceQueryRequest parseBodyString(String bodyString) {
@ -67,50 +56,11 @@ public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryReque
ServiceQueryRequest queryRequest = (ServiceQueryRequest) request;
String namespaceId = queryRequest.getNamespace();
String serviceName = queryRequest.getServiceName();
if (!serviceManager.containService(namespaceId, serviceName)) {
return new ServiceQueryResponse(new ServiceInfo(serviceName, queryRequest.getCluster()));
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (!service.getEnabled()) {
throw new NacosException(NacosException.SERVER_ERROR,
String.format("Service %s : %s is disable now", namespaceId, serviceName));
}
// TODO the origin logic in {@link InstanceController#doSrvIpxt will try to add push.
ServiceInfo result = new ServiceInfo(serviceName, queryRequest.getCluster());
List<Instance> instances = getInstanceFromService(service, queryRequest, meta);
result.addAllHosts(instances);
result.setName(serviceName);
result.setCacheMillis(switchDomain.getDefaultCacheMillis());
result.setLastRefTime(System.currentTimeMillis());
result.setChecksum(service.getChecksum());
result.setClusters(queryRequest.getCluster());
// TODO there are some parameters do not include in service info, but added to return in origin logic
return new ServiceQueryResponse(result);
}
private List<Instance> getInstanceFromService(Service service, ServiceQueryRequest queryRequest, RequestMeta meta) {
List<Instance> result = service.srvIPs(Arrays.asList(StringUtils.split(queryRequest.getCluster(), ",")));
if (service.getSelector() != null && StringUtils.isNotBlank(meta.getClientIp())) {
result = service.getSelector().select(meta.getClientIp(), result);
}
return result.isEmpty() ? result
: queryRequest.isHealthyOnly() ? doProtectThreshold(service, queryRequest, result) : result;
}
private List<Instance> doProtectThreshold(Service service, ServiceQueryRequest queryRequest,
List<Instance> instances) {
Map<Boolean, List<Instance>> healthyInstancesMap = new HashMap<>();
healthyInstancesMap.put(Boolean.TRUE, new LinkedList<>());
healthyInstancesMap.put(Boolean.FALSE, new LinkedList<>());
for (Instance each : instances) {
healthyInstancesMap.get(each.isHealthy()).add(each);
}
if ((float) healthyInstancesMap.get(Boolean.TRUE).size() / instances.size() <= service.getProtectThreshold()) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", service.getName());
healthyInstancesMap.get(Boolean.TRUE).addAll(healthyInstancesMap.get(Boolean.FALSE));
healthyInstancesMap.get(Boolean.FALSE).clear();
}
return healthyInstancesMap.get(Boolean.TRUE);
String cluster = null == queryRequest.getCluster() ? "" : queryRequest.getCluster();
boolean healthyOnly = queryRequest.isHealthyOnly();
ServiceInfo result = serviceInfoGenerator
.generateServiceInfo(namespaceId, serviceName, cluster, healthyOnly, meta.getClientIp());
return QueryServiceResponse.buildSuccessResponse(result);
}
@Override

View File

@ -0,0 +1,94 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote.handler;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.naming.remote.request.SubscribeServiceRequest;
import com.alibaba.nacos.api.naming.remote.response.SubscribeServiceResponse;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.AsyncListenContext;
import com.alibaba.nacos.core.remote.NacosRemoteConstants;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.core.ServiceInfoGenerator;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.RemotePushService;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Handler to handle subscribe service.
*
* @author liuzunfei
* @author xiweng.yy
*/
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest> {
private final AsyncListenContext asyncListenContext;
private final ServiceInfoGenerator serviceInfoGenerator;
private final RemotePushService remotePushService;
public SubscribeServiceRequestHandler(AsyncListenContext asyncListenContext,
ServiceInfoGenerator serviceInfoGenerator, RemotePushService remotePushService) {
this.asyncListenContext = asyncListenContext;
this.serviceInfoGenerator = serviceInfoGenerator;
this.remotePushService = remotePushService;
}
@Override
public SubscribeServiceRequest parseBodyString(String bodyString) {
return JacksonUtils.toObj(bodyString, SubscribeServiceRequest.class);
}
@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
SubscribeServiceRequest subscribeServiceRequest = (SubscribeServiceRequest) request;
String namespaceId = subscribeServiceRequest.getNamespace();
String serviceName = subscribeServiceRequest.getServiceName();
String serviceKey = UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName);
String connectionId = meta.getConnectionId();
ServiceInfo serviceInfo = serviceInfoGenerator
.generateServiceInfo(namespaceId, serviceName, StringUtils.EMPTY, false, meta.getClientIp());
Subscriber subscriber = new Subscriber(meta.getClientIp(), "", "unknown", meta.getClientIp(), namespaceId,
serviceName);
if (subscribeServiceRequest.isSubscribe()) {
asyncListenContext.addListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId);
remotePushService.registerSubscribeForService(serviceKey, subscriber);
} else {
asyncListenContext.removeListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId);
remotePushService.removeSubscribeForService(serviceKey, subscriber);
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
@Override
public List<String> getRequestTypes() {
return Lists.newArrayList(NamingRemoteConstants.SUBSCRIBE_SERVICE);
}
}