diff --git a/src/main/java/com/tianyalei/async/group/WorkerWrapper.java b/src/main/java/com/tianyalei/async/group/WorkerWrapper.java index 61a3e7f..404ab44 100755 --- a/src/main/java/com/tianyalei/async/group/WorkerWrapper.java +++ b/src/main/java/com/tianyalei/async/group/WorkerWrapper.java @@ -1,6 +1,5 @@ package com.tianyalei.async.group; - import com.tianyalei.async.callback.DefaultCallback; import com.tianyalei.async.callback.ICallback; import com.tianyalei.async.callback.IWorker; @@ -56,7 +55,7 @@ public class WorkerWrapper { /** * 也是个钩子变量,用来存临时的结果 */ - private volatile WorkResult workResult; + private volatile WorkResult workResult = WorkResult.defaultResult(); private static final int FINISH = 1; private static final int ERROR = 2; @@ -205,7 +204,7 @@ public class WorkerWrapper { WorkerWrapper workerWrapper = dependWrapper.getDependWrapper(); WorkResult tempWorkResult = workerWrapper.getWorkResult(); //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完 - if (tempWorkResult == null || workerWrapper.getState() == WORKING) { + if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) { existNoFinish = true; break; } @@ -276,7 +275,8 @@ public class WorkerWrapper { return false; } - if (workResult == null) { + //尚未处理过结果 + if (checkIsNullResult()) { if (e == null) { workResult = defaultResult(); } else { @@ -293,7 +293,7 @@ public class WorkerWrapper { */ private WorkResult workerDoJob() { //避免重复执行 - if (workResult != null) { + if (!checkIsNullResult()) { return workResult; } try { @@ -307,20 +307,20 @@ public class WorkerWrapper { //执行耗时操作 V resultValue = worker.action(getParam()); - WorkResult tempResult = new WorkResult<>(resultValue, ResultState.SUCCESS); - //如果状态不是在working,说明别的地方已经修改了 if (!compareAndSetState(WORKING, FINISH)) { return workResult; } + + workResult.setResultState(ResultState.SUCCESS); + workResult.setResult(resultValue); //回调成功 - callback.result(true, getParam(), tempResult); - workResult = tempResult; + callback.result(true, getParam(), workResult); return workResult; } catch (Exception e) { //避免重复回调 - if (workResult != null) { + if (!checkIsNullResult()) { return workResult; } fastFail(WORKING, e); @@ -328,6 +328,10 @@ public class WorkerWrapper { } } + private boolean checkIsNullResult() { + return ResultState.DEFAULT == workResult.getResultState(); + } + public WorkerWrapper addNext(WorkerWrapper... nextWrappers) { if (nextWrappers == null) { @@ -385,11 +389,16 @@ public class WorkerWrapper { } private WorkResult defaultResult() { - return new WorkResult<>(getWorker().defaultValue(), ResultState.TIMEOUT); + workResult.setResultState(ResultState.TIMEOUT); + workResult.setResult(getWorker().defaultValue()); + return workResult; } private WorkResult defaultExResult(Exception ex) { - return new WorkResult<>(getWorker().defaultValue(), ResultState.EXCEPTION, ex); + workResult.setResultState(ResultState.EXCEPTION); + workResult.setResult(getWorker().defaultValue()); + workResult.setEx(ex); + return workResult; } private WorkResult getNoneNullWorkResult() { diff --git a/src/main/java/com/tianyalei/async/worker/ResultState.java b/src/main/java/com/tianyalei/async/worker/ResultState.java index 002dda1..c6208a2 100755 --- a/src/main/java/com/tianyalei/async/worker/ResultState.java +++ b/src/main/java/com/tianyalei/async/worker/ResultState.java @@ -7,5 +7,6 @@ package com.tianyalei.async.worker; public enum ResultState { SUCCESS, TIMEOUT, - EXCEPTION + EXCEPTION, + DEFAULT //默认状态 } diff --git a/src/main/java/com/tianyalei/async/worker/WorkResult.java b/src/main/java/com/tianyalei/async/worker/WorkResult.java index ca425e0..40d04f5 100755 --- a/src/main/java/com/tianyalei/async/worker/WorkResult.java +++ b/src/main/java/com/tianyalei/async/worker/WorkResult.java @@ -25,7 +25,7 @@ public class WorkResult { } public static WorkResult defaultResult() { - return new WorkResult<>(null, ResultState.TIMEOUT); + return new WorkResult<>(null, ResultState.DEFAULT); } @Override diff --git a/src/main/java/com/tianyalei/test/depend/ParWorker.java b/src/main/java/com/tianyalei/test/depend/ParWorker.java new file mode 100755 index 0000000..6f2355a --- /dev/null +++ b/src/main/java/com/tianyalei/test/depend/ParWorker.java @@ -0,0 +1,40 @@ +package com.tianyalei.test.depend; + + +import com.tianyalei.async.callback.ICallback; +import com.tianyalei.async.callback.IWorker; +import com.tianyalei.async.executor.timer.SystemClock; +import com.tianyalei.async.worker.WorkResult; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker implements IWorker, ICallback { + + @Override + public User action(String object) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user0"); + } + + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("worker0 的结果是:" + workResult.getResult()); + } + +} diff --git a/src/main/java/com/tianyalei/test/depend/ParWorker1.java b/src/main/java/com/tianyalei/test/depend/ParWorker1.java new file mode 100755 index 0000000..c700906 --- /dev/null +++ b/src/main/java/com/tianyalei/test/depend/ParWorker1.java @@ -0,0 +1,40 @@ +package com.tianyalei.test.depend; + + +import com.tianyalei.async.callback.ICallback; +import com.tianyalei.async.callback.IWorker; +import com.tianyalei.async.worker.WorkResult; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker1 implements IWorker, User>, ICallback, User> { + + @Override + public User action(WorkResult result) { + System.out.println("par1的入参来自于par0: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user1"); + } + + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, WorkResult param, WorkResult workResult) { + System.out.println("worker1 的结果是:" + workResult.getResult()); + } + +} diff --git a/src/main/java/com/tianyalei/test/depend/ParWorker2.java b/src/main/java/com/tianyalei/test/depend/ParWorker2.java new file mode 100755 index 0000000..20c5dde --- /dev/null +++ b/src/main/java/com/tianyalei/test/depend/ParWorker2.java @@ -0,0 +1,40 @@ +package com.tianyalei.test.depend; + + +import com.tianyalei.async.callback.ICallback; +import com.tianyalei.async.callback.IWorker; +import com.tianyalei.async.worker.WorkResult; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker2 implements IWorker, String>, ICallback, String> { + + @Override + public String action(WorkResult result) { + System.out.println("par2的入参来自于par1: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return result.getResult().getName(); + } + + + @Override + public String defaultValue() { + return "default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, WorkResult param, WorkResult workResult) { + System.out.println("worker2 的结果是:" + workResult.getResult()); + } + +} diff --git a/src/main/java/com/tianyalei/test/depend/Test.java b/src/main/java/com/tianyalei/test/depend/Test.java new file mode 100644 index 0000000..32cf9c9 --- /dev/null +++ b/src/main/java/com/tianyalei/test/depend/Test.java @@ -0,0 +1,39 @@ +package com.tianyalei.test.depend; + +import com.tianyalei.async.executor.Async; +import com.tianyalei.async.group.WorkerWrapper; +import com.tianyalei.async.worker.WorkResult; + +import java.util.concurrent.ExecutionException; + + +/** + * 后面请求依赖于前面请求的执行结果 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class Test { + + public static void main(String[] args) throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + + WorkerWrapper workerWrapper = new WorkerWrapper<>(w, "0", w); + //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参 + WorkResult result = workerWrapper.getWorkResult(); + + WorkerWrapper, User> workerWrapper1 = new WorkerWrapper<>(w1, result, w1); + WorkResult result1 = workerWrapper1.getWorkResult(); + + WorkerWrapper, String> workerWrapper2 = new WorkerWrapper<>(w2, result1, w2); + + workerWrapper.addNext(workerWrapper1); + workerWrapper1.addNext(workerWrapper2); + + Async.beginWork(3500, workerWrapper); + + System.out.println(workerWrapper2.getWorkResult()); + Async.shutDown(); + } +} diff --git a/src/main/java/com/tianyalei/test/depend/User.java b/src/main/java/com/tianyalei/test/depend/User.java new file mode 100644 index 0000000..b0be501 --- /dev/null +++ b/src/main/java/com/tianyalei/test/depend/User.java @@ -0,0 +1,29 @@ +package com.tianyalei.test.depend; + +/** + * 一个包装类 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class User { + private String name; + + public User(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return "User{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/src/main/java/com/tianyalei/test/parallel/TestPar.java b/src/main/java/com/tianyalei/test/parallel/TestPar.java index 7791608..d609b37 100755 --- a/src/main/java/com/tianyalei/test/parallel/TestPar.java +++ b/src/main/java/com/tianyalei/test/parallel/TestPar.java @@ -10,7 +10,7 @@ import java.util.concurrent.ExecutionException; import static com.tianyalei.async.executor.Async.getThreadCount; /** - * 串行测试 + * 并行测试 * * @author wuweifeng wrote on 2019-11-20. */