From 12e29c5ee8922dea2af64b2b5c31decdeb2996c2 Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Wed, 1 Feb 2023 20:42:38 +0800 Subject: [PATCH 1/3] =?UTF-8?q?refactor:=20=E6=B7=BB=E5=8A=A0=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E6=B3=9B=E5=9E=8B=E6=8E=A5=E6=94=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/wrapper/WorkerWrapperBuilder.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java index 36e7c56..c997054 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java @@ -235,6 +235,9 @@ public interface WorkerWrapperBuilder { return setNext().wrapper(wrappers).end(); } + default WorkerWrapperBuilder nextOf2(Collection> wrappers) { + return setNext().wrapper(wrappers).end(); + } /** * 设置超时时间的具体属性 */ From 5efb329f9cefb0c32af7659886febdade1f54134 Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Thu, 2 Feb 2023 08:55:02 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=E5=B7=A5=E4=BD=9C=E7=BB=93=E6=9D=9F?= =?UTF-8?q?=E5=B0=B1=E4=B8=8D=E6=A3=80=E6=9F=A5=E5=90=84=E4=B8=AA=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=98=AF=E5=90=A6=E5=B7=B2=E7=BB=8F=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=92=8C=E5=8F=96=E6=B6=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/executor/Async.java | 73 +++++++++++-------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index fa72efc..9299efc 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -14,7 +14,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -28,6 +27,34 @@ public class Async { // ========================= 任务执行核心代码 ========================= + /** + * 在以前(及现在)的版本中: + * 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时,ExecutorService将会被记录下来。 + *

+ * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 + * + * @deprecated 不明意义、毫无用处的字段。记录之前使用的线程池没啥意义。 + */ + @SuppressWarnings("DeprecatedIsStillUsed") + @Deprecated + private static final AtomicReference lastExecutorService = new AtomicReference<>(null); + + /** + * 默认线程池。 + *

+ * 在v1.4及之前,该COMMON_POLL是被写死的。 + *

+ * 自v1.5后: + * 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载。 + * tip: + * 要注意,{@link #work(long, WorkerWrapper[])}、{@link #work(long, Collection)}这些方法, + * 不传入线程池就会默认调用{@link #getCommonPool()},就会初始化线程池。 + *

+ * 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。 + *

+ */ + private static volatile ThreadPoolExecutor COMMON_POOL; + /** * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 * 使用uuid作为工作id。使用{@link #getCommonPool()}作为线程池。 @@ -57,6 +84,8 @@ public class Async { Objects.requireNonNull(workerWrappers, "workerWrappers array is null"))); } + // ========================= 设置/属性选项 ========================= + /** * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 * 省略工作id,使用uuid。 @@ -74,6 +103,7 @@ public class Async { * @param executorService 执行线程池 * @param workerWrappers 任务容器集合 * @param workId 本次工作id + * * @return 返回 {@link OnceWork}任务句柄对象。 */ public static OnceWork work(long timeout, @@ -103,8 +133,13 @@ public class Async { } catch (InterruptedException e) { e.printStackTrace(); } + //任务结束就退出检查 + if (onceWork.isFinish()) { + break; + } //完成或者取消就及时取消任务 - if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone()|| future.isCancelled())) { + if (!onceWork.isFinish() + && onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) { if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { onceWork.pleaseCancel(); } @@ -115,36 +150,6 @@ public class Async { return onceWork; } - // ========================= 设置/属性选项 ========================= - - /** - * 默认线程池。 - *

- * 在v1.4及之前,该COMMON_POLL是被写死的。 - *

- * 自v1.5后: - * 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载。 - * tip: - * 要注意,{@link #work(long, WorkerWrapper[])}、{@link #work(long, Collection)}这些方法, - * 不传入线程池就会默认调用{@link #getCommonPool()},就会初始化线程池。 - *

- * 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。 - *

- */ - private static volatile ThreadPoolExecutor COMMON_POOL; - - /** - * 在以前(及现在)的版本中: - * 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时,ExecutorService将会被记录下来。 - *

- * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 - * - * @deprecated 不明意义、毫无用处的字段。记录之前使用的线程池没啥意义。 - */ - @SuppressWarnings("DeprecatedIsStillUsed") - @Deprecated - private static final AtomicReference lastExecutorService = new AtomicReference<>(null); - /** * 该方法将会返回{@link #COMMON_POOL},如果还未初始化则会懒加载初始化后再返回。 * 详情参考{@link #COMMON_POOL}上的注解 @@ -199,6 +204,7 @@ public class Async { /** * @param now 是否立即关闭 + * * @return 如果尚未调用过{@link #getCommonPool()},即没有初始化默认线程池,返回false。否则返回true。 */ @SuppressWarnings("unused") @@ -222,6 +228,7 @@ public class Async { * 同步执行一次任务。 * * @return 只要执行未超时,就返回true。 + * * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 */ @Deprecated @@ -330,6 +337,7 @@ public class Async { * 关闭指定的线程池 * * @param executorService 指定的线程池。传入null则会关闭默认线程池。 + * * @deprecated 没啥用的方法,要关闭线程池还不如直接调用线程池的关闭方法,避免歧义。 */ @Deprecated @@ -338,4 +346,5 @@ public class Async { executorService.shutdown(); } } + } From 6b546bc7a4ed76dac25d33a41a753b8680bd5ee1 Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Thu, 2 Feb 2023 09:00:36 +0800 Subject: [PATCH 3/3] =?UTF-8?q?chore:=20=E7=AE=80=E5=8C=96=E5=88=A4?= =?UTF-8?q?=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/jd/platform/async/executor/Async.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 9299efc..7c66de9 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -136,10 +136,8 @@ public class Async { //任务结束就退出检查 if (onceWork.isFinish()) { break; - } - //完成或者取消就及时取消任务 - if (!onceWork.isFinish() - && onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) { + } else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) { + //完成或者取消就及时取消任务 if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { onceWork.pleaseCancel(); }