From 67e5082293dcf065079c1d53c369e4911399e78b Mon Sep 17 00:00:00 2001 From: wuweifeng10 Date: Wed, 19 Feb 2020 18:37:56 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E8=A6=81=E6=9B=B4=E6=96=B0=EF=BC=8C?= =?UTF-8?q?=E9=87=87=E7=94=A8builder=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../platform/async/wrapper/WorkerWrapper.java | 226 +++++----- .../com/jd/platform/test/depend/Test.java | 26 +- .../jd/platform/test/parallel/TestPar.java | 398 +++++++++++++----- .../jd/platform/test/seq/TestSequential.java | 30 +- .../test/seq/TestSequentialTimeout.java | 63 +-- 5 files changed, 455 insertions(+), 288 deletions(-) diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 9cc59ec..c0bb419 100755 --- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -73,7 +73,7 @@ public class WorkerWrapper { private static final int WORKING = 3; private static final int INIT = 0; - public WorkerWrapper(IWorker worker, T param, ICallback callback) { + private WorkerWrapper(IWorker worker, T param, ICallback callback) { if (worker == null) { throw new NullPointerException("async.worker is null"); } @@ -267,7 +267,6 @@ public class WorkerWrapper { return; } - System.out.println(Thread.currentThread().getName() + "---" + existNoFinish); //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working //都finish的话 if (!existNoFinish) { @@ -305,7 +304,7 @@ public class WorkerWrapper { } } - callback.result(false, getParam(), workResult); + callback.result(false, param, workResult); return true; } @@ -326,7 +325,7 @@ public class WorkerWrapper { callback.begin(); //执行耗时操作 - V resultValue = worker.action(getParam()); + V resultValue = worker.action(param); //如果状态不是在working,说明别的地方已经修改了 if (!compareAndSetState(WORKING, FINISH)) { @@ -336,7 +335,7 @@ public class WorkerWrapper { workResult.setResultState(ResultState.SUCCESS); workResult.setResult(resultValue); //回调成功 - callback.result(true, getParam(), workResult); + callback.result(true, param, workResult); return workResult; } catch (Exception e) { @@ -349,70 +348,22 @@ public class WorkerWrapper { } } + public WorkResult getWorkResult() { + return workResult; + } + + public List> getNextWrappers() { + return nextWrappers; + } + + public void setParam(T param) { + this.param = param; + } + private boolean checkIsNullResult() { return ResultState.DEFAULT == workResult.getResultState(); } - - public WorkerWrapper addNext(WorkerWrapper... nextWrappers) { - if (nextWrappers == null) { - return this; - } - for (WorkerWrapper workerWrapper : nextWrappers) { - addNext(workerWrapper); - } - return this; - } - - public WorkerWrapper addNext(IWorker worker, T param, ICallback callback) { - WorkerWrapper workerWrapper = new WorkerWrapper<>(worker, param, callback); - return addNext(workerWrapper); - } - - public WorkerWrapper addNext(WorkerWrapper workerWrapper) { - if (nextWrappers == null) { - nextWrappers = new ArrayList<>(); - } - nextWrappers.add(workerWrapper); - workerWrapper.addDepend(this); - return this; - } - - /** - * 直接set Next - */ - public WorkerWrapper setNext(WorkerWrapper... workerWrapper) { - if (nextWrappers != null) { - nextWrappers.clear(); - } - - return addNext(workerWrapper); - } - - /** - * 设置这几个依赖的wrapper不是must执行完毕才能执行自己 - */ - public void setDependNotMust(WorkerWrapper... workerWrapper) { - if (dependWrappers == null) { - return; - } - if (workerWrapper == null) { - return; - } - for (DependWrapper dependWrapper : dependWrappers) { - for (WorkerWrapper wrapper : workerWrapper) { - if (dependWrapper.getDependWrapper().equals(wrapper)) { - dependWrapper.setMust(false); - } - } - } - } - - - private void addDepend(WorkerWrapper workerWrapper) { - this.addDepend(workerWrapper, true); - } - private void addDepend(WorkerWrapper workerWrapper, boolean must) { if (dependWrappers == null) { dependWrappers = new ArrayList<>(); @@ -428,73 +379,116 @@ public class WorkerWrapper { private WorkResult defaultResult() { workResult.setResultState(ResultState.TIMEOUT); - workResult.setResult(getWorker().defaultValue()); + workResult.setResult(worker.defaultValue()); return workResult; } private WorkResult defaultExResult(Exception ex) { workResult.setResultState(ResultState.EXCEPTION); - workResult.setResult(getWorker().defaultValue()); + workResult.setResult(worker.defaultValue()); workResult.setEx(ex); return workResult; } - private WorkResult getNoneNullWorkResult() { - if (workResult == null) { - return defaultResult(); - } - return workResult; - } - public T getParam() { - return param; - } - - public IWorker getWorker() { - return worker; - } - - public ICallback getCallback() { - return callback; - } - - public List> getNextWrappers() { - return nextWrappers; - } - - public void setNextWrappers(List> nextWrappers) { - this.nextWrappers = nextWrappers; - } - - public List getDependWrappers() { - return dependWrappers; - } - - public void setDependWrappers(List dependWrappers) { - this.dependWrappers = dependWrappers; - } - - public int getState() { + private int getState() { return state.get(); } - public boolean compareAndSetState(int expect, int update) { + private boolean compareAndSetState(int expect, int update) { return this.state.compareAndSet(expect, update); } - public WorkResult getWorkResult() { - return workResult; - } - - public void setWorkResult(WorkResult workResult) { - this.workResult = workResult; - } - - public boolean isNeedCheckNextWrapperResult() { - return needCheckNextWrapperResult; - } - - public void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) { + private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) { this.needCheckNextWrapperResult = needCheckNextWrapperResult; } + + private void setNextWrappers(List> wrappers) { + this.nextWrappers = wrappers; + } + + + public static class Builder { + /** + * worker将来要处理的param + */ + private W param; + private IWorker worker; + private ICallback callback; + /** + * 自己后面的所有 + */ + private List> nextWrappers; + /** + * 存储强依赖于自己的wrapper集合 + */ + private Set> selfIsMustSet; + + private boolean needCheckNextWrapperResult = true; + + public Builder worker(IWorker worker) { + this.worker = worker; + return this; + } + + public Builder param(W w) { + this.param = w; + return this; + } + + public Builder needCheckNextWrapperResult(boolean needCheckNextWrapperResult) { + this.needCheckNextWrapperResult = needCheckNextWrapperResult; + return this; + } + + public Builder callback(ICallback callback) { + this.callback = callback; + return this; + } + + public Builder next(WorkerWrapper wrapper) { + return next(wrapper, true); + } + + public Builder next(WorkerWrapper wrapper, boolean selfIsMust) { + if (nextWrappers == null) { + nextWrappers = new ArrayList<>(); + } + nextWrappers.add(wrapper); + + //强依赖自己 + if (selfIsMust) { + if (selfIsMustSet == null) { + selfIsMustSet = new HashSet<>(); + } + selfIsMustSet.add(wrapper); + } + return this; + } + + public Builder next(WorkerWrapper... wrappers) { + if (wrappers == null) { + return this; + } + for (WorkerWrapper wrapper : wrappers) { + next(wrapper, true); + } + return this; + } + + + public WorkerWrapper build() { + WorkerWrapper wrapper = new WorkerWrapper<>(worker, param, callback); + wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult); + wrapper.setNextWrappers(nextWrappers); + if (nextWrappers != null && nextWrappers.size() > 0) { + for (WorkerWrapper workerWrapper : nextWrappers) { + if (selfIsMustSet != null) { + workerWrapper.addDepend(wrapper, selfIsMustSet.contains(workerWrapper)); + } + } + } + return wrapper; + } + } } diff --git a/src/main/java/com/jd/platform/test/depend/Test.java b/src/main/java/com/jd/platform/test/depend/Test.java index c196929..6f24069 100644 --- a/src/main/java/com/jd/platform/test/depend/Test.java +++ b/src/main/java/com/jd/platform/test/depend/Test.java @@ -19,17 +19,29 @@ public class Test { DeWorker1 w1 = new DeWorker1(); DeWorker2 w2 = new DeWorker2(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); + WorkerWrapper, String> workerWrapper2 = new WorkerWrapper.Builder, String>() + .worker(w2) + .callback(w2) + .build(); + + WorkerWrapper, User> workerWrapper1 = new WorkerWrapper.Builder, User>() + .worker(w1) + .callback(w1) + .next(workerWrapper2) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .param("0") + .next(workerWrapper1) + .callback(w) + .build(); //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参 WorkResult result = workerWrapper.getWorkResult(); - - WorkerWrapper, User> workerWrapper1 = new WorkerWrapper<>(w1, result, w1); WorkResult result1 = workerWrapper1.getWorkResult(); - WorkerWrapper, String> workerWrapper2 = new WorkerWrapper<>(w2, result1, w2); - - workerWrapper.addNext(workerWrapper1); - workerWrapper1.addNext(workerWrapper2); + workerWrapper1.setParam(result); + workerWrapper2.setParam(result1); Async.beginWork(3500, workerWrapper); diff --git a/src/main/java/com/jd/platform/test/parallel/TestPar.java b/src/main/java/com/jd/platform/test/parallel/TestPar.java index 08b0388..1a36ac6 100755 --- a/src/main/java/com/jd/platform/test/parallel/TestPar.java +++ b/src/main/java/com/jd/platform/test/parallel/TestPar.java @@ -16,18 +16,16 @@ import java.util.concurrent.ExecutionException; public class TestPar { public static void main(String[] args) throws Exception { - // testNormal(); // testMulti(); -// testMultiError(); // testMultiError2(); // testMulti3(); -// testMulti4(); + testMulti4(); // testMulti5(); // testMulti6(); // testMulti7(); // testMulti8(); - testMulti9(); +// testMulti9(); } /** @@ -38,9 +36,24 @@ public class TestPar { ParWorker1 w1 = new ParWorker1(); ParWorker2 w2 = new ParWorker2(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + long now = SystemClock.now(); System.out.println("begin-" + now); @@ -53,7 +66,6 @@ public class TestPar { System.out.println(Async.getThreadCount()); System.out.println(workerWrapper.getWorkResult()); -// System.out.println(getThreadCount()); Async.shutDown(); } @@ -67,11 +79,24 @@ public class TestPar { ParWorker1 w1 = new ParWorker1(); ParWorker2 w2 = new ParWorker2(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); - workerWrapper.addNext(workerWrapper1); + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1) + .build(); long now = SystemClock.now(); System.out.println("begin-" + now); @@ -84,8 +109,9 @@ public class TestPar { Async.shutDown(); } + /** - * 0,2同时开启,1在0后面. 1超时 + * 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败 * 0---1 * 2 */ @@ -94,38 +120,24 @@ public class TestPar { ParWorker1 w1 = new ParWorker1(); ParWorker2 w2 = new ParWorker2(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); - workerWrapper.addNext(workerWrapper1); + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .build(); - long now = SystemClock.now(); - System.out.println("begin-" + now); - - Async.beginWork(1500, workerWrapper, workerWrapper2); - - System.out.println("end-" + SystemClock.now()); - System.err.println("cost-" + (SystemClock.now() - now)); - - Async.shutDown(); - } - - /** - * 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败 - * 0---1 - * 2 - */ - private static void testMultiError2() throws ExecutionException, InterruptedException { - ParWorker w = new ParWorker(); - ParWorker1 w1 = new ParWorker1(); - ParWorker2 w2 = new ParWorker2(); - - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); - - workerWrapper.addNext(workerWrapper1); + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1) + .build(); long now = SystemClock.now(); System.out.println("begin-" + now); @@ -150,20 +162,39 @@ public class TestPar { ParWorker2 w2 = new ParWorker2(); ParWorker3 w3 = new ParWorker3(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); - WorkerWrapper workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); - workerWrapper.addNext(workerWrapper1, workerWrapper2); - workerWrapper1.addNext(workerWrapper3); - workerWrapper2.addNext(workerWrapper3); long now = SystemClock.now(); System.out.println("begin-" + now); -// Async.beginWork(3100, workerWrapper); - Async.beginWork(2100, workerWrapper); + Async.beginWork(3100, workerWrapper); +// Async.beginWork(2100, workerWrapper); System.out.println("end-" + SystemClock.now()); System.err.println("cost-" + (SystemClock.now() - now)); @@ -177,22 +208,44 @@ public class TestPar { * 1 * 0 3 * 2 + * + * 执行结果0,1,2,3 */ private static void testMulti4() throws ExecutionException, InterruptedException { ParWorker w = new ParWorker(); ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); w2.setSleepTime(2000); + ParWorker3 w3 = new ParWorker3(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); - WorkerWrapper workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); - workerWrapper.addNext(workerWrapper1, workerWrapper2); - workerWrapper1.addNext(workerWrapper3); - workerWrapper2.addNext(workerWrapper3); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); long now = SystemClock.now(); System.out.println("begin-" + now); @@ -224,20 +277,39 @@ public class TestPar { private static void testMulti5() throws ExecutionException, InterruptedException { ParWorker w = new ParWorker(); ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); w2.setSleepTime(500); + ParWorker3 w3 = new ParWorker3(); w3.setSleepTime(400); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); - WorkerWrapper workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); - workerWrapper.addNext(workerWrapper1, workerWrapper2); - workerWrapper1.addNext(workerWrapper3); - workerWrapper2.addNext(workerWrapper3); - workerWrapper3.setDependNotMust(workerWrapper1, workerWrapper2); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3, false) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3, false) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); long now = SystemClock.now(); System.out.println("begin-" + now); @@ -260,32 +332,53 @@ public class TestPar { * * 则结果是: * 0,2,1,3 + * * 2,3分别是500、400.2执行完了,1没完,那就等着1完毕,才能3 */ private static void testMulti6() throws ExecutionException, InterruptedException { ParWorker w = new ParWorker(); ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); w2.setSleepTime(500); + ParWorker3 w3 = new ParWorker3(); w3.setSleepTime(400); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); - WorkerWrapper workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); - workerWrapper.addNext(workerWrapper1, workerWrapper2); - workerWrapper1.addNext(workerWrapper3); - workerWrapper2.addNext(workerWrapper3); //设置2不是必须,1是必须的 - workerWrapper3.setDependNotMust(workerWrapper2); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper0 = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper2, workerWrapper1) + .build(); + long now = SystemClock.now(); System.out.println("begin-" + now); //正常完毕 - Async.beginWork(4100, workerWrapper); + Async.beginWork(4100, workerWrapper0); System.out.println("end-" + SystemClock.now()); System.err.println("cost-" + (SystemClock.now() - now)); @@ -319,32 +412,72 @@ public class TestPar { ParWorker3 w3 = new ParWorker3(); ParWorker4 w4 = new ParWorker4(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper0 = new WorkerWrapper<>(w, "00", w); + WorkerWrapper workerWrapper4 = new WorkerWrapper.Builder() + .worker(w4) + .callback(w4) + .param("4") + .build(); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper11 = new WorkerWrapper<>(w1, "11", w1); + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .next(workerWrapper4) + .build(); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); - WorkerWrapper workerWrapper22 = new WorkerWrapper<>(w2, "22", w2); + //下面的2 + WorkerWrapper workerWrapper22 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("22") + .next(workerWrapper4) + .build(); - WorkerWrapper workerWrapper3 = new WorkerWrapper<>(w3, "3", w3); - WorkerWrapper workerWrapper4 = new WorkerWrapper<>(w3, "4", w4); + //下面的1 + WorkerWrapper workerWrapper11 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("11") + .next(workerWrapper22) + .build(); - workerWrapper.addNext(workerWrapper1, workerWrapper2); - workerWrapper1.addNext(workerWrapper3); - workerWrapper2.addNext(workerWrapper3); - workerWrapper3.addNext(workerWrapper4); + //下面的0 + WorkerWrapper workerWrapper00 = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("00") + .next(workerWrapper11) + .build(); - workerWrapper0.addNext(workerWrapper11); - workerWrapper11.addNext(workerWrapper22); - workerWrapper22.addNext(workerWrapper4); + //上面的1 + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + //上面的2 + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + //上面的0 + WorkerWrapper workerWrapper0 = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); long now = SystemClock.now(); System.out.println("begin-" + now); //正常完毕 - Async.beginWork(4100, workerWrapper, workerWrapper0); + Async.beginWork(4100, workerWrapper00, workerWrapper0); System.out.println("end-" + SystemClock.now()); System.err.println("cost-" + (SystemClock.now() - now)); @@ -356,27 +489,47 @@ public class TestPar { /** * a1 -> b -> c * a2 -> b -> c + * + * b、c */ private static void testMulti8() throws ExecutionException, InterruptedException { ParWorker w = new ParWorker(); ParWorker1 w1 = new ParWorker1(); -// w1.setSleepTime(1005); + w1.setSleepTime(1005); ParWorker2 w2 = new ParWorker2(); w2.setSleepTime(3000); ParWorker3 w3 = new ParWorker3(); w3.setSleepTime(1000); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "a1", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "a2", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "b", w2); - WorkerWrapper workerWrapper3 = new WorkerWrapper<>(w3, "c", w3); - workerWrapper.addNext(workerWrapper2); - workerWrapper1.addNext(workerWrapper2); + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("c") + .build(); - workerWrapper2.addNext(workerWrapper3); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("b") + .next(workerWrapper3) + .build(); - Async.beginWork(6000, workerWrapper, workerWrapper1); + WorkerWrapper workerWrappera1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("a1") + .next(workerWrapper2) + .build(); + WorkerWrapper workerWrappera2 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("a2") + .next(workerWrapper2) + .build(); + + + Async.beginWork(6000, workerWrappera1, workerWrappera2); Async.shutDown(); } @@ -396,22 +549,41 @@ public class TestPar { ParWorker3 w3 = new ParWorker3(); ParWorker4 w4 = new ParWorker4(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "w", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "w1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "w2", w2); - WorkerWrapper workerWrapper3 = new WorkerWrapper<>(w3, "w3", w3); - WorkerWrapper last = new WorkerWrapper<>(w3, "last", w4); + WorkerWrapper last = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("last") + .build(); - workerWrapper1.addNext(workerWrapper2); - workerWrapper2.addNext(workerWrapper3); - workerWrapper3.addNext(last); + WorkerWrapper wrapperW = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("w") + .next(last, false) + .build(); - workerWrapper.addNext(last); + WorkerWrapper wrapperW3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("w3") + .next(last, false) + .build(); - last.setDependNotMust(workerWrapper); - last.setDependNotMust(workerWrapper3); + WorkerWrapper wrapperW2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("w2") + .next(wrapperW3) + .build(); - Async.beginWork(6000, workerWrapper, workerWrapper1); + WorkerWrapper wrapperW1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("w1") + .next(wrapperW2) + .build(); + + Async.beginWork(6000, wrapperW, wrapperW1); Async.shutDown(); } } diff --git a/src/main/java/com/jd/platform/test/seq/TestSequential.java b/src/main/java/com/jd/platform/test/seq/TestSequential.java index 5e3e148..3e04aff 100755 --- a/src/main/java/com/jd/platform/test/seq/TestSequential.java +++ b/src/main/java/com/jd/platform/test/seq/TestSequential.java @@ -12,27 +12,37 @@ import java.util.concurrent.ExecutionException; * @author wuweifeng wrote on 2019-11-20. */ public class TestSequential { - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException, ExecutionException { SeqWorker w = new SeqWorker(); SeqWorker1 w1 = new SeqWorker1(); SeqWorker2 w2 = new SeqWorker2(); - SeqTimeoutWorker t = new SeqTimeoutWorker(); - WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); + //顺序0-1-2 + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); - //1在0后面串行 - workerWrapper.addNext(workerWrapper1); - //2在1后面串行 - workerWrapper1.addNext(workerWrapper2); + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper2) + .build(); + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1) + .build(); // testNormal(workerWrapper); -// testGroupTimeout(workerWrapper); + testGroupTimeout(workerWrapper); } private static void testNormal(WorkerWrapper workerWrapper) throws ExecutionException, InterruptedException { diff --git a/src/main/java/com/jd/platform/test/seq/TestSequentialTimeout.java b/src/main/java/com/jd/platform/test/seq/TestSequentialTimeout.java index b730821..b0d32a8 100755 --- a/src/main/java/com/jd/platform/test/seq/TestSequentialTimeout.java +++ b/src/main/java/com/jd/platform/test/seq/TestSequentialTimeout.java @@ -14,9 +14,7 @@ import java.util.concurrent.ExecutionException; @SuppressWarnings("Duplicates") public class TestSequentialTimeout { public static void main(String[] args) throws InterruptedException, ExecutionException { -// testFirstTimeout(); - - testSecondTimeout(); + testFirstTimeout(); } /** @@ -32,14 +30,29 @@ public class TestSequentialTimeout { SeqWorker2 w2 = new SeqWorker2(); SeqTimeoutWorker t = new SeqTimeoutWorker(); - WorkerWrapper workerWrapperT = new WorkerWrapper<>(t, "t", t); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper2) + .build(); //2在1后面串行 - workerWrapper1.addNext(workerWrapper2); //T会超时 - workerWrapperT.addNext(workerWrapper1); + WorkerWrapper workerWrapperT = new WorkerWrapper.Builder() + .worker(t) + .callback(t) + .param("t") + .next(workerWrapper1) + .build(); + + long now = SystemClock.now(); System.out.println("begin-" + now); @@ -51,38 +64,4 @@ public class TestSequentialTimeout { Async.shutDown(); } - /** - * begin-1576719842504 - * callback worker0 success--1576719843571----result = 1576719843570---param = t from 0-threadName:Thread-0 - * callback worker1 failure--1576719844376----worker1--default-threadName:main - * callback worker2 failure--1576719844376----worker2--default-threadName:main - * end-1576719844376 - * cost-1872 - */ - private static void testSecondTimeout() throws ExecutionException, InterruptedException { - SeqTimeoutWorker t = new SeqTimeoutWorker(); - - //让1超时 - SeqWorker1 w1 = new SeqWorker1(); - - SeqWorker2 w2 = new SeqWorker2(); - - WorkerWrapper workerWrapperT = new WorkerWrapper<>(t, "t", t); - WorkerWrapper workerWrapper1 = new WorkerWrapper<>(w1, "1", w1); - WorkerWrapper workerWrapper2 = new WorkerWrapper<>(w2, "2", w2); - - //2在1后面串行 - workerWrapper1.addNext(workerWrapper2); - //T会超时 - workerWrapperT.addNext(workerWrapper1); - long now = SystemClock.now(); - System.out.println("begin-" + now); - - Async.beginWork(5000, workerWrapperT); - - System.out.println("end-" + SystemClock.now()); - System.err.println("cost-" + (SystemClock.now() - now)); - - Async.shutDown(); - } }