Remove http data change (#10043)
* Remove http way to report data change * Remove http controller method to receive data change
This commit is contained in:
parent
74b81be92c
commit
920fd4c211
@ -22,8 +22,6 @@ import com.alibaba.nacos.config.server.constant.Constants;
|
||||
import com.alibaba.nacos.config.server.model.SampleResult;
|
||||
import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext;
|
||||
import com.alibaba.nacos.config.server.service.LongPollingService;
|
||||
import com.alibaba.nacos.config.server.service.dump.DumpService;
|
||||
import com.alibaba.nacos.config.server.service.notify.NotifyService;
|
||||
import com.alibaba.nacos.config.server.utils.GroupKey2;
|
||||
import com.alibaba.nacos.core.remote.Connection;
|
||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||
@ -50,44 +48,19 @@ import java.util.Set;
|
||||
@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)
|
||||
public class CommunicationController {
|
||||
|
||||
private final DumpService dumpService;
|
||||
|
||||
private final LongPollingService longPollingService;
|
||||
|
||||
private final ConfigChangeListenContext configChangeListenContext;
|
||||
|
||||
private final ConnectionManager connectionManager;
|
||||
|
||||
public CommunicationController(DumpService dumpService, LongPollingService longPollingService,
|
||||
public CommunicationController(LongPollingService longPollingService,
|
||||
ConfigChangeListenContext configChangeListenContext, ConnectionManager connectionManager) {
|
||||
this.dumpService = dumpService;
|
||||
this.longPollingService = longPollingService;
|
||||
this.configChangeListenContext = configChangeListenContext;
|
||||
this.connectionManager = connectionManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the change of config information.
|
||||
*/
|
||||
@GetMapping("/dataChange")
|
||||
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
|
||||
@RequestParam("group") String group,
|
||||
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
|
||||
@RequestParam(value = "tag", required = false) String tag) {
|
||||
dataId = dataId.trim();
|
||||
group = group.trim();
|
||||
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
|
||||
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
|
||||
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
|
||||
String isBetaStr = request.getHeader("isBeta");
|
||||
if (StringUtils.isNotBlank(isBetaStr) && Boolean.parseBoolean(isBetaStr)) {
|
||||
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
|
||||
} else {
|
||||
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get client config information of subscriber in local machine.
|
||||
*/
|
||||
|
@ -20,16 +20,9 @@ import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSy
|
||||
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
|
||||
import com.alibaba.nacos.api.remote.RequestCallBack;
|
||||
import com.alibaba.nacos.api.utils.NetUtils;
|
||||
import com.alibaba.nacos.auth.util.AuthHeaderUtil;
|
||||
import com.alibaba.nacos.common.http.Callback;
|
||||
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
|
||||
import com.alibaba.nacos.common.http.param.Header;
|
||||
import com.alibaba.nacos.common.http.param.Query;
|
||||
import com.alibaba.nacos.common.model.RestResult;
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.notify.listener.Subscriber;
|
||||
import com.alibaba.nacos.config.server.constant.Constants;
|
||||
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
|
||||
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
|
||||
import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy;
|
||||
@ -38,19 +31,13 @@ import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
|
||||
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
|
||||
import com.alibaba.nacos.config.server.utils.LogUtil;
|
||||
import com.alibaba.nacos.core.cluster.Member;
|
||||
import com.alibaba.nacos.core.cluster.MemberUtil;
|
||||
import com.alibaba.nacos.core.cluster.ServerMemberManager;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
import com.alibaba.nacos.sys.utils.InetUtils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
@ -67,8 +54,6 @@ public class AsyncNotifyService {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);
|
||||
|
||||
private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate();
|
||||
|
||||
private static final int MIN_RETRY_INTERVAL = 500;
|
||||
|
||||
private static final int INCREASE_STEPS = 1000;
|
||||
@ -109,20 +94,12 @@ public class AsyncNotifyService {
|
||||
Collection<Member> ipList = memberManager.allMembers();
|
||||
|
||||
// In fact, any type of queue here can be
|
||||
Queue<NotifySingleTask> httpQueue = new LinkedList<>();
|
||||
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
|
||||
|
||||
for (Member member : ipList) {
|
||||
if (!MemberUtil.isSupportedLongCon(member)) {
|
||||
httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
|
||||
evt.isBeta));
|
||||
} else {
|
||||
rpcQueue.add(
|
||||
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
|
||||
}
|
||||
}
|
||||
if (!httpQueue.isEmpty()) {
|
||||
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
|
||||
// grpc report data change only
|
||||
rpcQueue.add(
|
||||
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
|
||||
}
|
||||
if (!rpcQueue.isEmpty()) {
|
||||
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
|
||||
@ -138,52 +115,6 @@ public class AsyncNotifyService {
|
||||
});
|
||||
}
|
||||
|
||||
class AsyncTask implements Runnable {
|
||||
|
||||
private Queue<NotifySingleTask> queue;
|
||||
|
||||
private NacosAsyncRestTemplate restTemplate;
|
||||
|
||||
public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
|
||||
this.restTemplate = restTemplate;
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
executeAsyncInvoke();
|
||||
}
|
||||
|
||||
private void executeAsyncInvoke() {
|
||||
while (!queue.isEmpty()) {
|
||||
NotifySingleTask task = queue.poll();
|
||||
String targetIp = task.getTargetIP();
|
||||
if (memberManager.hasMember(targetIp)) {
|
||||
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
|
||||
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
|
||||
if (unHealthNeedDelay) {
|
||||
// target ip is unhealthy, then put it in the notification list
|
||||
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
|
||||
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
|
||||
0, task.target);
|
||||
// get delay time and set fail count to the task
|
||||
asyncTaskExecute(task);
|
||||
} else {
|
||||
Header header = Header.newInstance();
|
||||
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
|
||||
String.valueOf(task.getLastModified()));
|
||||
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
|
||||
if (task.isBeta) {
|
||||
header.addParam("isBeta", "true");
|
||||
}
|
||||
AuthHeaderUtil.addIdentityToHeader(header);
|
||||
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class AsyncRpcTask implements Runnable {
|
||||
|
||||
private Queue<NotifySingleRpcTask> queue;
|
||||
@ -227,19 +158,14 @@ public class AsyncNotifyService {
|
||||
// get delay time and set fail count to the task
|
||||
asyncTaskExecute(task);
|
||||
} else {
|
||||
|
||||
if (!MemberUtil.isSupportedLongCon(member)) {
|
||||
asyncTaskExecute(
|
||||
new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
|
||||
task.getLastModified(), member.getAddress(), task.isBeta));
|
||||
} else {
|
||||
try {
|
||||
configClusterRpcClientProxy
|
||||
.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
|
||||
} catch (Exception e) {
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
asyncTaskExecute(task);
|
||||
}
|
||||
|
||||
// grpc report data change only
|
||||
try {
|
||||
configClusterRpcClientProxy
|
||||
.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
|
||||
} catch (Exception e) {
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
asyncTaskExecute(task);
|
||||
}
|
||||
|
||||
}
|
||||
@ -268,14 +194,6 @@ public class AsyncNotifyService {
|
||||
}
|
||||
}
|
||||
|
||||
private void asyncTaskExecute(NotifySingleTask task) {
|
||||
int delay = getDelayTime(task);
|
||||
Queue<NotifySingleTask> queue = new LinkedList<>();
|
||||
queue.add(task);
|
||||
AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);
|
||||
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private void asyncTaskExecute(NotifySingleRpcTask task) {
|
||||
int delay = getDelayTime(task);
|
||||
Queue<NotifySingleRpcTask> queue = new LinkedList<>();
|
||||
@ -284,74 +202,6 @@ public class AsyncNotifyService {
|
||||
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
class AsyncNotifyCallBack implements Callback<String> {
|
||||
|
||||
private NotifySingleTask task;
|
||||
|
||||
public AsyncNotifyCallBack(NotifySingleTask task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(RestResult<String> result) {
|
||||
|
||||
long delayed = System.currentTimeMillis() - task.getLastModified();
|
||||
|
||||
if (result.ok()) {
|
||||
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
|
||||
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_OK, delayed,
|
||||
task.target);
|
||||
} else {
|
||||
LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified(), result.getCode());
|
||||
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
|
||||
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
|
||||
task.target);
|
||||
|
||||
//get delay time and set fail count to the task
|
||||
asyncTaskExecute(task);
|
||||
|
||||
LogUtil.NOTIFY_LOG
|
||||
.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified());
|
||||
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable ex) {
|
||||
|
||||
long delayed = System.currentTimeMillis() - task.getLastModified();
|
||||
LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified(), ex);
|
||||
ConfigTraceService
|
||||
.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(),
|
||||
InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, task.target);
|
||||
|
||||
//get delay time and set fail count to the task
|
||||
asyncTaskExecute(task);
|
||||
LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified());
|
||||
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCancel() {
|
||||
|
||||
LogUtil.NOTIFY_LOG.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", task.target,
|
||||
task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED");
|
||||
|
||||
//get delay time and set fail count to the task
|
||||
asyncTaskExecute(task);
|
||||
LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified());
|
||||
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
}
|
||||
}
|
||||
|
||||
class AsyncRpcNotifyCallBack implements RequestCallBack<ConfigChangeClusterSyncResponse> {
|
||||
|
||||
private NotifySingleRpcTask task;
|
||||
@ -413,72 +263,7 @@ public class AsyncNotifyService {
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
}
|
||||
}
|
||||
|
||||
static class NotifySingleTask extends NotifyTask {
|
||||
|
||||
private String target;
|
||||
|
||||
private String url;
|
||||
|
||||
private boolean isBeta;
|
||||
|
||||
private static final String URL_PATTERN =
|
||||
"http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange" + "?dataId={2}&group={3}";
|
||||
|
||||
private static final String URL_PATTERN_TENANT =
|
||||
"http://{0}{1}" + Constants.COMMUNICATION_CONTROLLER_PATH + "/dataChange"
|
||||
+ "?dataId={2}&group={3}&tenant={4}";
|
||||
|
||||
private int failCount;
|
||||
|
||||
public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target) {
|
||||
this(dataId, group, tenant, lastModified, target, false);
|
||||
}
|
||||
|
||||
public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target,
|
||||
boolean isBeta) {
|
||||
this(dataId, group, tenant, null, lastModified, target, isBeta);
|
||||
}
|
||||
|
||||
public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified,
|
||||
String target, boolean isBeta) {
|
||||
super(dataId, group, tenant, lastModified);
|
||||
this.target = target;
|
||||
this.isBeta = isBeta;
|
||||
try {
|
||||
dataId = URLEncoder.encode(dataId, Constants.ENCODE);
|
||||
group = URLEncoder.encode(group, Constants.ENCODE);
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
LOGGER.error("URLEncoder encode error", e);
|
||||
}
|
||||
if (StringUtils.isBlank(tenant)) {
|
||||
this.url = MessageFormat.format(URL_PATTERN, target, EnvUtil.getContextPath(), dataId, group);
|
||||
} else {
|
||||
this.url = MessageFormat
|
||||
.format(URL_PATTERN_TENANT, target, EnvUtil.getContextPath(), dataId, group, tenant);
|
||||
}
|
||||
if (StringUtils.isNotEmpty(tag)) {
|
||||
url = url + "&tag=" + tag;
|
||||
}
|
||||
failCount = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFailCount(int count) {
|
||||
this.failCount = count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFailCount() {
|
||||
return failCount;
|
||||
}
|
||||
|
||||
public String getTargetIP() {
|
||||
return target;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* get delayTime and also set failCount to task; The failure time index increases, so as not to retry invalid tasks
|
||||
* in the offline scene, which affects the normal synchronization.
|
||||
|
Loading…
Reference in New Issue
Block a user