diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index d48f411d0..5df6ef3ab 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -16,11 +16,14 @@ package com.alibaba.nacos.config.server.service.notify; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; +import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.service.ServerListService; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; -import com.alibaba.nacos.config.server.utils.*; +import com.alibaba.nacos.config.server.utils.LogUtil; +import com.alibaba.nacos.config.server.utils.PropertyUtil; +import com.alibaba.nacos.config.server.utils.RunningConfigUtils; +import com.alibaba.nacos.config.server.utils.StringUtils; import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener; import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event; import org.apache.http.HttpResponse; @@ -68,7 +71,7 @@ public class AsyncNotifyService extends AbstractEventListener { // 并发产生 ConfigDataChangeEvent if (event instanceof ConfigDataChangeEvent) { - ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event; + ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; @@ -79,9 +82,9 @@ public class AsyncNotifyService extends AbstractEventListener { // 其实这里任何类型队列都可以 Queue queue = new LinkedList(); for (int i = 0; i < ipList.size(); i++) { - queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta)); + queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta)); } - EXCUTOR.execute(new AsyncTask(httpclient, queue)); + EXECUTOR.execute(new AsyncTask(httpclient, queue)); } } @@ -92,11 +95,11 @@ public class AsyncNotifyService extends AbstractEventListener { } public Executor getExecutor() { - return EXCUTOR; + return EXECUTOR; } @SuppressWarnings("PMD.ThreadPoolCreationRule") - private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory()); + private static final Executor EXECUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory()); private RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(PropertyUtil.getNotifyConnectTimeout()) @@ -105,7 +108,7 @@ public class AsyncNotifyService extends AbstractEventListener { private CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom() .setDefaultRequestConfig(requestConfig).build(); - static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class); + private static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class); private ServerListService serverListService; @@ -118,15 +121,11 @@ public class AsyncNotifyService extends AbstractEventListener { @Override public void run() { - executeAsyncInvoke(); - } private void executeAsyncInvoke() { - while (!queue.isEmpty()) { - NotifySingleTask task = queue.poll(); String targetIp = task.getTargetIP(); if (serverListService.getServerList().contains( @@ -139,11 +138,7 @@ public class AsyncNotifyService extends AbstractEventListener { task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task - int delay = getDelayTime(task); - Queue queue = new LinkedList(); - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); + asyncTaskExecute(task); } else { HttpGet request = new HttpGet(task.url); request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, @@ -152,7 +147,7 @@ public class AsyncNotifyService extends AbstractEventListener { if (task.isBeta) { request.setHeader("isBeta", "true"); } - httpclient.execute(request, new AyscNotifyCallBack(httpclient, task)); + httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task)); } } } @@ -163,12 +158,20 @@ public class AsyncNotifyService extends AbstractEventListener { } - class AyscNotifyCallBack implements FutureCallback { + private void asyncTaskExecute(NotifySingleTask task) { + int delay = getDelayTime(task); + Queue queue = new LinkedList(); + queue.add(task); + AsyncTask asyncTask = new AsyncTask(httpclient, queue); + ((ScheduledThreadPoolExecutor) EXECUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); + } - public AyscNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task - ) { + + class AsyncNotifyCallBack implements FutureCallback { + + public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) { this.task = task; - this.httpclient = httpclient; + this.httpClient = httpClient; } @Override @@ -183,31 +186,19 @@ public class AsyncNotifyService extends AbstractEventListener { ConfigTraceService.NOTIFY_EVENT_OK, delayed, task.target); } else { - log.error("[notify-error] {}, {}, to {}, result {}", - new Object[] {task.getDataId(), task.getGroup(), - task.target, - response.getStatusLine().getStatusCode()}); + log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode()); ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, task.target); - //get delay time and set fail count to the task - int delay = getDelayTime(task); + asyncTaskExecute(task); - Queue queue = new LinkedList(); - - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); - - LogUtil.notifyLog.error( - "[notify-retry] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getDataId(), - task.getGroup(), task.getLastModified()}); + LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } @@ -218,10 +209,8 @@ public class AsyncNotifyService extends AbstractEventListener { public void failed(Exception ex) { long delayed = System.currentTimeMillis() - task.getLastModified(); - log.error("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", " - + ex.toString()); - log.debug("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", " - + ex.toString(), ex); + log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified(), ex.toString()); ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, @@ -229,17 +218,9 @@ public class AsyncNotifyService extends AbstractEventListener { task.target); //get delay time and set fail count to the task - int delay = getDelayTime(task); - Queue queue = new LinkedList(); - - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); - LogUtil.notifyLog.error( - "[notify-retry] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getDataId(), - task.getGroup(), task.getLastModified()}); + asyncTaskExecute(task); + LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } @@ -247,30 +228,19 @@ public class AsyncNotifyService extends AbstractEventListener { @Override public void cancelled() { - LogUtil.notifyLog.error( - "[notify-exception] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getGroup(), - task.getGroup(), task.getLastModified()}, - "CANCELED"); + LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED"); //get delay time and set fail count to the task - int delay = getDelayTime(task); - Queue queue = new LinkedList(); - - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); - LogUtil.notifyLog.error( - "[notify-retry] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getDataId(), - task.getGroup(), task.getLastModified()}); + asyncTaskExecute(task); + LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } private NotifySingleTask task; - private CloseableHttpAsyncClient httpclient; + private CloseableHttpAsyncClient httpClient; } static class NotifySingleTask extends NotifyTask { @@ -339,8 +309,7 @@ public class AsyncNotifyService extends AbstractEventListener { @Override public Thread newThread(Runnable r) { - Thread thread = new Thread(r, - "com.alibaba.nacos.AsyncNotifyServiceThread"); + Thread thread = new Thread(r, "com.alibaba.nacos.AsyncNotifyServiceThread"); thread.setDaemon(true); return thread; } @@ -354,15 +323,15 @@ public class AsyncNotifyService extends AbstractEventListener { */ private static int getDelayTime(NotifySingleTask task) { int failCount = task.getFailCount(); - int delay = MINRETRYINTERVAL + failCount * failCount * INCREASESTEPS; - if (failCount <= MAXCOUNT) { + int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS; + if (failCount <= MAX_COUNT) { task.setFailCount(failCount + 1); } return delay; } - private static int MINRETRYINTERVAL = 500; - private static int INCREASESTEPS = 1000; - private static int MAXCOUNT = 6; + private static int MIN_RETRY_INTERVAL = 500; + private static int INCREASE_STEPS = 1000; + private static int MAX_COUNT = 6; }