Wrapper push data make executors can do operation. (#4938)

This commit is contained in:
杨翊 SionYang 2021-02-24 14:12:12 +08:00 committed by GitHub
parent 8bc443ea48
commit 3550d8e5e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 38 deletions

View File

@ -0,0 +1,52 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.push.v2;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* Nacos push data wrapper.
*
* @author xiweng.yy
*/
public class PushDataWrapper {
private final ServiceInfo originalData;
private final Map<String, Object> processedDatum;
public PushDataWrapper(ServiceInfo originalData) {
this.originalData = originalData;
processedDatum = new HashMap<>(1);
}
public ServiceInfo getOriginalData() {
return originalData;
}
public <T> Optional<T> getProcessedPushData(String key) {
return Optional.ofNullable((T) processedDatum.get(key));
}
public void addProcessedPushData(String key, Object processedData) {
processedDatum.put(key, processedData);
}
}

View File

@ -16,9 +16,9 @@
package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
/**
* Nacos naming push executor for v2.
@ -34,7 +34,7 @@ public interface PushExecutor {
* @param subscriber subscriber
* @param data push data
*/
void doPush(String clientId, Subscriber subscriber, ServiceInfo data);
void doPush(String clientId, Subscriber subscriber, PushDataWrapper data);
/**
* Do push with callback.
@ -44,5 +44,5 @@ public interface PushExecutor {
* @param data push data
* @param callBack callback
*/
void doPushWithCallback(String clientId, Subscriber subscriber, ServiceInfo data, PushCallBack callBack);
void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack);
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import org.springframework.stereotype.Component;
import java.util.Optional;
@ -43,12 +44,12 @@ public class PushExecutorDelegate implements PushExecutor {
}
@Override
public void doPush(String clientId, Subscriber subscriber, ServiceInfo data) {
public void doPush(String clientId, Subscriber subscriber, PushDataWrapper data) {
getPushExecuteService(clientId, subscriber).doPush(clientId, subscriber, data);
}
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, ServiceInfo data, PushCallBack callBack) {
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {
getPushExecuteService(clientId, subscriber).doPushWithCallback(clientId, subscriber, data, callBack);
}

View File

@ -16,12 +16,12 @@
package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import org.springframework.stereotype.Component;
/**
@ -39,13 +39,14 @@ public class PushExecutorRpcImpl implements PushExecutor {
}
@Override
public void doPush(String clientId, Subscriber subscriber, ServiceInfo data) {
pushService.pushWithoutAck(clientId, NotifySubscriberRequest.buildSuccessResponse(data));
public void doPush(String clientId, Subscriber subscriber, PushDataWrapper data) {
pushService.pushWithoutAck(clientId, NotifySubscriberRequest.buildSuccessResponse(data.getOriginalData()));
}
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, ServiceInfo data, PushCallBack callBack) {
pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildSuccessResponse(data), callBack,
GlobalExecutor.getCallbackExecutor());
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
PushCallBack callBack) {
pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildSuccessResponse(data.getOriginalData()),
callBack, GlobalExecutor.getCallbackExecutor());
}
}

View File

@ -19,10 +19,15 @@ package com.alibaba.nacos.naming.push.v2.executor;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* Push execute service for udp.
*
@ -31,6 +36,8 @@ import org.springframework.stereotype.Component;
@Component
public class PushExecutorUdpImpl implements PushExecutor {
private static final String UDP_PUSH_DATA_FOR_V1 = "udpPushDataForV1";
private final UdpPushService pushService;
public PushExecutorUdpImpl(UdpPushService pushService) {
@ -38,13 +45,15 @@ public class PushExecutorUdpImpl implements PushExecutor {
}
@Override
public void doPush(String clientId, Subscriber subscriber, ServiceInfo data) {
pushService.pushDataWithoutCallback(subscriber, replaceServiceInfoName(data));
public void doPush(String clientId, Subscriber subscriber, PushDataWrapper data) {
pushService.pushDataWithoutCallback(subscriber, handleClusterData(replaceServiceInfoName(data), subscriber));
}
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, ServiceInfo data, PushCallBack callBack) {
pushService.pushDataWithCallback(subscriber, replaceServiceInfoName(data), callBack);
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
PushCallBack callBack) {
pushService.pushDataWithCallback(subscriber, handleClusterData(replaceServiceInfoName(data), subscriber),
callBack);
}
/**
@ -57,16 +66,36 @@ public class PushExecutorUdpImpl implements PushExecutor {
* name directly will has some effect for 2.x client.
* </p>
*
* @param serviceInfo original service info
* @param originalData original service info
* @return new service info for 1.x
*/
private ServiceInfo replaceServiceInfoName(ServiceInfo serviceInfo) {
private ServiceInfo replaceServiceInfoName(PushDataWrapper originalData) {
Optional<ServiceInfo> original = originalData.getProcessedPushData(UDP_PUSH_DATA_FOR_V1);
if (original.isPresent()) {
return original.get();
}
ServiceInfo serviceInfo = originalData.getOriginalData();
ServiceInfo result = new ServiceInfo();
result.setName(NamingUtils.getGroupedName(serviceInfo.getName(), serviceInfo.getGroupName()));
result.setClusters(serviceInfo.getClusters());
result.setHosts(serviceInfo.getHosts());
result.setLastRefTime(serviceInfo.getLastRefTime());
result.setCacheMillis(serviceInfo.getCacheMillis());
originalData.addProcessedPushData(UDP_PUSH_DATA_FOR_V1, result);
return result;
}
/**
* For adapt push cluster feature for v1.x.
*
* @param data original data
* @param subscriber subscriber information
* @return cluster filtered data
* @deprecated Will be removed after client can filter cluster
*/
@Deprecated
private ServiceInfo handleClusterData(ServiceInfo data, Subscriber subscriber) {
return StringUtils.isBlank(subscriber.getCluster()) ? data
: ServiceUtil.selectInstances(data, subscriber.getCluster());
}
}

View File

@ -19,7 +19,6 @@ package com.alibaba.nacos.naming.push.v2.task;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.Loggers;
@ -27,6 +26,7 @@ import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.monitor.NamingTpsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.NoRequiredRetryException;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import java.util.Collection;
@ -53,8 +53,7 @@ public class PushExecuteTask extends AbstractExecuteTask {
@Override
public void run() {
try {
ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
serviceInfo = ServiceUtil.selectInstances(serviceInfo, false, true);
PushDataWrapper wrapper = generatePushData();
for (String each : getTargetClientIds()) {
Client client = delayTaskEngine.getClientManager().getClient(each);
if (null == client) {
@ -62,9 +61,8 @@ public class PushExecuteTask extends AbstractExecuteTask {
continue;
}
Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecutor()
.doPushWithCallback(each, subscriber, handleClusterData(serviceInfo, subscriber),
new NamingPushCallback(each, subscriber, serviceInfo));
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
@ -72,25 +70,17 @@ public class PushExecuteTask extends AbstractExecuteTask {
}
}
private PushDataWrapper generatePushData() {
ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
serviceInfo = ServiceUtil.selectInstances(serviceInfo, false, true);
return new PushDataWrapper(serviceInfo);
}
private Collection<String> getTargetClientIds() {
return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
: delayTask.getTargetClients();
}
/**
* For adapt push cluster feature for v1.x.
*
* @param data original data
* @param subscriber subscriber information
* @return cluster filtered data
* @deprecated Will be removed after client can filter cluster
*/
@Deprecated
private ServiceInfo handleClusterData(ServiceInfo data, Subscriber subscriber) {
return StringUtils.isBlank(subscriber.getCluster()) ? data
: ServiceUtil.selectInstances(data, subscriber.getCluster());
}
private class NamingPushCallback implements PushCallBack {
private final String clientId;
@ -151,7 +141,7 @@ public class PushExecuteTask extends AbstractExecuteTask {
NamingTpsMonitor.udpPushSuccess(clientId, ip);
}
}
private void monitorFail(String clientId, String ip, boolean isRpc) {
MetricsMonitor.incrementFailPush();
if (isRpc) {