bugfix and opt
This commit is contained in:
parent
dc7485b108
commit
179f810d89
@ -16,11 +16,14 @@
|
||||
package com.alibaba.nacos.config.server.service.notify;
|
||||
|
||||
import com.alibaba.nacos.config.server.constant.Constants;
|
||||
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
|
||||
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
|
||||
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
|
||||
import com.alibaba.nacos.config.server.service.ServerListService;
|
||||
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
|
||||
import com.alibaba.nacos.config.server.utils.*;
|
||||
import com.alibaba.nacos.config.server.utils.LogUtil;
|
||||
import com.alibaba.nacos.config.server.utils.PropertyUtil;
|
||||
import com.alibaba.nacos.config.server.utils.RunningConfigUtils;
|
||||
import com.alibaba.nacos.config.server.utils.StringUtils;
|
||||
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener;
|
||||
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event;
|
||||
import org.apache.http.HttpResponse;
|
||||
@ -68,7 +71,7 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
|
||||
// 并发产生 ConfigDataChangeEvent
|
||||
if (event instanceof ConfigDataChangeEvent) {
|
||||
ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
|
||||
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
|
||||
long dumpTs = evt.lastModifiedTs;
|
||||
String dataId = evt.dataId;
|
||||
String group = evt.group;
|
||||
@ -79,9 +82,9 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
// 其实这里任何类型队列都可以
|
||||
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
|
||||
for (int i = 0; i < ipList.size(); i++) {
|
||||
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta));
|
||||
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
|
||||
}
|
||||
EXCUTOR.execute(new AsyncTask(httpclient, queue));
|
||||
EXECUTOR.execute(new AsyncTask(httpclient, queue));
|
||||
}
|
||||
}
|
||||
|
||||
@ -92,11 +95,11 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
}
|
||||
|
||||
public Executor getExecutor() {
|
||||
return EXCUTOR;
|
||||
return EXECUTOR;
|
||||
}
|
||||
|
||||
@SuppressWarnings("PMD.ThreadPoolCreationRule")
|
||||
private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());
|
||||
private static final Executor EXECUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());
|
||||
|
||||
private RequestConfig requestConfig = RequestConfig.custom()
|
||||
.setConnectTimeout(PropertyUtil.getNotifyConnectTimeout())
|
||||
@ -105,7 +108,7 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
private CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
|
||||
.setDefaultRequestConfig(requestConfig).build();
|
||||
|
||||
static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class);
|
||||
|
||||
private ServerListService serverListService;
|
||||
|
||||
@ -118,15 +121,11 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
executeAsyncInvoke();
|
||||
|
||||
}
|
||||
|
||||
private void executeAsyncInvoke() {
|
||||
|
||||
while (!queue.isEmpty()) {
|
||||
|
||||
NotifySingleTask task = queue.poll();
|
||||
String targetIp = task.getTargetIP();
|
||||
if (serverListService.getServerList().contains(
|
||||
@ -139,11 +138,7 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
task.getLastModified(),
|
||||
LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
|
||||
// get delay time and set fail count to the task
|
||||
int delay = getDelayTime(task);
|
||||
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
|
||||
queue.add(task);
|
||||
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
|
||||
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
|
||||
asyncTaskExecute(task);
|
||||
} else {
|
||||
HttpGet request = new HttpGet(task.url);
|
||||
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
|
||||
@ -152,7 +147,7 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
if (task.isBeta) {
|
||||
request.setHeader("isBeta", "true");
|
||||
}
|
||||
httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));
|
||||
httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -163,9 +158,18 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
|
||||
}
|
||||
|
||||
class AyscNotifyCallBack implements FutureCallback<HttpResponse> {
|
||||
private void asyncTaskExecute(NotifySingleTask task) {
|
||||
int delay = getDelayTime(task);
|
||||
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
|
||||
queue.add(task);
|
||||
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
|
||||
((ScheduledThreadPoolExecutor) EXECUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public AyscNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task
|
||||
|
||||
class AsyncNotifyCallBack implements FutureCallback<HttpResponse> {
|
||||
|
||||
public AsyncNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task
|
||||
) {
|
||||
this.task = task;
|
||||
this.httpclient = httpclient;
|
||||
@ -183,31 +187,19 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
ConfigTraceService.NOTIFY_EVENT_OK, delayed,
|
||||
task.target);
|
||||
} else {
|
||||
log.error("[notify-error] {}, {}, to {}, result {}",
|
||||
new Object[] {task.getDataId(), task.getGroup(),
|
||||
task.target,
|
||||
response.getStatusLine().getStatusCode()});
|
||||
log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}",
|
||||
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode());
|
||||
ConfigTraceService.logNotifyEvent(task.getDataId(),
|
||||
task.getGroup(), task.getTenant(), null, task.getLastModified(),
|
||||
LOCAL_IP,
|
||||
ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
|
||||
task.target);
|
||||
|
||||
|
||||
//get delay time and set fail count to the task
|
||||
int delay = getDelayTime(task);
|
||||
asyncTaskExecute(task);
|
||||
|
||||
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
|
||||
|
||||
queue.add(task);
|
||||
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
|
||||
|
||||
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
|
||||
|
||||
LogUtil.notifyLog.error(
|
||||
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
|
||||
new Object[] {task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified()});
|
||||
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
|
||||
task.target, task.getDataId(), task.getGroup(), task.getLastModified());
|
||||
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
}
|
||||
@ -218,10 +210,8 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
public void failed(Exception ex) {
|
||||
|
||||
long delayed = System.currentTimeMillis() - task.getLastModified();
|
||||
log.error("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
|
||||
+ ex.toString());
|
||||
log.debug("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
|
||||
+ ex.toString(), ex);
|
||||
log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}",
|
||||
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), ex.toString());
|
||||
ConfigTraceService.logNotifyEvent(task.getDataId(),
|
||||
task.getGroup(), task.getTenant(), null, task.getLastModified(),
|
||||
LOCAL_IP,
|
||||
@ -229,17 +219,9 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
task.target);
|
||||
|
||||
//get delay time and set fail count to the task
|
||||
int delay = getDelayTime(task);
|
||||
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
|
||||
|
||||
queue.add(task);
|
||||
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
|
||||
|
||||
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
|
||||
LogUtil.notifyLog.error(
|
||||
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
|
||||
new Object[] {task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified()});
|
||||
asyncTaskExecute(task);
|
||||
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
|
||||
task.target, task.getDataId(), task.getGroup(), task.getLastModified());
|
||||
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
}
|
||||
@ -247,24 +229,13 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
@Override
|
||||
public void cancelled() {
|
||||
|
||||
LogUtil.notifyLog.error(
|
||||
"[notify-exception] target:{} dataid:{} group:{} ts:{}",
|
||||
new Object[] {task.target, task.getGroup(),
|
||||
task.getGroup(), task.getLastModified()},
|
||||
"CANCELED");
|
||||
LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}",
|
||||
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED");
|
||||
|
||||
//get delay time and set fail count to the task
|
||||
int delay = getDelayTime(task);
|
||||
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
|
||||
|
||||
queue.add(task);
|
||||
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
|
||||
|
||||
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
|
||||
LogUtil.notifyLog.error(
|
||||
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
|
||||
new Object[] {task.target, task.getDataId(),
|
||||
task.getGroup(), task.getLastModified()});
|
||||
asyncTaskExecute(task);
|
||||
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
|
||||
task.target, task.getDataId(), task.getGroup(), task.getLastModified());
|
||||
|
||||
MetricsMonitor.getConfigNotifyException().increment();
|
||||
}
|
||||
@ -339,8 +310,7 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = new Thread(r,
|
||||
"com.alibaba.nacos.AsyncNotifyServiceThread");
|
||||
Thread thread = new Thread(r, "com.alibaba.nacos.AsyncNotifyServiceThread");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
@ -354,15 +324,15 @@ public class AsyncNotifyService extends AbstractEventListener {
|
||||
*/
|
||||
private static int getDelayTime(NotifySingleTask task) {
|
||||
int failCount = task.getFailCount();
|
||||
int delay = MINRETRYINTERVAL + failCount * failCount * INCREASESTEPS;
|
||||
if (failCount <= MAXCOUNT) {
|
||||
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
|
||||
if (failCount <= MAX_COUNT) {
|
||||
task.setFailCount(failCount + 1);
|
||||
}
|
||||
return delay;
|
||||
}
|
||||
|
||||
private static int MINRETRYINTERVAL = 500;
|
||||
private static int INCREASESTEPS = 1000;
|
||||
private static int MAXCOUNT = 6;
|
||||
private static int MIN_RETRY_INTERVAL = 500;
|
||||
private static int INCREASE_STEPS = 1000;
|
||||
private static int MAX_COUNT = 6;
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user