优化编排方式

This commit is contained in:
wuweifeng10 2020-02-25 16:15:25 +08:00
parent d8dca15d2a
commit fe62735389
3 changed files with 319 additions and 20 deletions

14
pom.xml
View File

@ -8,5 +8,17 @@
<artifactId>asyncTool</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -9,10 +9,7 @@ import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.worker.WorkResult;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
@ -171,7 +168,6 @@ public class WorkerWrapper<T, V> {
* 进行下一个任务
*/
private void beginNext(ThreadPoolExecutor poolExecutor, long now, long remainTime) {
// System.out.println("now is " + SystemClock.now() + " and thread count : " + getThreadCount());
//花费的时间
long costTime = SystemClock.now() - now;
if (nextWrappers == null) {
@ -291,7 +287,6 @@ public class WorkerWrapper<T, V> {
private boolean fastFail(int expect, Exception e) {
//试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) {
// System.out.println("compareAndSetState----------fail");
return false;
}
@ -365,16 +360,51 @@ public class WorkerWrapper<T, V> {
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
addDepend(new DependWrapper(workerWrapper, must));
}
private void addDepend(DependWrapper dependWrapper) {
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
//如果依赖的是重复的同一个就不重复添加了
for (DependWrapper dependWrapper : dependWrappers) {
if (workerWrapper.equals(dependWrapper.getDependWrapper())) {
for (DependWrapper wrapper : dependWrappers) {
if (wrapper.equals(dependWrapper)) {
return;
}
}
dependWrappers.add(new DependWrapper(workerWrapper, must));
dependWrappers.add(dependWrapper);
}
private void addNext(WorkerWrapper<?, ?> workerWrapper) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
//避免添加重复
for (WorkerWrapper wrapper : nextWrappers) {
if (workerWrapper.equals(wrapper)) {
return;
}
}
nextWrappers.add(workerWrapper);
}
private void addNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
if (wrappers == null) {
return;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
addNext(wrapper);
}
}
private void addDependWrappers(List<DependWrapper> dependWrappers) {
if (dependWrappers == null) {
return;
}
for (DependWrapper wrapper : dependWrappers) {
addDepend(wrapper);
}
}
private WorkResult<V> defaultResult() {
@ -403,11 +433,6 @@ public class WorkerWrapper<T, V> {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
}
private void setNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
this.nextWrappers = wrappers;
}
public static class Builder<W, C> {
/**
* worker将来要处理的param
@ -419,6 +444,10 @@ public class WorkerWrapper<T, V> {
* 自己后面的所有
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 自己依赖的所有
*/
private List<DependWrapper> dependWrappers;
/**
* 存储强依赖于自己的wrapper集合
*/
@ -446,6 +475,32 @@ public class WorkerWrapper<T, V> {
return this;
}
public Builder<W, C> depend(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
depend(wrapper);
}
return this;
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper) {
return depend(wrapper, true);
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
if (wrapper == null) {
return this;
}
DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
dependWrappers.add(dependWrapper);
return this;
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper) {
return next(wrapper, true);
}
@ -471,24 +526,31 @@ public class WorkerWrapper<T, V> {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
next(wrapper, true);
next(wrapper);
}
return this;
}
public WorkerWrapper<W, C> build() {
WorkerWrapper<W, C> wrapper = new WorkerWrapper<>(worker, param, callback);
wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
wrapper.setNextWrappers(nextWrappers);
if (nextWrappers != null && nextWrappers.size() > 0) {
if (dependWrappers != null) {
for (DependWrapper workerWrapper : dependWrappers) {
workerWrapper.getDependWrapper().addNext(wrapper);
wrapper.addDepend(workerWrapper);
}
}
if (nextWrappers != null) {
for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
if (selfIsMustSet != null) {
workerWrapper.addDepend(wrapper, selfIsMustSet.contains(workerWrapper));
}
wrapper.addNext(workerWrapper);
}
}
return wrapper;
}
}
}

View File

@ -18,10 +18,14 @@ public class TestPar {
// testNormal();
// testMulti();
// testMultiReverse();
// testMultiError2();
// testMulti3();
testMulti4();
// testMulti3Reverse();
// testMulti4();
// testMulti4Reverse();
// testMulti5();
testMulti5Reverse();
// testMulti6();
// testMulti7();
// testMulti8();
@ -109,6 +113,47 @@ public class TestPar {
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面
* 0---1
* 2
*/
private static void testMultiReverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper)
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(2500, workerWrapper, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败
@ -203,6 +248,60 @@ public class TestPar {
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后3
* 1
* 0 3
* 2
*/
private static void testMulti3Reverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.depend(workerWrapper)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper)
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.depend(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(3100, workerWrapper);
// Async.beginWork(2100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后32耗时2秒1耗时1秒3会等待2完成
* 1
@ -264,6 +363,68 @@ public class TestPar {
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后32耗时2秒1耗时1秒3会等待2完成
* 1
* 0 3
* 2
*
* 执行结果0123
*/
private static void testMulti4Reverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(2000);
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.depend(workerWrapper)
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper)
.next(workerWrapper3)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
//3会超时
// Async.beginWork(3100, workerWrapper);
//2,3会超时
// Async.beginWork(2900, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2 任何一个执行完后都执行3
* 1
@ -324,6 +485,70 @@ public class TestPar {
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2 任何一个执行完后都执行3
* 1
* 0 3
* 2
*
* 则结果是
* 0231
* 23分别是500400.3执行完毕后1才执行完
*/
private static void testMulti5Reverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.depend(workerWrapper, true)
.next(workerWrapper3, false)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper, true)
.next(workerWrapper3, false)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 必须1执行完毕后才能执行3. 无论2是否领先1完毕都要等1
* 1