重要更新,采用builder模式

This commit is contained in:
wuweifeng10 2020-02-19 18:37:56 +08:00
parent 5b1c05032e
commit 67e5082293
5 changed files with 455 additions and 288 deletions

View File

@ -73,7 +73,7 @@ public class WorkerWrapper<T, V> {
private static final int WORKING = 3; private static final int WORKING = 3;
private static final int INIT = 0; private static final int INIT = 0;
public WorkerWrapper(IWorker<T, V> worker, T param, ICallback<T, V> callback) { private WorkerWrapper(IWorker<T, V> worker, T param, ICallback<T, V> callback) {
if (worker == null) { if (worker == null) {
throw new NullPointerException("async.worker is null"); throw new NullPointerException("async.worker is null");
} }
@ -267,7 +267,6 @@ public class WorkerWrapper<T, V> {
return; return;
} }
System.out.println(Thread.currentThread().getName() + "---" + existNoFinish);
//如果上游都没有失败分为两种情况一种是都finish了一种是有的在working //如果上游都没有失败分为两种情况一种是都finish了一种是有的在working
//都finish的话 //都finish的话
if (!existNoFinish) { if (!existNoFinish) {
@ -305,7 +304,7 @@ public class WorkerWrapper<T, V> {
} }
} }
callback.result(false, getParam(), workResult); callback.result(false, param, workResult);
return true; return true;
} }
@ -326,7 +325,7 @@ public class WorkerWrapper<T, V> {
callback.begin(); callback.begin();
//执行耗时操作 //执行耗时操作
V resultValue = worker.action(getParam()); V resultValue = worker.action(param);
//如果状态不是在working,说明别的地方已经修改了 //如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) { if (!compareAndSetState(WORKING, FINISH)) {
@ -336,7 +335,7 @@ public class WorkerWrapper<T, V> {
workResult.setResultState(ResultState.SUCCESS); workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue); workResult.setResult(resultValue);
//回调成功 //回调成功
callback.result(true, getParam(), workResult); callback.result(true, param, workResult);
return workResult; return workResult;
} catch (Exception e) { } catch (Exception e) {
@ -349,70 +348,22 @@ public class WorkerWrapper<T, V> {
} }
} }
public WorkResult<V> getWorkResult() {
return workResult;
}
public List<WorkerWrapper<?, ?>> getNextWrappers() {
return nextWrappers;
}
public void setParam(T param) {
this.param = param;
}
private boolean checkIsNullResult() { private boolean checkIsNullResult() {
return ResultState.DEFAULT == workResult.getResultState(); return ResultState.DEFAULT == workResult.getResultState();
} }
public WorkerWrapper addNext(WorkerWrapper<?, ?>... nextWrappers) {
if (nextWrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
addNext(workerWrapper);
}
return this;
}
public WorkerWrapper addNext(IWorker<T, V> worker, T param, ICallback<T, V> callback) {
WorkerWrapper<T, V> workerWrapper = new WorkerWrapper<>(worker, param, callback);
return addNext(workerWrapper);
}
public WorkerWrapper addNext(WorkerWrapper<?, ?> workerWrapper) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
nextWrappers.add(workerWrapper);
workerWrapper.addDepend(this);
return this;
}
/**
* 直接set Next
*/
public WorkerWrapper setNext(WorkerWrapper<?, ?>... workerWrapper) {
if (nextWrappers != null) {
nextWrappers.clear();
}
return addNext(workerWrapper);
}
/**
* 设置这几个依赖的wrapper不是must执行完毕才能执行自己
*/
public void setDependNotMust(WorkerWrapper<?, ?>... workerWrapper) {
if (dependWrappers == null) {
return;
}
if (workerWrapper == null) {
return;
}
for (DependWrapper dependWrapper : dependWrappers) {
for (WorkerWrapper wrapper : workerWrapper) {
if (dependWrapper.getDependWrapper().equals(wrapper)) {
dependWrapper.setMust(false);
}
}
}
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper) {
this.addDepend(workerWrapper, true);
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) { private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
if (dependWrappers == null) { if (dependWrappers == null) {
dependWrappers = new ArrayList<>(); dependWrappers = new ArrayList<>();
@ -428,73 +379,116 @@ public class WorkerWrapper<T, V> {
private WorkResult<V> defaultResult() { private WorkResult<V> defaultResult() {
workResult.setResultState(ResultState.TIMEOUT); workResult.setResultState(ResultState.TIMEOUT);
workResult.setResult(getWorker().defaultValue()); workResult.setResult(worker.defaultValue());
return workResult; return workResult;
} }
private WorkResult<V> defaultExResult(Exception ex) { private WorkResult<V> defaultExResult(Exception ex) {
workResult.setResultState(ResultState.EXCEPTION); workResult.setResultState(ResultState.EXCEPTION);
workResult.setResult(getWorker().defaultValue()); workResult.setResult(worker.defaultValue());
workResult.setEx(ex); workResult.setEx(ex);
return workResult; return workResult;
} }
private WorkResult<V> getNoneNullWorkResult() {
if (workResult == null) {
return defaultResult();
}
return workResult;
}
public T getParam() { private int getState() {
return param;
}
public IWorker<T, V> getWorker() {
return worker;
}
public ICallback<T, V> getCallback() {
return callback;
}
public List<WorkerWrapper<?, ?>> getNextWrappers() {
return nextWrappers;
}
public void setNextWrappers(List<WorkerWrapper<?, ?>> nextWrappers) {
this.nextWrappers = nextWrappers;
}
public List<DependWrapper> getDependWrappers() {
return dependWrappers;
}
public void setDependWrappers(List<DependWrapper> dependWrappers) {
this.dependWrappers = dependWrappers;
}
public int getState() {
return state.get(); return state.get();
} }
public boolean compareAndSetState(int expect, int update) { private boolean compareAndSetState(int expect, int update) {
return this.state.compareAndSet(expect, update); return this.state.compareAndSet(expect, update);
} }
public WorkResult<V> getWorkResult() { private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
return workResult;
}
public void setWorkResult(WorkResult<V> workResult) {
this.workResult = workResult;
}
public boolean isNeedCheckNextWrapperResult() {
return needCheckNextWrapperResult;
}
public void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult; this.needCheckNextWrapperResult = needCheckNextWrapperResult;
} }
private void setNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
this.nextWrappers = wrappers;
}
public static class Builder<W, C> {
/**
* worker将来要处理的param
*/
private W param;
private IWorker<W, C> worker;
private ICallback<W, C> callback;
/**
* 自己后面的所有
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 存储强依赖于自己的wrapper集合
*/
private Set<WorkerWrapper<?, ?>> selfIsMustSet;
private boolean needCheckNextWrapperResult = true;
public Builder<W, C> worker(IWorker<W, C> worker) {
this.worker = worker;
return this;
}
public Builder<W, C> param(W w) {
this.param = w;
return this;
}
public Builder<W, C> needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
return this;
}
public Builder<W, C> callback(ICallback<W, C> callback) {
this.callback = callback;
return this;
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper) {
return next(wrapper, true);
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
nextWrappers.add(wrapper);
//强依赖自己
if (selfIsMust) {
if (selfIsMustSet == null) {
selfIsMustSet = new HashSet<>();
}
selfIsMustSet.add(wrapper);
}
return this;
}
public Builder<W, C> next(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
next(wrapper, true);
}
return this;
}
public WorkerWrapper<W, C> build() {
WorkerWrapper<W, C> wrapper = new WorkerWrapper<>(worker, param, callback);
wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
wrapper.setNextWrappers(nextWrappers);
if (nextWrappers != null && nextWrappers.size() > 0) {
for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
if (selfIsMustSet != null) {
workerWrapper.addDepend(wrapper, selfIsMustSet.contains(workerWrapper));
}
}
}
return wrapper;
}
}
} }

View File

@ -19,17 +19,29 @@ public class Test {
DeWorker1 w1 = new DeWorker1(); DeWorker1 w1 = new DeWorker1();
DeWorker2 w2 = new DeWorker2(); DeWorker2 w2 = new DeWorker2();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
.worker(w2)
.callback(w2)
.build();
WorkerWrapper<WorkResult<User>, User> workerWrapper1 = new WorkerWrapper.Builder<WorkResult<User>, User>()
.worker(w1)
.callback(w1)
.next(workerWrapper2)
.build();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper.Builder<String, User>()
.worker(w)
.param("0")
.next(workerWrapper1)
.callback(w)
.build();
//虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参 //虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参
WorkResult<User> result = workerWrapper.getWorkResult(); WorkResult<User> result = workerWrapper.getWorkResult();
WorkerWrapper<WorkResult<User>, User> workerWrapper1 = new WorkerWrapper<>(w1, result, w1);
WorkResult<User> result1 = workerWrapper1.getWorkResult(); WorkResult<User> result1 = workerWrapper1.getWorkResult();
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper<>(w2, result1, w2); workerWrapper1.setParam(result);
workerWrapper2.setParam(result1);
workerWrapper.addNext(workerWrapper1);
workerWrapper1.addNext(workerWrapper2);
Async.beginWork(3500, workerWrapper); Async.beginWork(3500, workerWrapper);

View File

@ -16,18 +16,16 @@ import java.util.concurrent.ExecutionException;
public class TestPar { public class TestPar {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// testNormal(); // testNormal();
// testMulti(); // testMulti();
// testMultiError();
// testMultiError2(); // testMultiError2();
// testMulti3(); // testMulti3();
// testMulti4(); testMulti4();
// testMulti5(); // testMulti5();
// testMulti6(); // testMulti6();
// testMulti7(); // testMulti7();
// testMulti8(); // testMulti8();
testMulti9(); // testMulti9();
} }
/** /**
@ -38,9 +36,24 @@ public class TestPar {
ParWorker1 w1 = new ParWorker1(); ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w2)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w2)
.param("2")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
@ -53,7 +66,6 @@ public class TestPar {
System.out.println(Async.getThreadCount()); System.out.println(Async.getThreadCount());
System.out.println(workerWrapper.getWorkResult()); System.out.println(workerWrapper.getWorkResult());
// System.out.println(getThreadCount());
Async.shutDown(); Async.shutDown();
} }
@ -67,11 +79,24 @@ public class TestPar {
ParWorker1 w1 = new ParWorker1(); ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w2)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w2)
.param("2")
.build();
workerWrapper.addNext(workerWrapper1); WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1)
.build();
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
@ -84,8 +109,9 @@ public class TestPar {
Async.shutDown(); Async.shutDown();
} }
/** /**
* 0,2同时开启,1在0后面. 1超时 * 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败
* 0---1 * 0---1
* 2 * 2
*/ */
@ -94,38 +120,24 @@ public class TestPar {
ParWorker1 w1 = new ParWorker1(); ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w2)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w2)
.param("2")
.build();
workerWrapper.addNext(workerWrapper1); WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.build();
long now = SystemClock.now(); WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
System.out.println("begin-" + now); .worker(w)
.callback(w)
Async.beginWork(1500, workerWrapper, workerWrapper2); .param("0")
.next(workerWrapper1)
System.out.println("end-" + SystemClock.now()); .build();
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败
* 0---1
* 2
*/
private static void testMultiError2() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
workerWrapper.addNext(workerWrapper1);
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
@ -150,20 +162,39 @@ public class TestPar {
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3(); ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w3)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w3)
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); .param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
workerWrapper.addNext(workerWrapper1, workerWrapper2);
workerWrapper1.addNext(workerWrapper3);
workerWrapper2.addNext(workerWrapper3);
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
// Async.beginWork(3100, workerWrapper); Async.beginWork(3100, workerWrapper);
Async.beginWork(2100, workerWrapper); // Async.beginWork(2100, workerWrapper);
System.out.println("end-" + SystemClock.now()); System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now)); System.err.println("cost-" + (SystemClock.now() - now));
@ -177,22 +208,44 @@ public class TestPar {
* 1 * 1
* 0 3 * 0 3
* 2 * 2
*
* 执行结果0123
*/ */
private static void testMulti4() throws ExecutionException, InterruptedException { private static void testMulti4() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker(); ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1(); ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(2000); w2.setSleepTime(2000);
ParWorker3 w3 = new ParWorker3(); ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w3)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w3)
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); .param("3")
.build();
workerWrapper.addNext(workerWrapper1, workerWrapper2); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
workerWrapper1.addNext(workerWrapper3); .worker(w2)
workerWrapper2.addNext(workerWrapper3); .callback(w2)
.param("2")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
@ -224,20 +277,39 @@ public class TestPar {
private static void testMulti5() throws ExecutionException, InterruptedException { private static void testMulti5() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker(); ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1(); ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500); w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3(); ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400); w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w3)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w3)
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); .param("3")
.build();
workerWrapper.addNext(workerWrapper1, workerWrapper2); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
workerWrapper1.addNext(workerWrapper3); .worker(w2)
workerWrapper2.addNext(workerWrapper3); .callback(w2)
workerWrapper3.setDependNotMust(workerWrapper1, workerWrapper2); .param("2")
.next(workerWrapper3, false)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3, false)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
@ -260,32 +332,53 @@ public class TestPar {
* *
* 则结果是 * 则结果是
* 0213 * 0213
*
* 23分别是500400.2执行完了1没完那就等着1完毕才能3 * 23分别是500400.2执行完了1没完那就等着1完毕才能3
*/ */
private static void testMulti6() throws ExecutionException, InterruptedException { private static void testMulti6() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker(); ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1(); ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500); w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3(); ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400); w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w3)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w3)
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); .param("3")
.build();
workerWrapper.addNext(workerWrapper1, workerWrapper2);
workerWrapper1.addNext(workerWrapper3);
workerWrapper2.addNext(workerWrapper3);
//设置2不是必须1是必须的 //设置2不是必须1是必须的
workerWrapper3.setDependNotMust(workerWrapper2); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper0 = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper2, workerWrapper1)
.build();
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
//正常完毕 //正常完毕
Async.beginWork(4100, workerWrapper); Async.beginWork(4100, workerWrapper0);
System.out.println("end-" + SystemClock.now()); System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now)); System.err.println("cost-" + (SystemClock.now() - now));
@ -319,32 +412,72 @@ public class TestPar {
ParWorker3 w3 = new ParWorker3(); ParWorker3 w3 = new ParWorker3();
ParWorker4 w4 = new ParWorker4(); ParWorker4 w4 = new ParWorker4();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper4 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper0 = new WorkerWrapper<>(w, "00", w); .worker(w4)
.callback(w4)
.param("4")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper11 = new WorkerWrapper<>(w1, "11", w1); .worker(w3)
.callback(w3)
.param("3")
.next(workerWrapper4)
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); //下面的2
WorkerWrapper<String, String> workerWrapper22 = new WorkerWrapper<>(w2, "22", w2); WorkerWrapper<String, String> workerWrapper22 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("22")
.next(workerWrapper4)
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); //下面的1
WorkerWrapper<String, String> workerWrapper4 = new WorkerWrapper<>(w3, "4", w4); WorkerWrapper<String, String> workerWrapper11 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("11")
.next(workerWrapper22)
.build();
workerWrapper.addNext(workerWrapper1, workerWrapper2); //下面的0
workerWrapper1.addNext(workerWrapper3); WorkerWrapper<String, String> workerWrapper00 = new WorkerWrapper.Builder<String, String>()
workerWrapper2.addNext(workerWrapper3); .worker(w)
workerWrapper3.addNext(workerWrapper4); .callback(w)
.param("00")
.next(workerWrapper11)
.build();
workerWrapper0.addNext(workerWrapper11); //上面的1
workerWrapper11.addNext(workerWrapper22); WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
workerWrapper22.addNext(workerWrapper4); .worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3)
.build();
//上面的2
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3)
.build();
//上面的0
WorkerWrapper<String, String> workerWrapper0 = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
//正常完毕 //正常完毕
Async.beginWork(4100, workerWrapper, workerWrapper0); Async.beginWork(4100, workerWrapper00, workerWrapper0);
System.out.println("end-" + SystemClock.now()); System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now)); System.err.println("cost-" + (SystemClock.now() - now));
@ -356,27 +489,47 @@ public class TestPar {
/** /**
* a1 -> b -> c * a1 -> b -> c
* a2 -> b -> c * a2 -> b -> c
*
* bc
*/ */
private static void testMulti8() throws ExecutionException, InterruptedException { private static void testMulti8() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker(); ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1(); ParWorker1 w1 = new ParWorker1();
// w1.setSleepTime(1005); w1.setSleepTime(1005);
ParWorker2 w2 = new ParWorker2(); ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(3000); w2.setSleepTime(3000);
ParWorker3 w3 = new ParWorker3(); ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(1000); w3.setSleepTime(1000);
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "a1", w); WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "a2", w1); .worker(w3)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "b", w2); .callback(w3)
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "c", w3); .param("c")
workerWrapper.addNext(workerWrapper2); .build();
workerWrapper1.addNext(workerWrapper2);
workerWrapper2.addNext(workerWrapper3); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("b")
.next(workerWrapper3)
.build();
Async.beginWork(6000, workerWrapper, workerWrapper1); WorkerWrapper<String, String> workerWrappera1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("a1")
.next(workerWrapper2)
.build();
WorkerWrapper<String, String> workerWrappera2 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("a2")
.next(workerWrapper2)
.build();
Async.beginWork(6000, workerWrappera1, workerWrappera2);
Async.shutDown(); Async.shutDown();
} }
@ -396,22 +549,41 @@ public class TestPar {
ParWorker3 w3 = new ParWorker3(); ParWorker3 w3 = new ParWorker3();
ParWorker4 w4 = new ParWorker4(); ParWorker4 w4 = new ParWorker4();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "w", w); WorkerWrapper<String, String> last = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "w1", w1); .worker(w1)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "w2", w2); .callback(w1)
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "w3", w3); .param("last")
WorkerWrapper<String, String> last = new WorkerWrapper<>(w3, "last", w4); .build();
workerWrapper1.addNext(workerWrapper2); WorkerWrapper<String, String> wrapperW = new WorkerWrapper.Builder<String, String>()
workerWrapper2.addNext(workerWrapper3); .worker(w)
workerWrapper3.addNext(last); .callback(w)
.param("w")
.next(last, false)
.build();
workerWrapper.addNext(last); WorkerWrapper<String, String> wrapperW3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("w3")
.next(last, false)
.build();
last.setDependNotMust(workerWrapper); WorkerWrapper<String, String> wrapperW2 = new WorkerWrapper.Builder<String, String>()
last.setDependNotMust(workerWrapper3); .worker(w2)
.callback(w2)
.param("w2")
.next(wrapperW3)
.build();
Async.beginWork(6000, workerWrapper, workerWrapper1); WorkerWrapper<String, String> wrapperW1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("w1")
.next(wrapperW2)
.build();
Async.beginWork(6000, wrapperW, wrapperW1);
Async.shutDown(); Async.shutDown();
} }
} }

View File

@ -12,27 +12,37 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-11-20. * @author wuweifeng wrote on 2019-11-20.
*/ */
public class TestSequential { public class TestSequential {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException, ExecutionException {
SeqWorker w = new SeqWorker(); SeqWorker w = new SeqWorker();
SeqWorker1 w1 = new SeqWorker1(); SeqWorker1 w1 = new SeqWorker1();
SeqWorker2 w2 = new SeqWorker2(); SeqWorker2 w2 = new SeqWorker2();
SeqTimeoutWorker t = new SeqTimeoutWorker(); //顺序0-1-2
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w2)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w2)
.param("2")
.build();
//1在0后面串行 WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
workerWrapper.addNext(workerWrapper1); .worker(w1)
//2在1后面串行 .callback(w1)
workerWrapper1.addNext(workerWrapper2); .param("1")
.next(workerWrapper2)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1)
.build();
// testNormal(workerWrapper); // testNormal(workerWrapper);
// testGroupTimeout(workerWrapper);
testGroupTimeout(workerWrapper);
} }
private static void testNormal(WorkerWrapper<String, String> workerWrapper) throws ExecutionException, InterruptedException { private static void testNormal(WorkerWrapper<String, String> workerWrapper) throws ExecutionException, InterruptedException {

View File

@ -14,9 +14,7 @@ import java.util.concurrent.ExecutionException;
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
public class TestSequentialTimeout { public class TestSequentialTimeout {
public static void main(String[] args) throws InterruptedException, ExecutionException { public static void main(String[] args) throws InterruptedException, ExecutionException {
// testFirstTimeout(); testFirstTimeout();
testSecondTimeout();
} }
/** /**
@ -32,14 +30,29 @@ public class TestSequentialTimeout {
SeqWorker2 w2 = new SeqWorker2(); SeqWorker2 w2 = new SeqWorker2();
SeqTimeoutWorker t = new SeqTimeoutWorker(); SeqTimeoutWorker t = new SeqTimeoutWorker();
WorkerWrapper<String, String> workerWrapperT = new WorkerWrapper<>(t, "t", t); WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); .worker(w2)
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); .callback(w2)
.param("2")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper2)
.build();
//2在1后面串行 //2在1后面串行
workerWrapper1.addNext(workerWrapper2);
//T会超时 //T会超时
workerWrapperT.addNext(workerWrapper1); WorkerWrapper<String, String> workerWrapperT = new WorkerWrapper.Builder<String, String>()
.worker(t)
.callback(t)
.param("t")
.next(workerWrapper1)
.build();
long now = SystemClock.now(); long now = SystemClock.now();
System.out.println("begin-" + now); System.out.println("begin-" + now);
@ -51,38 +64,4 @@ public class TestSequentialTimeout {
Async.shutDown(); Async.shutDown();
} }
/**
* begin-1576719842504
* callback worker0 success--1576719843571----result = 1576719843570---param = t from 0-threadName:Thread-0
* callback worker1 failure--1576719844376----worker1--default-threadName:main
* callback worker2 failure--1576719844376----worker2--default-threadName:main
* end-1576719844376
* cost-1872
*/
private static void testSecondTimeout() throws ExecutionException, InterruptedException {
SeqTimeoutWorker t = new SeqTimeoutWorker();
//让1超时
SeqWorker1 w1 = new SeqWorker1();
SeqWorker2 w2 = new SeqWorker2();
WorkerWrapper<String, String> workerWrapperT = new WorkerWrapper<>(t, "t", t);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
//2在1后面串行
workerWrapper1.addNext(workerWrapper2);
//T会超时
workerWrapperT.addNext(workerWrapper1);
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(5000, workerWrapperT);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
} }