From a430e19206288878d6a5b34cc13300d918119c72 Mon Sep 17 00:00:00 2001 From: klaokai <573984425@qq.com> Date: Fri, 28 Apr 2023 11:27:31 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=B0=86=E6=A3=80=E6=9F=A5=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=94=BE=E5=88=B0=E5=93=88=E5=B8=8C=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E8=BD=AE=EF=BC=8C=E5=87=8F=E5=B0=91=E4=B8=80=E4=B8=AA=E6=A3=80?= =?UTF-8?q?=E6=9F=A5=E7=BA=BF=E7=A8=8B=E7=9A=84=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/executor/Async.java | 19 +++++----- .../executor/ExecutorServiceWrapper.java | 36 +++++++++---------- .../async/executor/PollingCenter.java | 4 +-- .../jd/platform/async/worker/OnceWork.java | 27 +++++++++++++- .../platform/async/wrapper/WorkerWrapper.java | 18 +++++----- .../src/test/java/v15/cases/Case15.java | 3 +- .../src/test/java/v15/cases/Case16.java | 15 ++++---- 7 files changed, 70 insertions(+), 52 deletions(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 054a949..db18159 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -117,17 +117,16 @@ public class Async { Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! ")); final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout); group.addWrapper(workerWrappers); - final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId); ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService); + final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId, executorServiceWrapper); //有多少个开始节点就有多少个线程,依赖任务靠被依赖任务的线程完成工作 workerWrappers.forEach(wrapper -> { if (wrapper == null) { return; } - executorServiceWrapper.addThreadSubmit(new TaskCallable(wrapper, timeout, group, executorServiceWrapper)); + onceWork.addThreadSubmit(new TaskCallable(wrapper, timeout, group, onceWork)); }); - executorServiceWrapper.startCheck(onceWork); return onceWork; } @@ -333,29 +332,29 @@ public class Async { private final WorkerWrapper wrapper; - private final ExecutorServiceWrapper executorServiceWrapper; - private final WorkerWrapper workerWrapper; - public TaskCallable(WorkerWrapper wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorServiceWrapper) { + private final OnceWork onceWork; + + public TaskCallable(WorkerWrapper wrapper, long timeout, WorkerWrapperGroup group, OnceWork onceWork) { this.wrapper = wrapper; this.group = group; this.timeout = timeout; - this.executorServiceWrapper = executorServiceWrapper; + this.onceWork = onceWork; this.workerWrapper = null; } - public TaskCallable(WorkerWrapper wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorService, WorkerWrapper workerWrapper) { + public TaskCallable(WorkerWrapper wrapper, long timeout, WorkerWrapperGroup group, OnceWork onceWork, WorkerWrapper workerWrapper) { this.wrapper = wrapper; this.group = group; this.timeout = timeout; - this.executorServiceWrapper = executorService; + this.onceWork = onceWork; this.workerWrapper = workerWrapper; } @Override public BigDecimal call() throws Exception { - wrapper.work(executorServiceWrapper, this.workerWrapper, timeout, group); + wrapper.work(onceWork, this.workerWrapper, timeout, group); return BigDecimal.ZERO; } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java index 4ebb4a3..94b2030 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java @@ -1,5 +1,7 @@ package com.jd.platform.async.executor; +import com.jd.platform.async.openutil.timer.Timeout; +import com.jd.platform.async.openutil.timer.TimerTask; import com.jd.platform.async.worker.OnceWork; import java.util.concurrent.*; @@ -26,33 +28,26 @@ public class ExecutorServiceWrapper { allThreadSubmit.add(executorService.submit(callable)); } - public void startCheck(final OnceWork.Impl onceWork) { - executorService.execute(new ThreadCheckRunable(onceWork, this)); + public void startCheck(final OnceWork onceWork) { + PollingCenter.getInstance().checkGroup(new ThreadCheckRunable(onceWork, this), 3000); } - private static class ThreadCheckRunable implements Runnable { + private static class ThreadCheckRunable implements TimerTask { - private final OnceWork.Impl onceWork; + private final OnceWork onceWork; private final ExecutorServiceWrapper executorServiceWrapper; - public ThreadCheckRunable(OnceWork.Impl onceWork, ExecutorServiceWrapper executorServiceWrapper) { + public ThreadCheckRunable(OnceWork onceWork, ExecutorServiceWrapper executorServiceWrapper) { this.onceWork = onceWork; this.executorServiceWrapper = executorServiceWrapper; } @Override - public void run() { - while (true) { - try { - TimeUnit.MILLISECONDS.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - //任务结束就退出检查 - if (onceWork.isFinish()) { - break; - } else if (executorServiceWrapper.getAllThreadSubmit().size() > 0) { + public void run(Timeout timeout) throws Exception { + //任务结束就退出检查 + if (!onceWork.isFinish()) { + if (executorServiceWrapper.getAllThreadSubmit().size() > 0) { boolean isException = false; boolean isCancelld = false; boolean isDone = false; @@ -68,9 +63,11 @@ public class ExecutorServiceWrapper { } catch (InterruptedException e) { //中断等 e.printStackTrace(); + System.out.println("出现中断" + e); isException = true; } catch (ExecutionException e) { //内存溢出等 + System.out.println("出现内存溢出等" + e); e.printStackTrace(); isException = true; } catch (TimeoutException e) { @@ -87,7 +84,6 @@ public class ExecutorServiceWrapper { || onceWork.isWaitingCancel())) { onceWork.pleaseCancel(); } - break; } else { if (isDone) { System.out.println("部分任务已经在线程池完成"); @@ -96,12 +92,14 @@ public class ExecutorServiceWrapper { onceWork.check(); } } else { - //FIXME 高强度检查会不会造成检查线程过多? onceWork.check(); } + } else { + System.out.println("任务已完成"); } } } - } + + diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java index 2dc1829..7a305e9 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java @@ -50,11 +50,11 @@ public class PollingCenter { // ========== fields and methods ========== - public void checkGroup(WorkerWrapperGroup.CheckFinishTask task) { + public void checkGroup(TimerTask task) { checkGroup(task, 0); } - public void checkGroup(WorkerWrapperGroup.CheckFinishTask task, long daley) { + public void checkGroup(TimerTask task, long daley) { timer.newTimeout(task, daley, TimeUnit.MILLISECONDS); } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java index 50fcff6..5423985 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java @@ -1,5 +1,7 @@ package com.jd.platform.async.worker; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.ExecutorServiceWrapper; import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.wrapper.WorkerWrapper; @@ -150,6 +152,10 @@ public interface OnceWork { return new AsFuture(this, sleepCheckInterval); } + void check(); + + void addThreadSubmit(Async.TaskCallable taskCallable); + // class class AsFuture implements Future>> { @@ -281,9 +287,12 @@ public interface OnceWork { protected final WorkerWrapperGroup group; - public Impl(WorkerWrapperGroup group, String workId) { + private final ExecutorServiceWrapper executorServiceWrapper; + + public Impl(WorkerWrapperGroup group, String workId, ExecutorServiceWrapper executorServiceWrapper) { super(workId); this.group = group; + this.executorServiceWrapper = executorServiceWrapper; } @Override @@ -336,9 +345,16 @@ public interface OnceWork { } } + @Override public void check() { //发起检查,看看所有是否取消完毕 PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); + executorServiceWrapper.startCheck(this); + } + + @Override + public void addThreadSubmit(Async.TaskCallable taskCallable) { + executorServiceWrapper.addThreadSubmit(taskCallable); } } @@ -395,6 +411,15 @@ public interface OnceWork { public void pleaseCancel() { } + @Override + public void check() { + } + + @Override + public void addThreadSubmit(Async.TaskCallable taskCallable) { + // do nothing + } + @Override public String toString() { return "(it's empty work)"; diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index fff1ab4..89e298c 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -7,9 +7,9 @@ import com.jd.platform.async.exception.CancelException; import com.jd.platform.async.exception.EndsNormallyException; import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.executor.Async; -import com.jd.platform.async.executor.ExecutorServiceWrapper; import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.OnceWork; import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.strategy.WrapperStrategy; @@ -150,7 +150,7 @@ public abstract class WorkerWrapper { * @param group wrapper组 * @throws IllegalStateException 当wrapper正在building状态时被启动,则会抛出该异常。 */ - public void work(ExecutorServiceWrapper executorService, + public void work(OnceWork.Impl executorService, long remainTime, WorkerWrapperGroup group) { work(executorService, null, remainTime, group); @@ -277,7 +277,7 @@ public abstract class WorkerWrapper { * @param remainTime 剩余时间。 * @throws IllegalStateException 当wrapper正在building状态时被启动,则会抛出该异常。 */ - public void work(ExecutorServiceWrapper executorService, + public void work(OnceWork work, WorkerWrapper fromWrapper, long remainTime, WorkerWrapperGroup group @@ -300,7 +300,7 @@ public abstract class WorkerWrapper { final Consumer __function__callbackResultOfFalse_beginNext = (success) -> { __function__callbackResult.accept(success); - beginNext(executorService, now, remainTime, group); + beginNext(work, now, remainTime, group); }; final BiConsumer __function__fastFail_callbackResult$false_beginNext = (fastFail_isTimeout, fastFail_exception) -> { @@ -387,7 +387,7 @@ public abstract class WorkerWrapper { wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); switch (judge.getDependenceAction()) { case TAKE_REST: - PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); + work.check(); return; case FAST_FAIL: if (setState(state, STARTED, ERROR)) { @@ -471,7 +471,7 @@ public abstract class WorkerWrapper { *

* 本方法不负责校验状态。请在调用前自行检验 */ - protected void beginNext(ExecutorServiceWrapper executorService, long now, long remainTime, WorkerWrapperGroup group) { + protected void beginNext(OnceWork onceWork, long now, long remainTime, WorkerWrapperGroup group) { //花费的时间 final long costTime = SystemClock.now() - now; final long nextRemainTIme = remainTime - costTime; @@ -490,7 +490,7 @@ public abstract class WorkerWrapper { } finally { PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); if (next != null) { - next.work(executorService, this, nextRemainTIme, group); + next.work(onceWork, this, nextRemainTIme, group); } } } @@ -498,8 +498,8 @@ public abstract class WorkerWrapper { else { try { group.addWrapper(nextWrappers); - nextWrappers.forEach(next -> executorService.addThreadSubmit( - new Async.TaskCallable(next, nextRemainTIme, group, executorService, this) + nextWrappers.forEach(next -> onceWork.addThreadSubmit( + new Async.TaskCallable(next, nextRemainTIme, group, onceWork, this) )); setState(state, AFTER_WORK, SUCCESS); } finally { diff --git a/asyncTool-core/src/test/java/v15/cases/Case15.java b/asyncTool-core/src/test/java/v15/cases/Case15.java index ef89848..d8d569d 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case15.java +++ b/asyncTool-core/src/test/java/v15/cases/Case15.java @@ -143,8 +143,7 @@ class Case15 { public String action(String param, Map> allWrappers) { if ("F".equals(id)) { while (true) { - System.out.println("wrapper(id=" + id + ") is working"); - System.out.println("I am alive:" + i++); + System.out.print(id + " I am alive:" + i++ + "."); /* 第一种问题,内存溢出OOM,由系统取消任务执行,H的结果为{result=null, resultState=DEFAULT, ex=null},因为没有跑到H,所以H的结果为null diff --git a/asyncTool-core/src/test/java/v15/cases/Case16.java b/asyncTool-core/src/test/java/v15/cases/Case16.java index f86fac1..1924a50 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case16.java +++ b/asyncTool-core/src/test/java/v15/cases/Case16.java @@ -24,15 +24,12 @@ class Case16 { private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10), - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - System.out.println("Task " + r.toString() + - " rejected from " + - e.toString()); - if (!e.isShutdown()) { - r.run(); - } + (r, e) -> { + System.out.println("Task " + r.toString() + + " rejected from " + + e.toString()); + if (!e.isShutdown()) { + r.run(); } });