Remove context when grpc connection close. (#3407)

This commit is contained in:
杨翊 SionYang 2020-07-21 20:28:24 +08:00 committed by GitHub
parent b28636c6de
commit 5029429a89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 256 additions and 218 deletions

View File

@ -18,15 +18,15 @@ package com.alibaba.nacos.api.naming.remote.response;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants; import com.alibaba.nacos.api.naming.remote.NamingRemoteConstants;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode; import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.api.remote.response.ServerPushResponse;
/** /**
* Notify subscriber response. * Notify subscriber response.
* *
* @author xiweng.yy * @author xiweng.yy
*/ */
public class NotifySubscriberResponse extends Response { public class NotifySubscriberResponse extends ServerPushResponse {
private ServiceInfo serviceInfo; private ServiceInfo serviceInfo;

View File

@ -1,111 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Service;
/**
* AsyncListenContext. conserve which clientId that insterest in which congif key.
*
* @author liuzunfei
* @version $Id: AsyncListenContext.java, v 0.1 2020年07月14日 10:13 AM liuzunfei Exp $
*/
@Service
public class AsyncListenContext {
private Map<String, Map<String, Set<String>>> listenContexts = new HashMap<String, Map<String, Set<String>>>();
/**
* add listen .
*
* @param requestType requestType
* @param listenKey listenKey.
* @param connectionId connectionId.
*/
public void addListen(String requestType, String listenKey, String connectionId) {
Map<String, Set<String>> listenClients = listenContexts.get(requestType);
if (listenClients == null) {
listenContexts.putIfAbsent(requestType, new HashMap<String, Set<String>>());
listenClients = listenContexts.get(requestType);
}
Set<String> connectionIds = listenClients.get(listenKey);
if (connectionIds == null) {
listenClients.putIfAbsent(listenKey, new HashSet<String>());
connectionIds = listenClients.get(listenKey);
}
boolean addSuccess = connectionIds.add(connectionId);
if (addSuccess) {
//TODO add log ...success to add listen
}
}
/**
* remove listen context for connectionId ..
*
* @param requestType requestType
* @param lisnteKey lisnteKey
* @param connectionId connectionId
*/
public void removeListen(String requestType, String lisnteKey, String connectionId) {
Map<String, Set<String>> stringSetMap = listenContexts.get(requestType);
if (stringSetMap == null || stringSetMap.isEmpty()) {
return;
}
Set<String> connectionIds = stringSetMap.get(lisnteKey);
if (connectionIds == null) {
return;
}
boolean remove = connectionIds.remove(connectionId);
if (remove) {
//TODO add log ...success to remove listen
}
}
public Set<String> getListeners(String requestType, String listenKey) {
if (listenContexts.containsKey(requestType)) {
Map<String, Set<String>> stringSetMap = listenContexts.get(requestType);
return stringSetMap.get(listenKey);
}
return null;
}
/**
* Judge whether contain listener for item.
*
* @param requestType request type
* @param listenKey listen key
* @return true if has contained, otherwise false
*/
public boolean containListener(String requestType, String listenKey) {
if (!listenContexts.containsKey(requestType)) {
return false;
}
return listenContexts.get(requestType).containsKey(listenKey);
}
}

View File

@ -33,6 +33,9 @@ import java.util.Map;
@Service @Service
public class ConnectionManager { public class ConnectionManager {
@Autowired
private ClientConnectionEventListenerRegistry connectionEventListenerRegistry;
Map<String, Connection> connetions = new HashMap<String, Connection>(); Map<String, Connection> connetions = new HashMap<String, Connection>();
@Autowired @Autowired
@ -54,6 +57,7 @@ public class ConnectionManager {
/** /**
* unregister a connection . * unregister a connection .
*
* @param connectionId connectionId. * @param connectionId connectionId.
*/ */
public void unregister(String connectionId) { public void unregister(String connectionId) {

View File

@ -1,84 +0,0 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Set;
/**
* DataChangeListenerNotifier.
*
* @author liuzunfei
* @version $Id: DataChangeLisenerNotifier.java, v 0.1 2020年07月14日 10:44 AM liuzunfei Exp $
*/
@Service
public class DataChangeListenerNotifier {
/**
* connect manager.
*/
@Autowired
ConnectionManager connectionManager;
/**
* asyncListenContext.
*/
@Autowired
AsyncListenContext asyncListenContext;
/**
* adaptor to config module ,when server side congif change ,invoke this method.
*
* @param groupKey groupKey
* @param notifyResponse notifyResponse
*/
public void configDataChanged(String groupKey, Response notifyResponse) {
Set<String> listeners = asyncListenContext.getListeners(NacosRemoteConstants.LISTEN_CONTEXT_CONFIG, groupKey);
sendNotifyResponse(listeners, notifyResponse);
}
/**
* Push service info to subscribe.
*
* @param serviceKey service key
* @param notifyResponse {@link com.alibaba.nacos.api.naming.pojo.ServiceInfo}
*/
public void serviceInfoChanged(String serviceKey, Response notifyResponse) {
Set<String> listeners = asyncListenContext.getListeners(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey);
sendNotifyResponse(listeners, notifyResponse);
}
private void sendNotifyResponse(Collection<String> listeners, Response notifyResponse) {
if (CollectionUtils.isEmpty(listeners)) {
return;
}
for (String each : listeners) {
Connection connection = connectionManager.getConnection(each);
if (null != connection) {
connection.sendResponse(notifyResponse);
}
}
}
}

View File

@ -20,7 +20,7 @@ import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse; import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.common.utils.ConcurrentHashSet; import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.DataChangeListenerNotifier; import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.naming.core.Service; import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceInfoGenerator; import com.alibaba.nacos.naming.core.ServiceInfoGenerator;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
@ -43,14 +43,16 @@ public class RemotePushService implements ApplicationListener<ServiceChangeEvent
private final ServiceInfoGenerator serviceInfoGenerator; private final ServiceInfoGenerator serviceInfoGenerator;
private final DataChangeListenerNotifier notifier; private final RpcPushService notifier;
/** /**
* ServiceKey --> actual Subscriber. The Subscriber may be only subscribe part of cluster of service. * ServiceKey --> actual Subscriber. The Subscriber may be only subscribe part of cluster of service.
*/ */
private final ConcurrentMap<String, Set<Subscriber>> serviceSubscribesMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Set<Subscriber>> serviceSubscribesMap = new ConcurrentHashMap<>();
public RemotePushService(ServiceInfoGenerator serviceInfoGenerator, DataChangeListenerNotifier notifier) { private final ConcurrentMap<Subscriber, String> subscribeConnectionMap = new ConcurrentHashMap<>();
public RemotePushService(ServiceInfoGenerator serviceInfoGenerator, RpcPushService notifier) {
this.serviceInfoGenerator = serviceInfoGenerator; this.serviceInfoGenerator = serviceInfoGenerator;
this.notifier = notifier; this.notifier = notifier;
} }
@ -58,14 +60,16 @@ public class RemotePushService implements ApplicationListener<ServiceChangeEvent
/** /**
* Register subscribe For service. * Register subscribe For service.
* *
* @param serviceKey service key * @param serviceKey service key
* @param subscriber subscriber * @param subscriber subscriber
* @param connectionId connection Id of subscriber
*/ */
public void registerSubscribeForService(String serviceKey, Subscriber subscriber) { public void registerSubscribeForService(String serviceKey, Subscriber subscriber, String connectionId) {
if (!serviceSubscribesMap.containsKey(serviceKey)) { if (!serviceSubscribesMap.containsKey(serviceKey)) {
serviceSubscribesMap.put(serviceKey, new ConcurrentHashSet<>()); serviceSubscribesMap.put(serviceKey, new ConcurrentHashSet<>());
} }
serviceSubscribesMap.get(serviceKey).add(subscriber); serviceSubscribesMap.get(serviceKey).add(subscriber);
subscribeConnectionMap.put(subscriber, connectionId);
} }
/** /**
@ -79,6 +83,21 @@ public class RemotePushService implements ApplicationListener<ServiceChangeEvent
return; return;
} }
serviceSubscribesMap.get(serviceKey).remove(subscriber); serviceSubscribesMap.get(serviceKey).remove(subscriber);
subscribeConnectionMap.remove(subscriber);
}
/**
* Remove All subscribe for service.
*
* @param serviceKey service key
*/
public void removeAllSubscribeForService(String serviceKey) {
Set<Subscriber> subscribers = serviceSubscribesMap.remove(serviceKey);
if (null != subscribers) {
for (Subscriber each : subscribers) {
subscribeConnectionMap.remove(each);
}
}
} }
public Set<Subscriber> getSubscribes(String namespaceId, String serviceName) { public Set<Subscriber> getSubscribes(String namespaceId, String serviceName) {
@ -95,6 +114,8 @@ public class RemotePushService implements ApplicationListener<ServiceChangeEvent
String serviceKey = UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()); String serviceKey = UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName());
ServiceInfo serviceInfo = serviceInfoGenerator ServiceInfo serviceInfo = serviceInfoGenerator
.generateServiceInfo(service, StringUtils.EMPTY, false, StringUtils.EMPTY); .generateServiceInfo(service, StringUtils.EMPTY, false, StringUtils.EMPTY);
notifier.serviceInfoChanged(serviceKey, NotifySubscriberResponse.buildSuccessResponse(serviceInfo)); for (Subscriber each : serviceSubscribesMap.getOrDefault(serviceKey, new HashSet<>())) {
notifier.push(subscribeConnectionMap.get(each), NotifySubscriberResponse.buildSuccessResponse(serviceInfo));
}
} }
} }

View File

@ -0,0 +1,118 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Remoting Connection Instance.
*
* @author xiweng.yy
*/
public class RemotingConnection {
private final ConcurrentMap<String, Set<Subscriber>> subscriberIndex = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Set<Instance>> instanceIndex = new ConcurrentHashMap<>();
private final Connection connection;
public RemotingConnection(Connection connection) {
this.connection = connection;
}
public String getConnectionId() {
return connection.getConnectionId();
}
/**
* Add new instance.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param instance instance
*/
public void addNewInstance(String namespaceId, String fullServiceName, Instance instance) {
String key = KeyBuilder.buildServiceMetaKey(namespaceId, fullServiceName);
if (!instanceIndex.containsKey(key)) {
instanceIndex.put(key, new ConcurrentHashSet<>());
}
instanceIndex.get(key).add(instance);
}
/**
* Remove instance.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param instance instance
*/
public void removeInstance(String namespaceId, String fullServiceName, Instance instance) {
String key = KeyBuilder.buildServiceMetaKey(namespaceId, fullServiceName);
if (!instanceIndex.containsKey(key)) {
return;
}
instanceIndex.get(key).remove(instance);
}
/**
* Add new subscriber.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param subscriber subscriber
*/
public void addNewSubscriber(String namespaceId, String fullServiceName, Subscriber subscriber) {
String key = UtilsAndCommons.assembleFullServiceName(namespaceId, fullServiceName);
if (!subscriberIndex.containsKey(key)) {
subscriberIndex.put(key, new ConcurrentHashSet<>());
}
subscriberIndex.get(key).add(subscriber);
}
/**
* Remove subscriber.
*
* @param namespaceId namespace Id
* @param fullServiceName full service name with group name
* @param subscriber subscriber
*/
public void removeSubscriber(String namespaceId, String fullServiceName, Subscriber subscriber) {
String key = UtilsAndCommons.assembleFullServiceName(namespaceId, fullServiceName);
if (!subscriberIndex.containsKey(key)) {
return;
}
subscriberIndex.get(key).remove(subscriber);
}
public ConcurrentMap<String, Set<Subscriber>> getSubscriberIndex() {
return subscriberIndex;
}
public ConcurrentMap<String, Set<Instance>> getInstanceIndex() {
return instanceIndex;
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.push.RemotePushService;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Remoting connection holder.
*
* @author xiweng.yy
*/
@Component
public class RemotingConnectionHolder extends ClientConnectionEventListener {
private final ConcurrentMap<String, RemotingConnection> connectionCache = new ConcurrentHashMap<>();
private final RemotePushService remotePushService;
private final ServiceManager serviceManager;
public RemotingConnectionHolder(RemotePushService remotePushService, ServiceManager serviceManager) {
this.remotePushService = remotePushService;
this.serviceManager = serviceManager;
}
@Override
public void clientConnected(Connection connect) {
connectionCache.put(connect.getConnectionId(), new RemotingConnection(connect));
}
@Override
public void clientDisConnected(Connection connect) {
RemotingConnection remotingConnection = connectionCache.remove(connect.getConnectionId());
try {
for (String each : remotingConnection.getInstanceIndex().keySet()) {
Set<Instance> instances = remotingConnection.getInstanceIndex().get(each);
serviceManager.removeInstance(KeyBuilder.getNamespace(each), KeyBuilder.getServiceName(each), true,
instances.toArray(new Instance[instances.size()]));
}
for (String each : remotingConnection.getSubscriberIndex().keySet()) {
remotePushService.removeAllSubscribeForService(each);
}
} catch (NacosException e) {
Loggers.SRV_LOG
.error(String.format("Remove context of connection %s failed", connect.getConnectionId()), e);
}
}
public RemotingConnection getRemotingConnection(String connectionId) {
return connectionCache.get(connectionId);
}
}

View File

@ -31,8 +31,8 @@ import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.ServiceManager; import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.remote.RemotingConnectionHolder;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
@ -45,8 +45,14 @@ import java.util.List;
@Component @Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest> { public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
@Autowired private final ServiceManager serviceManager;
private ServiceManager serviceManager;
private final RemotingConnectionHolder remotingConnectionHolder;
public InstanceRequestHandler(ServiceManager serviceManager, RemotingConnectionHolder remotingConnectionHolder) {
this.serviceManager = serviceManager;
this.remotingConnectionHolder = remotingConnectionHolder;
}
@Override @Override
public InstanceRequest parseBodyString(String bodyString) { public InstanceRequest parseBodyString(String bodyString) {
@ -57,7 +63,8 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
public Response handle(Request request, RequestMeta meta) throws NacosException { public Response handle(Request request, RequestMeta meta) throws NacosException {
InstanceRequest instanceRequest = (InstanceRequest) request; InstanceRequest instanceRequest = (InstanceRequest) request;
String namespace = instanceRequest.getNamespace(); String namespace = instanceRequest.getNamespace();
String serviceName = NamingUtils.getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName()); String serviceName = NamingUtils
.getGroupedName(instanceRequest.getServiceName(), instanceRequest.getGroupName());
switch (instanceRequest.getType()) { switch (instanceRequest.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE: case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(namespace, serviceName, instanceRequest, meta); return registerInstance(namespace, serviceName, instanceRequest, meta);
@ -82,6 +89,8 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
instance.setMarked(true); instance.setMarked(true);
instance.validate(); instance.validate();
serviceManager.addInstance(namespace, serviceName, instance.isEphemeral(), instance); serviceManager.addInstance(namespace, serviceName, instance.isEphemeral(), instance);
remotingConnectionHolder.getRemotingConnection(meta.getConnectionId())
.addNewInstance(namespace, serviceName, instance);
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE); return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
} }
@ -93,6 +102,8 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest> {
} }
Instance instance = parseInstance(instanceRequest.getInstance()); Instance instance = parseInstance(instanceRequest.getInstance());
serviceManager.removeInstance(namespace, serviceName, instance.isEphemeral(), instance); serviceManager.removeInstance(namespace, serviceName, instance.isEphemeral(), instance);
remotingConnectionHolder.getRemotingConnection(meta.getConnectionId())
.removeInstance(namespace, serviceName, instance);
return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE); return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
} }

View File

@ -27,13 +27,12 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode; import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.remote.AsyncListenContext;
import com.alibaba.nacos.core.remote.NacosRemoteConstants;
import com.alibaba.nacos.core.remote.RequestHandler; import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.naming.core.ServiceInfoGenerator; import com.alibaba.nacos.naming.core.ServiceInfoGenerator;
import com.alibaba.nacos.naming.misc.UtilsAndCommons; import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.RemotePushService; import com.alibaba.nacos.naming.push.RemotePushService;
import com.alibaba.nacos.naming.remote.RemotingConnectionHolder;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -48,17 +47,17 @@ import java.util.List;
@Component @Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest> { public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest> {
private final AsyncListenContext asyncListenContext;
private final ServiceInfoGenerator serviceInfoGenerator; private final ServiceInfoGenerator serviceInfoGenerator;
private final RemotePushService remotePushService; private final RemotePushService remotePushService;
public SubscribeServiceRequestHandler(AsyncListenContext asyncListenContext, private final RemotingConnectionHolder remotingConnectionHolder;
ServiceInfoGenerator serviceInfoGenerator, RemotePushService remotePushService) {
this.asyncListenContext = asyncListenContext; public SubscribeServiceRequestHandler(ServiceInfoGenerator serviceInfoGenerator,
RemotePushService remotePushService, RemotingConnectionHolder remotingConnectionHolder) {
this.serviceInfoGenerator = serviceInfoGenerator; this.serviceInfoGenerator = serviceInfoGenerator;
this.remotePushService = remotePushService; this.remotePushService = remotePushService;
this.remotingConnectionHolder = remotingConnectionHolder;
} }
@Override @Override
@ -78,11 +77,13 @@ public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServ
Subscriber subscriber = new Subscriber(meta.getClientIp(), "", "unknown", meta.getClientIp(), namespaceId, Subscriber subscriber = new Subscriber(meta.getClientIp(), "", "unknown", meta.getClientIp(), namespaceId,
serviceName); serviceName);
if (subscribeServiceRequest.isSubscribe()) { if (subscribeServiceRequest.isSubscribe()) {
asyncListenContext.addListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId); remotePushService.registerSubscribeForService(serviceKey, subscriber, connectionId);
remotePushService.registerSubscribeForService(serviceKey, subscriber); remotingConnectionHolder.getRemotingConnection(connectionId)
.addNewSubscriber(namespaceId, serviceName, subscriber);
} else { } else {
asyncListenContext.removeListen(NacosRemoteConstants.LISTEN_CONTEXT_NAMING, serviceKey, connectionId);
remotePushService.removeSubscribeForService(serviceKey, subscriber); remotePushService.removeSubscribeForService(serviceKey, subscriber);
remotingConnectionHolder.getRemotingConnection(connectionId)
.removeSubscriber(namespaceId, serviceName, subscriber);
} }
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo); return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
} }