From 179f810d898504bc29922d7faa99a8b19bab0460 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E7=A6=B9=E5=85=89?= Date: Mon, 1 Jul 2019 11:45:19 +0800 Subject: [PATCH 1/2] bugfix and opt --- .../service/notify/AsyncNotifyService.java | 120 +++++++----------- 1 file changed, 45 insertions(+), 75 deletions(-) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index d48f411d0..e94cfbba0 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -16,11 +16,14 @@ package com.alibaba.nacos.config.server.service.notify; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; +import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.service.ServerListService; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; -import com.alibaba.nacos.config.server.utils.*; +import com.alibaba.nacos.config.server.utils.LogUtil; +import com.alibaba.nacos.config.server.utils.PropertyUtil; +import com.alibaba.nacos.config.server.utils.RunningConfigUtils; +import com.alibaba.nacos.config.server.utils.StringUtils; import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener; import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event; import org.apache.http.HttpResponse; @@ -68,7 +71,7 @@ public class AsyncNotifyService extends AbstractEventListener { // 并发产生 ConfigDataChangeEvent if (event instanceof ConfigDataChangeEvent) { - ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event; + ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; @@ -79,9 +82,9 @@ public class AsyncNotifyService extends AbstractEventListener { // 其实这里任何类型队列都可以 Queue queue = new LinkedList(); for (int i = 0; i < ipList.size(); i++) { - queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta)); + queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta)); } - EXCUTOR.execute(new AsyncTask(httpclient, queue)); + EXECUTOR.execute(new AsyncTask(httpclient, queue)); } } @@ -92,11 +95,11 @@ public class AsyncNotifyService extends AbstractEventListener { } public Executor getExecutor() { - return EXCUTOR; + return EXECUTOR; } @SuppressWarnings("PMD.ThreadPoolCreationRule") - private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory()); + private static final Executor EXECUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory()); private RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(PropertyUtil.getNotifyConnectTimeout()) @@ -105,7 +108,7 @@ public class AsyncNotifyService extends AbstractEventListener { private CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom() .setDefaultRequestConfig(requestConfig).build(); - static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class); + private static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class); private ServerListService serverListService; @@ -118,15 +121,11 @@ public class AsyncNotifyService extends AbstractEventListener { @Override public void run() { - executeAsyncInvoke(); - } private void executeAsyncInvoke() { - while (!queue.isEmpty()) { - NotifySingleTask task = queue.poll(); String targetIp = task.getTargetIP(); if (serverListService.getServerList().contains( @@ -139,11 +138,7 @@ public class AsyncNotifyService extends AbstractEventListener { task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task - int delay = getDelayTime(task); - Queue queue = new LinkedList(); - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); + asyncTaskExecute(task); } else { HttpGet request = new HttpGet(task.url); request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, @@ -152,7 +147,7 @@ public class AsyncNotifyService extends AbstractEventListener { if (task.isBeta) { request.setHeader("isBeta", "true"); } - httpclient.execute(request, new AyscNotifyCallBack(httpclient, task)); + httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task)); } } } @@ -163,9 +158,18 @@ public class AsyncNotifyService extends AbstractEventListener { } - class AyscNotifyCallBack implements FutureCallback { + private void asyncTaskExecute(NotifySingleTask task) { + int delay = getDelayTime(task); + Queue queue = new LinkedList(); + queue.add(task); + AsyncTask asyncTask = new AsyncTask(httpclient, queue); + ((ScheduledThreadPoolExecutor) EXECUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); + } - public AyscNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task + + class AsyncNotifyCallBack implements FutureCallback { + + public AsyncNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task ) { this.task = task; this.httpclient = httpclient; @@ -183,31 +187,19 @@ public class AsyncNotifyService extends AbstractEventListener { ConfigTraceService.NOTIFY_EVENT_OK, delayed, task.target); } else { - log.error("[notify-error] {}, {}, to {}, result {}", - new Object[] {task.getDataId(), task.getGroup(), - task.target, - response.getStatusLine().getStatusCode()}); + log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode()); ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, task.target); - //get delay time and set fail count to the task - int delay = getDelayTime(task); + asyncTaskExecute(task); - Queue queue = new LinkedList(); - - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); - - LogUtil.notifyLog.error( - "[notify-retry] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getDataId(), - task.getGroup(), task.getLastModified()}); + LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } @@ -218,10 +210,8 @@ public class AsyncNotifyService extends AbstractEventListener { public void failed(Exception ex) { long delayed = System.currentTimeMillis() - task.getLastModified(); - log.error("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", " - + ex.toString()); - log.debug("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", " - + ex.toString(), ex); + log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified(), ex.toString()); ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, @@ -229,17 +219,9 @@ public class AsyncNotifyService extends AbstractEventListener { task.target); //get delay time and set fail count to the task - int delay = getDelayTime(task); - Queue queue = new LinkedList(); - - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); - LogUtil.notifyLog.error( - "[notify-retry] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getDataId(), - task.getGroup(), task.getLastModified()}); + asyncTaskExecute(task); + LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } @@ -247,24 +229,13 @@ public class AsyncNotifyService extends AbstractEventListener { @Override public void cancelled() { - LogUtil.notifyLog.error( - "[notify-exception] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getGroup(), - task.getGroup(), task.getLastModified()}, - "CANCELED"); + LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED"); //get delay time and set fail count to the task - int delay = getDelayTime(task); - Queue queue = new LinkedList(); - - queue.add(task); - AsyncTask asyncTask = new AsyncTask(httpclient, queue); - - ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); - LogUtil.notifyLog.error( - "[notify-retry] target:{} dataid:{} group:{} ts:{}", - new Object[] {task.target, task.getDataId(), - task.getGroup(), task.getLastModified()}); + asyncTaskExecute(task); + LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", + task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } @@ -339,8 +310,7 @@ public class AsyncNotifyService extends AbstractEventListener { @Override public Thread newThread(Runnable r) { - Thread thread = new Thread(r, - "com.alibaba.nacos.AsyncNotifyServiceThread"); + Thread thread = new Thread(r, "com.alibaba.nacos.AsyncNotifyServiceThread"); thread.setDaemon(true); return thread; } @@ -354,15 +324,15 @@ public class AsyncNotifyService extends AbstractEventListener { */ private static int getDelayTime(NotifySingleTask task) { int failCount = task.getFailCount(); - int delay = MINRETRYINTERVAL + failCount * failCount * INCREASESTEPS; - if (failCount <= MAXCOUNT) { + int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS; + if (failCount <= MAX_COUNT) { task.setFailCount(failCount + 1); } return delay; } - private static int MINRETRYINTERVAL = 500; - private static int INCREASESTEPS = 1000; - private static int MAXCOUNT = 6; + private static int MIN_RETRY_INTERVAL = 500; + private static int INCREASE_STEPS = 1000; + private static int MAX_COUNT = 6; } From ea5eaf5441d35d541be4be753018d420ad6164df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E7=A6=B9=E5=85=89?= Date: Mon, 1 Jul 2019 11:59:04 +0800 Subject: [PATCH 2/2] fix2 --- .../config/server/service/notify/AsyncNotifyService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index e94cfbba0..5df6ef3ab 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -169,10 +169,9 @@ public class AsyncNotifyService extends AbstractEventListener { class AsyncNotifyCallBack implements FutureCallback { - public AsyncNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task - ) { + public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) { this.task = task; - this.httpclient = httpclient; + this.httpClient = httpClient; } @Override @@ -241,7 +240,7 @@ public class AsyncNotifyService extends AbstractEventListener { } private NotifySingleTask task; - private CloseableHttpAsyncClient httpclient; + private CloseableHttpAsyncClient httpClient; } static class NotifySingleTask extends NotifyTask {