添加新功能。在执行前去校验,自己的nextWrapper是否已经执行完毕了

This commit is contained in:
wuweifeng10 2020-02-18 16:35:24 +08:00
parent fe279d9e2b
commit 7d5cdfaef9
3 changed files with 86 additions and 21 deletions

View File

@ -0,0 +1,16 @@
package com.tianyalei.async.exception;
/**
* 如果任务在执行之前自己后面的任务已经执行完或正在被执行则抛该exception
* @author wuweifeng wrote on 2020-02-18
* @version 1.0
*/
public class SkippedException extends RuntimeException {
public SkippedException() {
super();
}
public SkippedException(String message) {
super(message);
}
}

View File

@ -3,6 +3,7 @@ package com.tianyalei.async.wrapper;
import com.tianyalei.async.callback.DefaultCallback; import com.tianyalei.async.callback.DefaultCallback;
import com.tianyalei.async.callback.ICallback; import com.tianyalei.async.callback.ICallback;
import com.tianyalei.async.callback.IWorker; import com.tianyalei.async.callback.IWorker;
import com.tianyalei.async.exception.SkippedException;
import com.tianyalei.async.executor.timer.SystemClock; import com.tianyalei.async.executor.timer.SystemClock;
import com.tianyalei.async.worker.DependWrapper; import com.tianyalei.async.worker.DependWrapper;
import com.tianyalei.async.worker.ResultState; import com.tianyalei.async.worker.ResultState;
@ -59,18 +60,17 @@ public class WorkerWrapper<T, V> {
private volatile WorkResult<V> workResult = WorkResult.defaultResult(); private volatile WorkResult<V> workResult = WorkResult.defaultResult();
/** /**
* 是否在执行自己前去校验nextWrapper的执行结果<p> * 是否在执行自己前去校验nextWrapper的执行结果<p>
* 1 * 1 4
* -------3 * -------3
* 2 * 2
* 如这种在2执行前可能3已经执行完毕了被1执行完后触发的那么2就没必要执行了 * 如这种在4执行前可能3已经执行完毕了被2执行完后触发的那么4就没必要执行了
* 注意该属性仅在nextWrapper数量<=1时有效>1时的情况是不存在的 * 注意该属性仅在nextWrapper数量<=1时有效>1时的情况是不存在的
*/ */
private volatile boolean checkNextWrapperResult; private volatile boolean needCheckNextWrapperResult = true;
private static final int FINISH = 1; private static final int FINISH = 1;
private static final int ERROR = 2; private static final int ERROR = 2;
private static final int WORKING = 3; private static final int WORKING = 3;
private static final int SKIPPED = 4;
private static final int INIT = 0; private static final int INIT = 0;
public WorkerWrapper(IWorker<T, V> worker, T param, ICallback<T, V> callback) { public WorkerWrapper(IWorker<T, V> worker, T param, ICallback<T, V> callback) {
@ -106,14 +106,12 @@ public class WorkerWrapper<T, V> {
} }
//如果在执行前需要校验nextWrapper的状态 //如果在执行前需要校验nextWrapper的状态
if (checkNextWrapperResult) { if (needCheckNextWrapperResult) {
if (nextWrappers != null && nextWrappers.size() == 1) { //如果自己的next链上有已经出结果或已经开始执行的任务了自己就不用继续了
WorkerWrapper nextWrapper = nextWrappers.get(0); if (!checkNextWrapperResult()) {
if (nextWrapper.getState() == FINISH || nextWrapper.getState() == ERROR) { fastFail(INIT, new SkippedException());
compareAndSetState(INIT, SKIPPED); beginNext(poolExecutor, now, remainTime);
beginNext(poolExecutor, now, remainTime); return;
return;
}
} }
} }
@ -154,6 +152,21 @@ public class WorkerWrapper<T, V> {
} }
} }
/**
* 判断自己下游链路上是否存在已经出结果的或已经开始执行的
* 如果没有返回true如果有返回false
*/
private boolean checkNextWrapperResult() {
//如果自己就是最后一个或者后面有并行的多个就返回true
if (nextWrappers == null || nextWrappers.size() != 1) {
return getState() == INIT;
}
WorkerWrapper nextWrapper = nextWrappers.get(0);
boolean state = nextWrapper.getState() == INIT;
//继续校验自己的next的状态
return state && nextWrapper.checkNextWrapperResult();
}
/** /**
* 进行下一个任务 * 进行下一个任务
*/ */
@ -254,7 +267,7 @@ public class WorkerWrapper<T, V> {
return; return;
} }
System.out.println(Thread.currentThread().getName()+"---" + existNoFinish); System.out.println(Thread.currentThread().getName() + "---" + existNoFinish);
//如果上游都没有失败分为两种情况一种是都finish了一种是有的在working //如果上游都没有失败分为两种情况一种是都finish了一种是有的在working
//都finish的话 //都finish的话
if (!existNoFinish) { if (!existNoFinish) {
@ -279,7 +292,7 @@ public class WorkerWrapper<T, V> {
private boolean fastFail(int expect, Exception e) { private boolean fastFail(int expect, Exception e) {
//试图将它从expect状态,改成Error //试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) { if (!compareAndSetState(expect, ERROR)) {
System.out.println("compareAndSetState----------fail"); // System.out.println("compareAndSetState----------fail");
return false; return false;
} }
@ -477,11 +490,11 @@ public class WorkerWrapper<T, V> {
this.workResult = workResult; this.workResult = workResult;
} }
public boolean isCheckNextWrapperResult() { public boolean isNeedCheckNextWrapperResult() {
return checkNextWrapperResult; return needCheckNextWrapperResult;
} }
public void setCheckNextWrapperResult(boolean checkNextWrapperResult) { public void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.checkNextWrapperResult = checkNextWrapperResult; this.needCheckNextWrapperResult = needCheckNextWrapperResult;
} }
} }

View File

@ -28,7 +28,8 @@ public class TestPar {
// testMulti5(); // testMulti5();
// testMulti6(); // testMulti6();
// testMulti7(); // testMulti7();
testMulti8(); // testMulti8();
testMulti9();
} }
/** /**
@ -380,4 +381,39 @@ public class TestPar {
Async.beginWork(6000, workerWrapper, workerWrapper1); Async.beginWork(6000, workerWrapper, workerWrapper1);
Async.shutDown(); Async.shutDown();
} }
/**
* w1 -> w2 -> w3
* --- last
* w
* w1和w并行w执行完后就执行last此时bc还没开始bc就不需要执行了
*/
private static void testMulti9() throws ExecutionException, InterruptedException {
ParWorker1 w1 = new ParWorker1();
//注意这里如果w1的执行时间比w长那么w2和w3肯定不走 如果w1和w执行时间一样长多运行几次会发现w2有时走有时不走
// w1.setSleepTime(1100);
ParWorker w = new ParWorker();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
ParWorker4 w4 = new ParWorker4();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "w", w);
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "w1", w1);
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "w2", w2);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "w3", w3);
WorkerWrapper<String, String> last = new WorkerWrapper<>(w3, "last", w4);
workerWrapper1.addNext(workerWrapper2);
workerWrapper2.addNext(workerWrapper3);
workerWrapper3.addNext(last);
workerWrapper.addNext(last);
last.setDependNotMust(workerWrapper);
last.setDependNotMust(workerWrapper3);
Async.beginWork(6000, workerWrapper, workerWrapper1);
Async.shutDown();
}
} }