Refactor dispatch task execute (#3995)

* Refactor nacos task execute engine

* Refactor nacos task execute engine

* For checkstyle

* For checkstyle

* Use ThreadUtils to reduce duplicate codes

* Set custom logger for TaskExecuteWorker

* Set custom logger for TaskExecuteWorker
This commit is contained in:
杨翊 SionYang 2020-10-16 14:32:39 +08:00 committed by GitHub
parent 164ad3a8b9
commit d78ebbbce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 388 additions and 270 deletions

View File

@ -21,7 +21,7 @@ package com.alibaba.nacos.common.task;
* *
* @author xiweng.yy * @author xiweng.yy
*/ */
public abstract class AbstractExecuteTask implements NacosTask { public abstract class AbstractExecuteTask implements NacosTask, Runnable {
@Override @Override
public boolean shouldProcess() { public boolean shouldProcess() {

View File

@ -29,5 +29,5 @@ public interface NacosTaskProcessor {
* @param task task. * @param task task.
* @return process task result. * @return process task result.
*/ */
boolean process(AbstractDelayTask task); boolean process(NacosTask task);
} }

View File

@ -16,20 +16,13 @@
package com.alibaba.nacos.common.task.engine; package com.alibaba.nacos.common.task.engine;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* Abstract nacos task execute engine. * Abstract nacos task execute engine.
@ -40,57 +33,12 @@ public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implem
private final Logger log; private final Logger log;
private final ScheduledExecutorService processingExecutor;
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>(); private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();
protected final ConcurrentHashMap<Object, T> tasks;
protected final ReentrantLock lock = new ReentrantLock();
private NacosTaskProcessor defaultTaskProcessor; private NacosTaskProcessor defaultTaskProcessor;
public AbstractNacosTaskExecuteEngine(String name) { public AbstractNacosTaskExecuteEngine(Logger logger) {
this(name, 32, null, 100L);
}
public AbstractNacosTaskExecuteEngine(String name, Logger logger) {
this(name, 32, logger, 100L);
}
public AbstractNacosTaskExecuteEngine(String name, Logger logger, long processInterval) {
this(name, 32, logger, processInterval);
}
public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger) {
this(name, initCapacity, logger, 100L);
}
public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
this.log = null != logger ? logger : LoggerFactory.getLogger(AbstractNacosTaskExecuteEngine.class.getName()); this.log = null != logger ? logger : LoggerFactory.getLogger(AbstractNacosTaskExecuteEngine.class.getName());
tasks = new ConcurrentHashMap<Object, T>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
@Override
public int size() {
lock.lock();
try {
return tasks.size();
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
lock.lock();
try {
return tasks.isEmpty();
} finally {
lock.unlock();
}
} }
@Override @Override
@ -118,56 +66,7 @@ public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implem
this.defaultTaskProcessor = defaultTaskProcessor; this.defaultTaskProcessor = defaultTaskProcessor;
} }
@Override
public T removeTask(Object key) {
lock.lock();
try {
T task = tasks.get(key);
if (null != task && task.shouldProcess()) {
return tasks.remove(key);
} else {
return null;
}
} finally {
lock.unlock();
}
}
@Override
public Collection<Object> getAllTaskKeys() {
Collection<Object> keys = new HashSet<Object>();
lock.lock();
try {
keys.addAll(tasks.keySet());
} finally {
lock.unlock();
}
return keys;
}
@Override
public void shutdown() throws NacosException {
processingExecutor.shutdown();
}
protected Logger getEngineLog() { protected Logger getEngineLog() {
return log; return log;
} }
/**
* process tasks in execute engine.
*/
protected abstract void processTasks();
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
AbstractNacosTaskExecuteEngine.this.processTasks();
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
}
} }

View File

@ -16,11 +16,19 @@
package com.alibaba.nacos.common.task.engine; package com.alibaba.nacos.common.task.engine;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* Nacos delay task execute engine. * Nacos delay task execute engine.
@ -29,27 +37,105 @@ import java.util.Collection;
*/ */
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> { public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
private final ScheduledExecutorService processingExecutor;
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
protected final ReentrantLock lock = new ReentrantLock();
public NacosDelayTaskExecuteEngine(String name) { public NacosDelayTaskExecuteEngine(String name) {
super(name); this(name, null);
} }
public NacosDelayTaskExecuteEngine(String name, Logger logger) { public NacosDelayTaskExecuteEngine(String name, Logger logger) {
super(name, logger); this(name, 32, logger, 100L);
} }
public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) { public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) {
super(name, logger, processInterval); this(name, 32, logger, processInterval);
} }
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) { public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) {
super(name, initCapacity, logger); this(name, initCapacity, logger, 100L);
} }
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(name, initCapacity, logger, processInterval); super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
} }
@Override @Override
public int size() {
lock.lock();
try {
return tasks.size();
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
lock.lock();
try {
return tasks.isEmpty();
} finally {
lock.unlock();
}
}
@Override
public AbstractDelayTask removeTask(Object key) {
lock.lock();
try {
AbstractDelayTask task = tasks.get(key);
if (null != task && task.shouldProcess()) {
return tasks.remove(key);
} else {
return null;
}
} finally {
lock.unlock();
}
}
@Override
public Collection<Object> getAllTaskKeys() {
Collection<Object> keys = new HashSet<Object>();
lock.lock();
try {
keys.addAll(tasks.keySet());
} finally {
lock.unlock();
}
return keys;
}
@Override
public void shutdown() throws NacosException {
processingExecutor.shutdown();
}
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
/**
* process tasks in execute engine.
*/
protected void processTasks() { protected void processTasks() {
Collection<Object> keys = getAllTaskKeys(); Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) { for (Object taskKey : keys) {
@ -74,22 +160,20 @@ public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<
} }
} }
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
private void retryFailedTask(Object key, AbstractDelayTask task) { private void retryFailedTask(Object key, AbstractDelayTask task) {
task.setLastProcessTime(System.currentTimeMillis()); task.setLastProcessTime(System.currentTimeMillis());
addTask(key, task); addTask(key, task);
} }
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
} }

View File

@ -0,0 +1,111 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.task.engine;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.common.utils.ThreadUtils;
import org.slf4j.Logger;
import java.util.Collection;
/**
* Nacos execute task execute engine.
*
* @author xiweng.yy
*/
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
private final TaskExecuteWorker[] executeWorkers;
public NacosExecuteTaskExecuteEngine(String name, Logger logger) {
this(name, logger, ThreadUtils.getSuitableThreadCount(1));
}
public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
super(logger);
executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
}
}
@Override
public int size() {
int result = 0;
for (TaskExecuteWorker each : executeWorkers) {
result += each.pendingTaskCount();
}
return result;
}
@Override
public boolean isEmpty() {
return 0 == size();
}
@Override
public void addTask(Object tag, AbstractExecuteTask task) {
NacosTaskProcessor processor = getProcessor(tag);
if (null != processor) {
processor.process(task);
return;
}
TaskExecuteWorker worker = getWorker(tag);
worker.process(task);
}
private TaskExecuteWorker getWorker(Object tag) {
int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
return executeWorkers[idx];
}
private int workersCount() {
return executeWorkers.length;
}
@Override
public AbstractExecuteTask removeTask(Object key) {
throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task");
}
@Override
public Collection<Object> getAllTaskKeys() {
throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys");
}
@Override
public void shutdown() throws NacosException {
for (TaskExecuteWorker each : executeWorkers) {
each.shutdown();
}
}
/**
* Get workers status.
*
* @return workers status string
*/
public String workersStatus() {
StringBuilder sb = new StringBuilder();
for (TaskExecuteWorker worker : executeWorkers) {
sb.append(worker.status()).append("\n");
}
return sb.toString();
}
}

View File

@ -14,69 +14,69 @@
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.nacos.core.distributed.distro.task.execute; package com.alibaba.nacos.common.task.engine;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.lifecycle.Closeable; import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Distro execute worker. * Nacos execute task execute worker.
* *
* @author xiweng.yy * @author xiweng.yy
*/ */
public final class DistroExecuteWorker implements Closeable { public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(DistroExecuteWorker.class); /**
* Max task queue size 32768.
*/
private static final int QUEUE_CAPACITY = 1 << 15;
private static final int QUEUE_CAPACITY = 50000; private final Logger log;
private final BlockingQueue<Runnable> queue;
private final String name; private final String name;
private final BlockingQueue<Runnable> queue;
private final AtomicBoolean closed; private final AtomicBoolean closed;
public DistroExecuteWorker(final int mod, final int total) { public TaskExecuteWorker(final String name, final int mod, final int total) {
name = getClass().getName() + "_" + mod + "%" + total; this(name, mod, total, null);
queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); }
closed = new AtomicBoolean(false);
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
this.name = name + "_" + mod + "%" + total;
this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
new InnerWorker(name).start(); new InnerWorker(name).start();
} }
public String getName() { public String getName() {
return name; return name;
} }
/** @Override
* Execute task without result. public boolean process(NacosTask task) {
*/ if (task instanceof AbstractExecuteTask) {
public void execute(Runnable task) { putTask((Runnable) task);
putTask(task); }
return true;
} }
/**
* Execute task with a result.
*/
public <V> Future<V> execute(Callable<V> task) {
FutureTask<V> future = new FutureTask(task);
putTask(future);
return future;
}
private void putTask(Runnable task) { private void putTask(Runnable task) {
try { try {
queue.put(task); queue.put(task);
} catch (InterruptedException ire) { } catch (InterruptedException ire) {
LOGGER.error(ire.toString(), ire); log.error(ire.toString(), ire);
} }
} }
@ -101,12 +101,12 @@ public final class DistroExecuteWorker implements Closeable {
* Inner execute worker. * Inner execute worker.
*/ */
private class InnerWorker extends Thread { private class InnerWorker extends Thread {
InnerWorker(String name) { InnerWorker(String name) {
setDaemon(false); setDaemon(false);
setName(name); setName(name);
} }
@Override @Override
public void run() { public void run() {
while (!closed.get()) { while (!closed.get()) {
@ -116,10 +116,10 @@ public final class DistroExecuteWorker implements Closeable {
task.run(); task.run();
long duration = System.currentTimeMillis() - begin; long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) { if (duration > 1000L) {
LOGGER.warn("distro task {} takes {}ms", task, duration); log.warn("distro task {} takes {}ms", task, duration);
} }
} catch (Throwable e) { } catch (Throwable e) {
LOGGER.error("[DISTRO-FAILED] " + e.toString(), e); log.error("[DISTRO-FAILED] " + e.toString(), e);
} }
} }
} }

View File

@ -94,9 +94,19 @@ public final class ThreadUtils {
* @return thread count * @return thread count
*/ */
public static int getSuitableThreadCount() { public static int getSuitableThreadCount() {
return getSuitableThreadCount(THREAD_MULTIPLER);
}
/**
* Through the number of cores, calculate the appropriate number of threads.
*
* @param threadMultiple multiple time of cores
* @return thread count
*/
public static int getSuitableThreadCount(int threadMultiple) {
final int coreCount = Runtime.getRuntime().availableProcessors(); final int coreCount = Runtime.getRuntime().availableProcessors();
int workerCount = 1; int workerCount = 1;
while (workerCount < coreCount * THREAD_MULTIPLER) { while (workerCount < coreCount * threadMultiple) {
workerCount <<= 1; workerCount <<= 1;
} }
return workerCount; return workerCount;

View File

@ -0,0 +1,57 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.task.engine;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class NacosExecuteTaskExecuteEngineTest {
private NacosExecuteTaskExecuteEngine executeTaskExecuteEngine;
@Before
public void setUp() {
executeTaskExecuteEngine = new NacosExecuteTaskExecuteEngine("TEST", null);
}
@After
public void tearDown() throws NacosException {
executeTaskExecuteEngine.shutdown();
}
@Mock
private AbstractExecuteTask task;
@Test
public void testAddTask() {
executeTaskExecuteEngine.addTask("test", task);
verify(task).run();
assertTrue(executeTaskExecuteEngine.isEmpty());
assertEquals(0, executeTaskExecuteEngine.size());
}
}

View File

@ -44,10 +44,6 @@ public final class TaskManager extends NacosDelayTaskExecuteEngine implements Ta
Condition notEmpty = this.lock.newCondition(); Condition notEmpty = this.lock.newCondition();
public TaskManager() {
this(null);
}
public TaskManager(String name) { public TaskManager(String name) {
super(name, LOGGER, 100L); super(name, LOGGER, 100L);
this.name = name; this.name = name;

View File

@ -16,7 +16,7 @@
package com.alibaba.nacos.config.server.service.dump.processor; package com.alibaba.nacos.config.server.service.dump.processor;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper; import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper;
import com.alibaba.nacos.config.server.model.Page; import com.alibaba.nacos.config.server.model.Page;
@ -43,7 +43,7 @@ public class DumpAllBetaProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
int rowCount = persistService.configInfoBetaCount(); int rowCount = persistService.configInfoBetaCount();
int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE);

View File

@ -16,9 +16,9 @@
package com.alibaba.nacos.config.server.service.dump.processor; package com.alibaba.nacos.config.server.service.dump.processor;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.model.Page; import com.alibaba.nacos.config.server.model.Page;
@ -47,7 +47,7 @@ public class DumpAllProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
long currentMaxId = persistService.findConfigMaxId(); long currentMaxId = persistService.findConfigMaxId();
long lastMaxId = 0; long lastMaxId = 0;
while (lastMaxId < currentMaxId) { while (lastMaxId < currentMaxId) {

View File

@ -16,7 +16,7 @@
package com.alibaba.nacos.config.server.service.dump.processor; package com.alibaba.nacos.config.server.service.dump.processor;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper; import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper;
import com.alibaba.nacos.config.server.model.Page; import com.alibaba.nacos.config.server.model.Page;
@ -42,7 +42,7 @@ public class DumpAllTagProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
int rowCount = persistService.configInfoTagCount(); int rowCount = persistService.configInfoTagCount();
int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE);

View File

@ -16,9 +16,9 @@
package com.alibaba.nacos.config.server.service.dump.processor; package com.alibaba.nacos.config.server.service.dump.processor;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
@ -47,7 +47,7 @@ public class DumpChangeProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
LogUtil.DEFAULT_LOG.warn("quick start; startTime:{},endTime:{}", startTime, endTime); LogUtil.DEFAULT_LOG.warn("quick start; startTime:{},endTime:{}", startTime, endTime);
LogUtil.DEFAULT_LOG.warn("updateMd5 start"); LogUtil.DEFAULT_LOG.warn("updateMd5 start");
long startUpdateMd5 = System.currentTimeMillis(); long startUpdateMd5 = System.currentTimeMillis();

View File

@ -16,7 +16,7 @@
package com.alibaba.nacos.config.server.service.dump.processor; package com.alibaba.nacos.config.server.service.dump.processor;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfo4Beta; import com.alibaba.nacos.config.server.model.ConfigInfo4Beta;
@ -44,7 +44,7 @@ public class DumpProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
final PersistService persistService = dumpService.getPersistService(); final PersistService persistService = dumpService.getPersistService();
DumpTask dumpTask = (DumpTask) task; DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey()); String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());

View File

@ -17,8 +17,8 @@
package com.alibaba.nacos.config.server.service.merge; package com.alibaba.nacos.config.server.service.merge;
import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoAggr; import com.alibaba.nacos.config.server.model.ConfigInfoAggr;
@ -52,7 +52,7 @@ public class MergeTaskProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
MergeDataTask mergeTask = (MergeDataTask) task; MergeDataTask mergeTask = (MergeDataTask) task;
final String dataId = mergeTask.dataId; final String dataId = mergeTask.dataId;
final String group = mergeTask.groupId; final String group = mergeTask.groupId;

View File

@ -18,7 +18,7 @@ package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.common.executor.ExecutorFactory; import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.cluster.ServerMemberManager;
@ -50,7 +50,7 @@ public class NotifySingleService {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
NotifySingleTask notifyTask = (NotifySingleTask) task; NotifySingleTask notifyTask = (NotifySingleTask) task;
return notifyToDump(notifyTask.getDataId(), notifyTask.getGroup(), notifyTask.getTenant(), return notifyToDump(notifyTask.getDataId(), notifyTask.getGroup(), notifyTask.getTenant(),
notifyTask.getLastModified(), notifyTask.target); notifyTask.getLastModified(), notifyTask.target);

View File

@ -17,8 +17,8 @@
package com.alibaba.nacos.config.server.service.notify; package com.alibaba.nacos.config.server.service.notify;
import com.alibaba.nacos.common.model.RestResult; import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor; import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
@ -46,7 +46,7 @@ public class NotifyTaskProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
NotifyTask notifyTask = (NotifyTask) task; NotifyTask notifyTask = (NotifyTask) task;
String dataId = notifyTask.getDataId(); String dataId = notifyTask.getDataId();
String group = notifyTask.getGroup(); String group = notifyTask.getGroup();

View File

@ -20,7 +20,7 @@ import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteWorkersManager; import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteTaskExecuteEngine;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@ -33,7 +33,7 @@ public class DistroTaskEngineHolder {
private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine(); private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
private final DistroExecuteWorkersManager executeWorkersManager = new DistroExecuteWorkersManager(); private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) { public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder); DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
@ -44,7 +44,7 @@ public class DistroTaskEngineHolder {
return delayTaskExecuteEngine; return delayTaskExecuteEngine;
} }
public DistroExecuteWorkersManager getExecuteWorkersManager() { public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
return executeWorkersManager; return executeWorkersManager;
} }

View File

@ -16,7 +16,7 @@
package com.alibaba.nacos.core.distributed.distro.task.delay; package com.alibaba.nacos.core.distributed.distro.task.delay;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
@ -42,7 +42,7 @@ public class DistroDelayTaskProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) { if (!(task instanceof DistroDelayTask)) {
return true; return true;
} }
@ -50,7 +50,7 @@ public class DistroDelayTaskProcessor implements NacosTaskProcessor {
DistroKey distroKey = distroDelayTask.getDistroKey(); DistroKey distroKey = distroDelayTask.getDistroKey();
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) { if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true; return true;
} }
return false; return false;

View File

@ -24,7 +24,7 @@ import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
* *
* @author xiweng.yy * @author xiweng.yy
*/ */
public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask implements Runnable { public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {
private final DistroKey distroKey; private final DistroKey distroKey;

View File

@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.execute;
import com.alibaba.nacos.common.task.engine.NacosExecuteTaskExecuteEngine;
import com.alibaba.nacos.core.utils.Loggers;
/**
* Distro execute task execute engine.
*
* @author xiweng.yy
*/
public class DistroExecuteTaskExecuteEngine extends NacosExecuteTaskExecuteEngine {
public DistroExecuteTaskExecuteEngine() {
super(DistroExecuteTaskExecuteEngine.class.getSimpleName(), Loggers.DISTRO);
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.execute;
/**
* Distro execute workers manager.
*
* @author xiweng.yy
*/
public final class DistroExecuteWorkersManager {
private final DistroExecuteWorker[] connectionWorkers;
public DistroExecuteWorkersManager() {
int workerCount = findWorkerCount();
connectionWorkers = new DistroExecuteWorker[workerCount];
for (int mod = 0; mod < workerCount; ++mod) {
connectionWorkers[mod] = new DistroExecuteWorker(mod, workerCount);
}
}
private int findWorkerCount() {
final int coreCount = Runtime.getRuntime().availableProcessors();
int result = 1;
while (result < coreCount) {
result <<= 1;
}
return result;
}
/**
* Dispatch task to worker by tag.
*/
public void dispatch(Object tag, Runnable task) {
DistroExecuteWorker worker = getWorker(tag);
worker.execute(task);
}
private DistroExecuteWorker getWorker(Object tag) {
int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
return connectionWorkers[idx];
}
/**
* Get workers status.
*
* @return workers status string
*/
public String workersStatus() {
StringBuilder sb = new StringBuilder();
for (DistroExecuteWorker worker : connectionWorkers) {
sb.append(worker.status()).append("\n");
}
return sb.toString();
}
public int workersCount() {
return connectionWorkers.length;
}
}

View File

@ -18,9 +18,9 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -49,15 +49,18 @@ public class DistroHttpCombinedKeyDelayTask extends DistroDelayTask {
public void merge(AbstractDelayTask task) { public void merge(AbstractDelayTask task) {
actualResourceKeys.addAll(((DistroHttpCombinedKeyDelayTask) task).getActualResourceKeys()); actualResourceKeys.addAll(((DistroHttpCombinedKeyDelayTask) task).getActualResourceKeys());
if (actualResourceKeys.size() >= batchSize) { if (actualResourceKeys.size() >= batchSize) {
this.setLastProcessTime(0);
DistroHttpCombinedKey.incrementSequence(); DistroHttpCombinedKey.incrementSequence();
setLastProcessTime(0);
} else {
setLastProcessTime(task.getLastProcessTime());
} }
} }
@Override @Override
public DistroKey getDistroKey() { public DistroKey getDistroKey() {
DistroKey taskKey = super.getDistroKey(); DistroKey taskKey = super.getDistroKey();
DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, taskKey.getTargetServer()); DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
taskKey.getTargetServer());
result.setResourceKey(taskKey.getResourceKey()); result.setResourceKey(taskKey.getResourceKey());
result.getActualResourceTypes().addAll(actualResourceKeys); result.getActualResourceTypes().addAll(actualResourceKeys);
return result; return result;

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
@ -31,7 +32,7 @@ import com.alibaba.nacos.naming.misc.Loggers;
* *
* @author xiweng.yy * @author xiweng.yy
*/ */
public class DistroHttpCombinedKeyExecuteTask implements Runnable { public class DistroHttpCombinedKeyExecuteTask extends AbstractExecuteTask {
private final GlobalConfig globalConfig; private final GlobalConfig globalConfig;

View File

@ -16,7 +16,7 @@
package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
@ -50,12 +50,12 @@ public class DistroHttpDelayTaskProcessor implements NacosTaskProcessor {
} }
@Override @Override
public boolean process(AbstractDelayTask task) { public boolean process(NacosTask task) {
DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey(); DistroKey distroKey = distroDelayTask.getDistroKey();
DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig, DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig,
distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction()); distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction());
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, executeTask); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask);
return true; return true;
} }
} }