Merge branch 'V1.4'

# Conflicts:
#	src/main/java/com/jd/platform/async/executor/Async.java
#	src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java
This commit is contained in:
wuweifeng10 2020-12-14 10:27:48 +08:00
commit 00c90aaec1
4 changed files with 70 additions and 38 deletions

View File

@ -6,7 +6,7 @@
<groupId>com.jd.platform</groupId> <groupId>com.jd.platform</groupId>
<artifactId>asyncTool</artifactId> <artifactId>asyncTool</artifactId>
<version>1.3.1-SNAPSHOT</version> <version>1.4.1-SNAPSHOT</version>
<build> <build>
<plugins> <plugins>

View File

@ -15,22 +15,29 @@ import java.util.stream.Collectors;
* @version 1.0 * @version 1.0
*/ */
public class Async { public class Async {
public static final ThreadPoolExecutor COMMON_POOL = private static final ThreadPoolExecutor COMMON_POOL =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024, new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024,
15L, TimeUnit.SECONDS, 15L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>(),
(ThreadFactory) Thread::new); (ThreadFactory) Thread::new);
public static boolean beginWork(long timeout, ThreadPoolExecutor pool, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException { private static ExecutorService executorService;
/**
* 出发点
*/
public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
if(workerWrappers == null || workerWrappers.size() == 0) { if(workerWrappers == null || workerWrappers.size() == 0) {
return false; return false;
} }
//保存线程池变量
Async.executorService = executorService;
//定义一个map存放所有的wrapperkey为wrapper的唯一idvalue是该wrapper可以从value中获取wrapper的result //定义一个map存放所有的wrapperkey为wrapper的唯一idvalue是该wrapper可以从value中获取wrapper的result
Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>(); Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()]; CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
for (int i = 0; i < workerWrappers.size(); i++) { for (int i = 0; i < workerWrappers.size(); i++) {
WorkerWrapper wrapper = workerWrappers.get(i); WorkerWrapper wrapper = workerWrappers.get(i);
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(pool, timeout, forParamUseWrappers), pool); futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);
} }
try { try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS); CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
@ -48,12 +55,12 @@ public class Async {
/** /**
* 如果想自定义线程池请传pool不自定义的话就走默认的COMMON_POOL * 如果想自定义线程池请传pool不自定义的话就走默认的COMMON_POOL
*/ */
public static boolean beginWork(long timeout, ThreadPoolExecutor pool, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
if(workerWrapper == null || workerWrapper.length == 0) { if(workerWrapper == null || workerWrapper.length == 0) {
return false; return false;
} }
List<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList()); List<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList());
return beginWork(timeout, pool, workerWrappers); return beginWork(timeout, executorService, workerWrappers);
} }
/** /**
@ -71,6 +78,21 @@ public class Async {
groupCallback = new DefaultGroupCallback(); groupCallback = new DefaultGroupCallback();
} }
IGroupCallback finalGroupCallback = groupCallback; IGroupCallback finalGroupCallback = groupCallback;
if (executorService != null) {
executorService.submit(() -> {
try {
boolean success = beginWork(timeout, COMMON_POOL, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
} else {
COMMON_POOL.submit(() -> { COMMON_POOL.submit(() -> {
try { try {
boolean success = beginWork(timeout, COMMON_POOL, workerWrapper); boolean success = beginWork(timeout, COMMON_POOL, workerWrapper);
@ -86,6 +108,8 @@ public class Async {
}); });
} }
}
/** /**
* 总共多少个执行单元 * 总共多少个执行单元
*/ */
@ -102,10 +126,16 @@ public class Async {
} }
/**
* 关闭线程池
*/
public static void shutDown() { public static void shutDown() {
if (executorService != null) {
executorService.shutdown();
} else {
COMMON_POOL.shutdown(); COMMON_POOL.shutdown();
} }
}
public static String getThreadCount() { public static String getThreadCount() {
return "activeCount=" + COMMON_POOL.getActiveCount() + return "activeCount=" + COMMON_POOL.getActiveCount() +

View File

@ -12,7 +12,7 @@ import com.jd.platform.async.worker.WorkResult;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -96,7 +96,7 @@ public class WorkerWrapper<T, V> {
* 开始工作 * 开始工作
* fromWrapper代表这次work是由哪个上游wrapper发起的 * fromWrapper代表这次work是由哪个上游wrapper发起的
*/ */
private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) { private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
this.forParamUseWrappers = forParamUseWrappers; this.forParamUseWrappers = forParamUseWrappers;
//将自己放到所有wrapper的集合里去 //将自己放到所有wrapper的集合里去
forParamUseWrappers.put(id, this); forParamUseWrappers.put(id, this);
@ -104,13 +104,13 @@ public class WorkerWrapper<T, V> {
//总的已经超时了就快速失败进行下一个 //总的已经超时了就快速失败进行下一个
if (remainTime <= 0) { if (remainTime <= 0) {
fastFail(INIT, null); fastFail(INIT, null);
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
return; return;
} }
//如果自己已经执行过了 //如果自己已经执行过了
//可能有多个依赖其中的一个依赖已经执行完了并且自己也已开始执行或执行完毕当另一个依赖执行完毕又进来该方法时就不重复处理了 //可能有多个依赖其中的一个依赖已经执行完了并且自己也已开始执行或执行完毕当另一个依赖执行完毕又进来该方法时就不重复处理了
if (getState() == FINISH || getState() == ERROR) { if (getState() == FINISH || getState() == ERROR) {
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
return; return;
} }
@ -119,7 +119,7 @@ public class WorkerWrapper<T, V> {
//如果自己的next链上有已经出结果或已经开始执行的任务了自己就不用继续了 //如果自己的next链上有已经出结果或已经开始执行的任务了自己就不用继续了
if (!checkNextWrapperResult()) { if (!checkNextWrapperResult()) {
fastFail(INIT, new SkippedException()); fastFail(INIT, new SkippedException());
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
return; return;
} }
} }
@ -127,7 +127,7 @@ public class WorkerWrapper<T, V> {
//如果没有任何依赖说明自己就是第一批要执行的 //如果没有任何依赖说明自己就是第一批要执行的
if (dependWrappers == null || dependWrappers.size() == 0) { if (dependWrappers == null || dependWrappers.size() == 0) {
fire(); fire();
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
return; return;
} }
@ -139,17 +139,17 @@ public class WorkerWrapper<T, V> {
//只有一个依赖 //只有一个依赖
if (dependWrappers.size() == 1) { if (dependWrappers.size() == 1) {
doDependsOneJob(fromWrapper); doDependsOneJob(fromWrapper);
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
} else { } else {
//有多个依赖时 //有多个依赖时
doDependsJobs(poolExecutor, dependWrappers, fromWrapper, now, remainTime); doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
} }
} }
public void work(ThreadPoolExecutor poolExecutor, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) { public void work(ExecutorService executorService, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
work(poolExecutor, null, remainTime, forParamUseWrappers); work(executorService, null, remainTime, forParamUseWrappers);
} }
/** /**
@ -179,21 +179,21 @@ public class WorkerWrapper<T, V> {
/** /**
* 进行下一个任务 * 进行下一个任务
*/ */
private void beginNext(ThreadPoolExecutor poolExecutor, long now, long remainTime) { private void beginNext(ExecutorService executorService, long now, long remainTime) {
//花费的时间 //花费的时间
long costTime = SystemClock.now() - now; long costTime = SystemClock.now() - now;
if (nextWrappers == null) { if (nextWrappers == null) {
return; return;
} }
if (nextWrappers.size() == 1) { if (nextWrappers.size() == 1) {
nextWrappers.get(0).work(poolExecutor, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers); nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
return; return;
} }
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()]; CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) { for (int i = 0; i < nextWrappers.size(); i++) {
int finalI = i; int finalI = i;
futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI) futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
.work(poolExecutor, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), poolExecutor); .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
} }
try { try {
CompletableFuture.allOf(futures).get(); CompletableFuture.allOf(futures).get();
@ -215,7 +215,7 @@ public class WorkerWrapper<T, V> {
} }
} }
private synchronized void doDependsJobs(ThreadPoolExecutor poolExecutor, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) { private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
boolean nowDependIsMust = false; boolean nowDependIsMust = false;
//创建必须完成的上游wrapper集合 //创建必须完成的上游wrapper集合
Set<DependWrapper> mustWrapper = new HashSet<>(); Set<DependWrapper> mustWrapper = new HashSet<>();
@ -235,7 +235,7 @@ public class WorkerWrapper<T, V> {
} else { } else {
fire(); fire();
} }
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
return; return;
} }
@ -271,7 +271,7 @@ public class WorkerWrapper<T, V> {
//只要有失败的 //只要有失败的
if (hasError) { if (hasError) {
fastFail(INIT, null); fastFail(INIT, null);
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
return; return;
} }
@ -280,7 +280,8 @@ public class WorkerWrapper<T, V> {
if (!existNoFinish) { if (!existNoFinish) {
//上游都finish了进行自己 //上游都finish了进行自己
fire(); fire();
beginNext(poolExecutor, now, remainTime); beginNext(executorService, now, remainTime);
return;
} }
} }

View File

@ -6,6 +6,7 @@ import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
/** /**
* 并行测试 * 并行测试
@ -863,7 +864,7 @@ public class TestPar {
.next(last, false) .next(last, false)
.build(); .build();
Async.beginWork(6000, wrapperW, wrapperW1); Async.beginWork(6000,Executors.newCachedThreadPool(), wrapperW, wrapperW1);
Async.shutDown(); Async.shutDown();
} }
} }