Merge pull request #1454 from zhaoyuguang/zen1

Fix #1453
This commit is contained in:
Fury Zhu 2019-07-02 11:44:49 +08:00 committed by GitHub
commit 702289ad69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -16,11 +16,14 @@
package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.service.ServerListService;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.*;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.config.server.utils.RunningConfigUtils;
import com.alibaba.nacos.config.server.utils.StringUtils;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event;
import org.apache.http.HttpResponse;
@ -68,7 +71,7 @@ public class AsyncNotifyService extends AbstractEventListener {
// 并发产生 ConfigDataChangeEvent
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
@ -79,9 +82,9 @@ public class AsyncNotifyService extends AbstractEventListener {
// 其实这里任何类型队列都可以
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (int i = 0; i < ipList.size(); i++) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta));
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
}
EXCUTOR.execute(new AsyncTask(httpclient, queue));
EXECUTOR.execute(new AsyncTask(httpclient, queue));
}
}
@ -92,11 +95,11 @@ public class AsyncNotifyService extends AbstractEventListener {
}
public Executor getExecutor() {
return EXCUTOR;
return EXECUTOR;
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());
private static final Executor EXECUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());
private RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(PropertyUtil.getNotifyConnectTimeout())
@ -105,7 +108,7 @@ public class AsyncNotifyService extends AbstractEventListener {
private CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
.setDefaultRequestConfig(requestConfig).build();
static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class);
private static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class);
private ServerListService serverListService;
@ -118,15 +121,11 @@ public class AsyncNotifyService extends AbstractEventListener {
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (serverListService.getServerList().contains(
@ -139,11 +138,7 @@ public class AsyncNotifyService extends AbstractEventListener {
task.getLastModified(),
LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
// get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
asyncTaskExecute(task);
} else {
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
@ -152,7 +147,7 @@ public class AsyncNotifyService extends AbstractEventListener {
if (task.isBeta) {
request.setHeader("isBeta", "true");
}
httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));
httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
}
}
}
@ -163,12 +158,20 @@ public class AsyncNotifyService extends AbstractEventListener {
}
class AyscNotifyCallBack implements FutureCallback<HttpResponse> {
private void asyncTaskExecute(NotifySingleTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor) EXECUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
}
public AyscNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task
) {
class AsyncNotifyCallBack implements FutureCallback<HttpResponse> {
public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) {
this.task = task;
this.httpclient = httpclient;
this.httpClient = httpClient;
}
@Override
@ -183,31 +186,19 @@ public class AsyncNotifyService extends AbstractEventListener {
ConfigTraceService.NOTIFY_EVENT_OK, delayed,
task.target);
} else {
log.error("[notify-error] {}, {}, to {}, result {}",
new Object[] {task.getDataId(), task.getGroup(),
task.target,
response.getStatusLine().getStatusCode()});
log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode());
ConfigTraceService.logNotifyEvent(task.getDataId(),
task.getGroup(), task.getTenant(), null, task.getLastModified(),
LOCAL_IP,
ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
task.target);
//get delay time and set fail count to the task
int delay = getDelayTime(task);
asyncTaskExecute(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
LogUtil.notifyLog.error(
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified());
MetricsMonitor.getConfigNotifyException().increment();
}
@ -218,10 +209,8 @@ public class AsyncNotifyService extends AbstractEventListener {
public void failed(Exception ex) {
long delayed = System.currentTimeMillis() - task.getLastModified();
log.error("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
+ ex.toString());
log.debug("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
+ ex.toString(), ex);
log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), ex.toString());
ConfigTraceService.logNotifyEvent(task.getDataId(),
task.getGroup(), task.getTenant(), null, task.getLastModified(),
LOCAL_IP,
@ -229,17 +218,9 @@ public class AsyncNotifyService extends AbstractEventListener {
task.target);
//get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
LogUtil.notifyLog.error(
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
asyncTaskExecute(task);
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified());
MetricsMonitor.getConfigNotifyException().increment();
}
@ -247,30 +228,19 @@ public class AsyncNotifyService extends AbstractEventListener {
@Override
public void cancelled() {
LogUtil.notifyLog.error(
"[notify-exception] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getGroup(),
task.getGroup(), task.getLastModified()},
"CANCELED");
LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED");
//get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
LogUtil.notifyLog.error(
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
asyncTaskExecute(task);
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified());
MetricsMonitor.getConfigNotifyException().increment();
}
private NotifySingleTask task;
private CloseableHttpAsyncClient httpclient;
private CloseableHttpAsyncClient httpClient;
}
static class NotifySingleTask extends NotifyTask {
@ -339,8 +309,7 @@ public class AsyncNotifyService extends AbstractEventListener {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,
"com.alibaba.nacos.AsyncNotifyServiceThread");
Thread thread = new Thread(r, "com.alibaba.nacos.AsyncNotifyServiceThread");
thread.setDaemon(true);
return thread;
}
@ -354,15 +323,15 @@ public class AsyncNotifyService extends AbstractEventListener {
*/
private static int getDelayTime(NotifySingleTask task) {
int failCount = task.getFailCount();
int delay = MINRETRYINTERVAL + failCount * failCount * INCREASESTEPS;
if (failCount <= MAXCOUNT) {
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
if (failCount <= MAX_COUNT) {
task.setFailCount(failCount + 1);
}
return delay;
}
private static int MINRETRYINTERVAL = 500;
private static int INCREASESTEPS = 1000;
private static int MAXCOUNT = 6;
private static int MIN_RETRY_INTERVAL = 500;
private static int INCREASE_STEPS = 1000;
private static int MAX_COUNT = 6;
}