!24 修改两处逻辑

Merge pull request !24 from 云开/dev-kyle
This commit is contained in:
tianyaleixiaowu 2023-02-03 08:40:27 +00:00 committed by Gitee
commit 0a994806b8
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 43 additions and 33 deletions

View File

@ -14,7 +14,6 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -28,6 +27,34 @@ public class Async {
// ========================= 任务执行核心代码 ========================= // ========================= 任务执行核心代码 =========================
/**
* 在以前及现在的版本中
* 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时ExecutorService将会被记录下来
* <p/>
* 注意这里是个static也就是只能有一个线程池用户自定义线程池时也只能定义一个
*
* @deprecated 不明意义毫无用处的字段记录之前使用的线程池没啥意义
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
private static final AtomicReference<ExecutorService> lastExecutorService = new AtomicReference<>(null);
/**
* 默认线程池
* <p>
* 在v1.4及之前该COMMON_POLL是被写死的
* <p>
* 自v1.5后
* 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载
* tip:
* 要注意{@link #work(long, WorkerWrapper[])}{@link #work(long, Collection)}这些方法
* 不传入线程池就会默认调用{@link #getCommonPool()}就会初始化线程池
* <p>
* 该线程池将会给线程取名为asyncTool-commonPool-thread-0数字不重复
* </p>
*/
private static volatile ThreadPoolExecutor COMMON_POOL;
/** /**
* {@link #work(long, ExecutorService, Collection, String)}方法的简易封装 * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装
* 使用uuid作为工作id使用{@link #getCommonPool()}作为线程池 * 使用uuid作为工作id使用{@link #getCommonPool()}作为线程池
@ -57,6 +84,8 @@ public class Async {
Objects.requireNonNull(workerWrappers, "workerWrappers array is null"))); Objects.requireNonNull(workerWrappers, "workerWrappers array is null")));
} }
// ========================= 设置/属性选项 =========================
/** /**
* {@link #work(long, ExecutorService, Collection, String)}方法的简易封装 * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装
* 省略工作id使用uuid * 省略工作id使用uuid
@ -74,6 +103,7 @@ public class Async {
* @param executorService 执行线程池 * @param executorService 执行线程池
* @param workerWrappers 任务容器集合 * @param workerWrappers 任务容器集合
* @param workId 本次工作id * @param workId 本次工作id
*
* @return 返回 {@link OnceWork}任务句柄对象 * @return 返回 {@link OnceWork}任务句柄对象
*/ */
public static OnceWork work(long timeout, public static OnceWork work(long timeout,
@ -103,8 +133,11 @@ public class Async {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
//完成或者取消就及时取消任务 //任务结束就退出检查
if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone()|| future.isCancelled())) { if (onceWork.isFinish()) {
break;
} else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) {
//完成或者取消就及时取消任务
if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) {
onceWork.pleaseCancel(); onceWork.pleaseCancel();
} }
@ -115,36 +148,6 @@ public class Async {
return onceWork; return onceWork;
} }
// ========================= 设置/属性选项 =========================
/**
* 默认线程池
* <p>
* 在v1.4及之前该COMMON_POLL是被写死的
* <p>
* 自v1.5后
* 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载
* tip:
* 要注意{@link #work(long, WorkerWrapper[])}{@link #work(long, Collection)}这些方法
* 不传入线程池就会默认调用{@link #getCommonPool()}就会初始化线程池
* <p>
* 该线程池将会给线程取名为asyncTool-commonPool-thread-0数字不重复
* </p>
*/
private static volatile ThreadPoolExecutor COMMON_POOL;
/**
* 在以前及现在的版本中
* 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时ExecutorService将会被记录下来
* <p/>
* 注意这里是个static也就是只能有一个线程池用户自定义线程池时也只能定义一个
*
* @deprecated 不明意义毫无用处的字段记录之前使用的线程池没啥意义
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
private static final AtomicReference<ExecutorService> lastExecutorService = new AtomicReference<>(null);
/** /**
* 该方法将会返回{@link #COMMON_POOL}如果还未初始化则会懒加载初始化后再返回 * 该方法将会返回{@link #COMMON_POOL}如果还未初始化则会懒加载初始化后再返回
* 详情参考{@link #COMMON_POOL}上的注解 * 详情参考{@link #COMMON_POOL}上的注解
@ -199,6 +202,7 @@ public class Async {
/** /**
* @param now 是否立即关闭 * @param now 是否立即关闭
*
* @return 如果尚未调用过{@link #getCommonPool()}即没有初始化默认线程池返回false否则返回true * @return 如果尚未调用过{@link #getCommonPool()}即没有初始化默认线程池返回false否则返回true
*/ */
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -222,6 +226,7 @@ public class Async {
* 同步执行一次任务 * 同步执行一次任务
* *
* @return 只要执行未超时就返回true * @return 只要执行未超时就返回true
*
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代 * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代
*/ */
@Deprecated @Deprecated
@ -330,6 +335,7 @@ public class Async {
* 关闭指定的线程池 * 关闭指定的线程池
* *
* @param executorService 指定的线程池传入null则会关闭默认线程池 * @param executorService 指定的线程池传入null则会关闭默认线程池
*
* @deprecated 没啥用的方法要关闭线程池还不如直接调用线程池的关闭方法避免歧义 * @deprecated 没啥用的方法要关闭线程池还不如直接调用线程池的关闭方法避免歧义
*/ */
@Deprecated @Deprecated
@ -338,4 +344,5 @@ public class Async {
executorService.shutdown(); executorService.shutdown();
} }
} }
} }

View File

@ -235,6 +235,9 @@ public interface WorkerWrapperBuilder<T, V> {
return setNext().wrapper(wrappers).end(); return setNext().wrapper(wrappers).end();
} }
default WorkerWrapperBuilder<T, V> nextOf2(Collection<WorkerWrapper<?,?>> wrappers) {
return setNext().wrapper(wrappers).end();
}
/** /**
* 设置超时时间的具体属性 * 设置超时时间的具体属性
*/ */