!27 解决OOM情况下,线程不再运行导致的整个AsyncTool死等超时的问题

Merge pull request !27 from 云开/kyle-dev
This commit is contained in:
tianyaleixiaowu 2023-02-20 10:02:05 +00:00 committed by Gitee
commit ea41a63394
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 597 additions and 196 deletions

View File

@ -10,6 +10,7 @@ import com.jd.platform.async.wrapper.WorkerWrapperGroup;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@ -103,7 +104,6 @@ public class Async {
* @param executorService 执行线程池
* @param workerWrappers 任务容器集合
* @param workId 本次工作id
*
* @return 返回 {@link OnceWork}任务句柄对象
*/
public static OnceWork work(long timeout,
@ -118,33 +118,16 @@ public class Async {
final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout);
group.addWrapper(workerWrappers);
final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId);
ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService);
//有多少个开始节点就有多少个线程依赖任务靠被依赖任务的线程完成工作
workerWrappers.forEach(wrapper -> {
if (wrapper == null) {
return;
}
Future<?> future = executorService.submit(() -> wrapper.work(executorService, timeout, group));
onceWork.getAllThreadSubmit().add(future);
});
executorService.execute(() -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//任务结束就退出检查
if (onceWork.isFinish()) {
break;
} else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) {
//完成或者取消就及时取消任务
if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) {
onceWork.pleaseCancel();
}
break;
}
}
executorServiceWrapper.addThreadSubmit(new TaskCallable(wrapper, timeout, group, executorServiceWrapper));
});
executorServiceWrapper.startCheck(onceWork);
return onceWork;
}
@ -202,7 +185,6 @@ public class Async {
/**
* @param now 是否立即关闭
*
* @return 如果尚未调用过{@link #getCommonPool()}即没有初始化默认线程池返回false否则返回true
*/
@SuppressWarnings("unused")
@ -220,13 +202,10 @@ public class Async {
return true;
}
// ========================= deprecated =========================
/**
* 同步执行一次任务
*
* @return 只要执行未超时就返回true
*
* @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代
*/
@Deprecated
@ -256,6 +235,8 @@ public class Async {
return beginWork(timeout, executorService, workerWrappers);
}
// ========================= deprecated =========================
/**
* 同步阻塞,直到所有都完成,或失败
*
@ -335,7 +316,6 @@ public class Async {
* 关闭指定的线程池
*
* @param executorService 指定的线程池传入null则会关闭默认线程池
*
* @deprecated 没啥用的方法要关闭线程池还不如直接调用线程池的关闭方法避免歧义
*/
@Deprecated
@ -345,4 +325,39 @@ public class Async {
}
}
public static class TaskCallable implements Callable<BigDecimal> {
private final WorkerWrapperGroup group;
private final long timeout;
private final WorkerWrapper<?, ?> wrapper;
private final ExecutorServiceWrapper executorServiceWrapper;
private final WorkerWrapper workerWrapper;
public TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorServiceWrapper) {
this.wrapper = wrapper;
this.group = group;
this.timeout = timeout;
this.executorServiceWrapper = executorServiceWrapper;
this.workerWrapper = null;
}
public <V, T> TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorService, WorkerWrapper<?, ?> workerWrapper) {
this.wrapper = wrapper;
this.group = group;
this.timeout = timeout;
this.executorServiceWrapper = executorService;
this.workerWrapper = workerWrapper;
}
@Override
public BigDecimal call() throws Exception {
wrapper.work(executorServiceWrapper, this.workerWrapper, timeout, group);
return BigDecimal.ZERO;
}
}
}

View File

@ -0,0 +1,107 @@
package com.jd.platform.async.executor;
import com.jd.platform.async.worker.OnceWork;
import java.util.concurrent.*;
public class ExecutorServiceWrapper {
private final ExecutorService executorService;
/**
* 本次任务中所有线程提交
*/
protected LinkedBlockingQueue<Future<?>> allThreadSubmit;
public ExecutorServiceWrapper(ExecutorService executorService) {
this.executorService = executorService;
this.allThreadSubmit = new LinkedBlockingQueue<>();
}
public LinkedBlockingQueue<Future<?>> getAllThreadSubmit() {
return allThreadSubmit;
}
public void addThreadSubmit(Async.TaskCallable callable) {
allThreadSubmit.add(executorService.submit(callable));
}
public void startCheck(final OnceWork.Impl onceWork) {
executorService.execute(new ThreadCheckRunable(onceWork, this));
}
private static class ThreadCheckRunable implements Runnable {
private final OnceWork.Impl onceWork;
private final ExecutorServiceWrapper executorServiceWrapper;
public ThreadCheckRunable(OnceWork.Impl onceWork, ExecutorServiceWrapper executorServiceWrapper) {
this.onceWork = onceWork;
this.executorServiceWrapper = executorServiceWrapper;
}
@Override
public void run() {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//任务结束就退出检查
if (onceWork.isFinish()) {
break;
} else if (executorServiceWrapper.getAllThreadSubmit().size() > 0) {
boolean isException = false;
boolean isCancelld = false;
boolean isDone = false;
for (Future<?> item : executorServiceWrapper.getAllThreadSubmit()) {
try {
//完成并且没有返回
if (item.isCancelled()) {
isCancelld = true;
}
if ((item.isDone() && item.get(500, TimeUnit.MILLISECONDS) == null)) {
isDone = true;
}
} catch (InterruptedException e) {
//中断等
e.printStackTrace();
isException = true;
} catch (ExecutionException e) {
//内存溢出等
e.printStackTrace();
isException = true;
} catch (TimeoutException e) {
//超时不管继续检查
}
}
//异常或者有线程取消
if (isException || isCancelld) {
//未超时未完成或者未取消就取消任务
while (!(onceWork.hasTimeout()
|| onceWork.isFinish()
|| onceWork.isCancelled()
|| onceWork.isWaitingCancel())) {
onceWork.pleaseCancel();
}
break;
} else {
if (isDone) {
System.out.println("部分任务已经在线程池完成");
}
//没有的话继续完成
onceWork.check();
}
} else {
//FIXME 高强度检查会不会造成检查线程过多
onceWork.check();
}
}
}
}
}

View File

@ -6,7 +6,10 @@ import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperGroup;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -16,6 +19,14 @@ import java.util.stream.Collectors;
* @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午3:22
*/
public interface OnceWork {
/**
* 空任务
*/
static OnceWork emptyWork(String workId) {
return new EmptyWork(workId);
}
/**
* 返回唯一的workId
*/
@ -126,6 +137,8 @@ public interface OnceWork {
return new AsFuture(this, limitTime -> limitTime / 16);
}
// static
/**
* 返回{@link Future}视图
*
@ -137,19 +150,12 @@ public interface OnceWork {
return new AsFuture(this, sleepCheckInterval);
}
// static
/**
* 空任务
*/
static OnceWork emptyWork(String workId) {
return new EmptyWork(workId);
}
// class
class AsFuture implements Future<Map<String, WorkerWrapper<?, ?>>> {
private final OnceWork onceWork;
private final Function<Long, Long> sleepCheckInterval;
private AsFuture(OnceWork onceWork, Function<Long, Long> sleepCheckInterval) {
@ -217,9 +223,11 @@ public interface OnceWork {
public String toString() {
return "(asFuture from " + onceWork + ")@" + Integer.toHexString(this.hashCode());
}
}
abstract class AbstractOnceWork implements OnceWork {
protected final String workId;
public AbstractOnceWork(String workId) {
@ -266,24 +274,16 @@ public interface OnceWork {
.append(", wrappers::getId=").append(getWrappers().keySet())
.append('}').toString();
}
}
class Impl extends AbstractOnceWork {
protected final WorkerWrapperGroup group;
/**
* 本次任务中所有线程提交
*/
protected List<Future<?>> allThreadSubmit;
public List<Future<?>> getAllThreadSubmit() {
return allThreadSubmit;
}
public Impl(WorkerWrapperGroup group, String workId) {
super(workId);
this.group = group;
allThreadSubmit = new ArrayList<>(group.getForParamUseWrappers().size());
}
@Override
@ -331,13 +331,20 @@ public interface OnceWork {
@Override
public void pleaseCancel() {
group.pleaseCancel();
if (group.pleaseCancel()) {
check();
}
}
public void check() {
//发起检查看看所有是否取消完毕
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
}
}
class EmptyWork extends AbstractOnceWork {
private final long initTime = SystemClock.now();
public EmptyWork(String workId) {
@ -392,5 +399,7 @@ public interface OnceWork {
public String toString() {
return "(it's empty work)";
}
}
}

View File

@ -6,6 +6,8 @@ import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.CancelException;
import com.jd.platform.async.exception.EndsNormallyException;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.ExecutorServiceWrapper;
import com.jd.platform.async.executor.PollingCenter;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.ResultState;
@ -18,7 +20,6 @@ import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy;
import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@ -43,50 +44,60 @@ public abstract class WorkerWrapper<T, V> {
* 该wrapper的唯一标识
*/
protected final String id;
protected final IWorker<T, V> worker;
protected final ICallback<T, V> callback;
/**
* 各种策略的封装类
*/
private final WrapperStrategy wrapperStrategy;
/**
* 是否允许被打断
*/
protected final boolean allowInterrupt;
/**
* 是否启动超时检查
*/
final boolean enableTimeout;
/**
* 超时时间长度
*/
final long timeoutLength;
/**
* 超时时间单位
*/
final TimeUnit timeoutUnit;
// ========== 临时属性 ==========
/**
* worker将来要处理的param
*/
protected volatile T param;
/**
* 原子设置wrapper的状态
* <p>
* {@link State}此枚举类枚举了state值所代表的状态枚举
*/
protected final AtomicInteger state = new AtomicInteger(BUILDING.id);
/**
* 该值将在{@link IWorker#action(Object, Map)}进行时设为当前线程在任务开始前或结束后都为null
*/
protected final AtomicReference<Thread> doWorkingThread = new AtomicReference<>();
/**
* 也是个钩子变量用来存临时的结果
*/
protected final AtomicReference<WorkResult<V>> workResult = new AtomicReference<>(null);
/**
* 是否启动超时检查
*/
final boolean enableTimeout;
// ========== 临时属性 ==========
/**
* 超时时间长度
*/
final long timeoutLength;
/**
* 超时时间单位
*/
final TimeUnit timeoutUnit;
/**
* 各种策略的封装类
*/
private final WrapperStrategy wrapperStrategy;
/**
* worker将来要处理的param
*/
protected volatile T param;
WorkerWrapper(String id,
IWorker<T, V> worker,
ICallback<T, V> callback,
@ -126,6 +137,10 @@ public abstract class WorkerWrapper<T, V> {
// ========== public ==========
public static <T, V> WorkerWrapperBuilder<T, V> builder() {
return new Builder<>();
}
/**
* 外部调用本线程运行此wrapper的入口方法
* 该方法将会确定这组wrapper所属的group
@ -135,7 +150,7 @@ public abstract class WorkerWrapper<T, V> {
* @param group wrapper组
* @throws IllegalStateException 当wrapper正在building状态时被启动则会抛出该异常
*/
public void work(ExecutorService executorService,
public void work(ExecutorServiceWrapper executorService,
long remainTime,
WorkerWrapperGroup group) {
work(executorService, null, remainTime, group);
@ -167,11 +182,17 @@ public abstract class WorkerWrapper<T, V> {
*/
public abstract Set<WorkerWrapper<?, ?>> getNextWrappers();
abstract void setNextWrappers(Set<WorkerWrapper<?, ?>> nextWrappers);
/**
* 获取上游wrapper
*/
public abstract Set<WorkerWrapper<?, ?>> getDependWrappers();
abstract void setDependWrappers(Set<WorkerWrapper<?, ?>> dependWrappers);
// ========== protected ==========
/**
* 获取本wrapper的超时情况如有必要还会修改wrapper状态
*
@ -249,8 +270,6 @@ public abstract class WorkerWrapper<T, V> {
return wrapperStrategy;
}
// ========== protected ==========
/**
* 工作的核心方法
*
@ -258,10 +277,10 @@ public abstract class WorkerWrapper<T, V> {
* @param remainTime 剩余时间
* @throws IllegalStateException 当wrapper正在building状态时被启动则会抛出该异常
*/
protected void work(ExecutorService executorService,
WorkerWrapper<?, ?> fromWrapper,
long remainTime,
WorkerWrapperGroup group
public void work(ExecutorServiceWrapper executorService,
WorkerWrapper<?, ?> fromWrapper,
long remainTime,
WorkerWrapperGroup group
) {
long now = SystemClock.now();
// ================================================
@ -297,7 +316,7 @@ public abstract class WorkerWrapper<T, V> {
if (setState(state, WORKING, AFTER_WORK)) {
__function__callbackResultOfFalse_beginNext.accept(true);
}
}else {
} else {
//如果任务超时需要将最后那个超时任务设置为超时异常结束的
if (setState(state, WORKING, ERROR)) {
__function__fastFail_callbackResult$false_beginNext.accept(true, new TimeoutException());
@ -368,10 +387,7 @@ public abstract class WorkerWrapper<T, V> {
wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper);
switch (judge.getDependenceAction()) {
case TAKE_REST:
//FIXME 等待200毫秒重新投入线程池主要为了调起最后一个任务
Thread.sleep(200L);
executorService.submit(() -> this.work(executorService, fromWrapper,
remainTime - (SystemClock.now() - now), group));
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
return;
case FAST_FAIL:
if (setState(state, STARTED, ERROR)) {
@ -401,11 +417,13 @@ public abstract class WorkerWrapper<T, V> {
}
}
// ========== hashcode and equals ==========
/**
* 本工作线程执行自己的job.
* <p/>
* 本方法不负责校验状态请在调用前自行检验
*
* @return
*/
protected boolean fire(WorkerWrapperGroup group) {
@ -446,12 +464,14 @@ public abstract class WorkerWrapper<T, V> {
));
}
// ========== builder ==========
/**
* 进行下一个任务
* <p/>
* 本方法不负责校验状态请在调用前自行检验
*/
protected void beginNext(ExecutorService executorService, long now, long remainTime, WorkerWrapperGroup group) {
protected void beginNext(ExecutorServiceWrapper executorService, long now, long remainTime, WorkerWrapperGroup group) {
//花费的时间
final long costTime = SystemClock.now() - now;
final long nextRemainTIme = remainTime - costTime;
@ -478,9 +498,9 @@ public abstract class WorkerWrapper<T, V> {
else {
try {
group.addWrapper(nextWrappers);
nextWrappers.forEach(next -> executorService.submit(() ->
next.work(executorService, this, nextRemainTIme, group))
);
nextWrappers.forEach(next -> executorService.addThreadSubmit(
new Async.TaskCallable(next, nextRemainTIme, group, executorService, this)
));
setState(state, AFTER_WORK, SUCCESS);
} finally {
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
@ -489,13 +509,13 @@ public abstract class WorkerWrapper<T, V> {
}
// ========== hashcode and equals ==========
@Override
public boolean equals(Object o) {
return super.equals(o);
}
// ========== package access methods ==========
/**
* {@code return id.hashCode();}返回id值的hashcode
*/
@ -505,33 +525,6 @@ public abstract class WorkerWrapper<T, V> {
return id.hashCode();
}
// ========== builder ==========
public static <T, V> WorkerWrapperBuilder<T, V> builder() {
return new Builder<>();
}
/**
* 自v1.5该类被抽取到{@link StableWorkerWrapperBuilder}抽象类兼容之前的版本
*/
public static class Builder<W, C> extends StableWorkerWrapperBuilder<W, C, Builder<W, C>> {
/**
* @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口以调用v1.5之后的规范api
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public Builder() {
}
}
// ========== package access methods ==========
abstract void setNextWrappers(Set<WorkerWrapper<?, ?>> nextWrappers);
abstract void setDependWrappers(Set<WorkerWrapper<?, ?>> dependWrappers);
// ========== toString ==========
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(256)
@ -567,56 +560,7 @@ public abstract class WorkerWrapper<T, V> {
return sb.toString();
}
/**
* 一个通用的策略器实现类提供了修改的功能并兼容之前的代码
*/
public static class StableWrapperStrategy extends WrapperStrategy.AbstractWrapperStrategy {
private DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper;
private DependMustStrategyMapper dependMustStrategyMapper;
private DependenceStrategy dependenceStrategy;
private SkipStrategy skipStrategy;
@Override
public DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper() {
return dependOnUpWrapperStrategyMapper;
}
@Override
public void setDependWrapperStrategyMapper(DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper) {
this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper;
}
@SuppressWarnings("deprecation")
@Override
public DependMustStrategyMapper getDependMustStrategyMapper() {
return dependMustStrategyMapper;
}
@Override
public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) {
this.dependMustStrategyMapper = dependMustStrategyMapper;
}
@Override
public DependenceStrategy getDependenceStrategy() {
return dependenceStrategy;
}
@Override
public void setDependenceStrategy(DependenceStrategy dependenceStrategy) {
this.dependenceStrategy = dependenceStrategy;
}
@Override
public SkipStrategy getSkipStrategy() {
return skipStrategy;
}
@Override
public void setSkipStrategy(SkipStrategy skipStrategy) {
this.skipStrategy = skipStrategy;
}
}
// ========== toString ==========
/**
* state状态枚举工具类
@ -658,28 +602,34 @@ public abstract class WorkerWrapper<T, V> {
// public
public boolean finished() {
return this == SUCCESS || this == ERROR || this == SKIP;
}
static final State[] states_of_notWorked = new State[]{INIT, STARTED};
// package
State(int id) {
this.id = id;
}
final int id;
// package-static
static final State[] states_of_notWorked = new State[]{INIT, STARTED};
static final State[] states_of_skipOrAfterWork = new State[]{SKIP, AFTER_WORK};
static final State[] states_of_beforeWorkingEnd = new State[]{INIT, STARTED, WORKING};
// package-static
static final State[] states_all = new State[]{BUILDING, INIT, STARTED, WORKING, AFTER_WORK, SUCCESS, ERROR, SKIP};
static final Map<Integer, State> id2state;
static {
HashMap<Integer, State> map = new HashMap<>();
for (State s : State.values()) {
map.put(s.id, s);
}
id2state = Collections.unmodifiableMap(map);
}
final int id;
State(int id) {
this.id = id;
}
/**
* 自旋+CAS的设置状态如果状态不在exceptValues返回内 没有设置成功则返回false
*
@ -778,25 +728,93 @@ public abstract class WorkerWrapper<T, V> {
return id2state.get(id);
}
static final Map<Integer, State> id2state;
static {
HashMap<Integer, State> map = new HashMap<>();
for (State s : State.values()) {
map.put(s.id, s);
}
id2state = Collections.unmodifiableMap(map);
public boolean finished() {
return this == SUCCESS || this == ERROR || this == SKIP;
}
}
/**
* 自v1.5该类被抽取到{@link StableWorkerWrapperBuilder}抽象类兼容之前的版本
*/
public static class Builder<W, C> extends StableWorkerWrapperBuilder<W, C, Builder<W, C>> {
/**
* @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口以调用v1.5之后的规范api
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public Builder() {
}
}
/**
* 一个通用的策略器实现类提供了修改的功能并兼容之前的代码
*/
public static class StableWrapperStrategy extends WrapperStrategy.AbstractWrapperStrategy {
private DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper;
private DependMustStrategyMapper dependMustStrategyMapper;
private DependenceStrategy dependenceStrategy;
private SkipStrategy skipStrategy;
@Override
public DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper() {
return dependOnUpWrapperStrategyMapper;
}
@Override
public void setDependWrapperStrategyMapper(DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper) {
this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper;
}
@SuppressWarnings("deprecation")
@Override
public DependMustStrategyMapper getDependMustStrategyMapper() {
return dependMustStrategyMapper;
}
@Override
public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) {
this.dependMustStrategyMapper = dependMustStrategyMapper;
}
@Override
public DependenceStrategy getDependenceStrategy() {
return dependenceStrategy;
}
@Override
public void setDependenceStrategy(DependenceStrategy dependenceStrategy) {
this.dependenceStrategy = dependenceStrategy;
}
@Override
public SkipStrategy getSkipStrategy() {
return skipStrategy;
}
@Override
public void setSkipStrategy(SkipStrategy skipStrategy) {
this.skipStrategy = skipStrategy;
}
}
/**
* 这是因未知错误而引发的异常
*/
public static class NotExpectedException extends Exception {
public NotExpectedException(Throwable cause, WorkerWrapper<?, ?> wrapper) {
super("It's should not happened Exception . wrapper is " + wrapper, cause);
}
}
}

View File

@ -12,16 +12,15 @@ import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.UUID;
import java.util.concurrent.*;
/**
* 示例模拟内存溢出
* <p>
* 运行之前请设置
* 运行内存溢出之前请设置
* -Xmx20m -Xms20m
*
* <p>
* 当内存溢出时其中一个线程会OOMrunable不会继续调度
* 我通过添加一个线程主动cancel来达到提前结束任务而不是等超时
*
@ -33,7 +32,7 @@ class Case15 {
return WorkerWrapper.<String, String>builder()
.id(id)
.param(id + "X")
.param(UUID.randomUUID().toString())
.worker(new MyWorker(id))
.callback((new ICallback<String, String>() {
@Override
@ -69,12 +68,24 @@ class Case15 {
)
.build();
try {
OnceWork work = Async.work(5000, a, d);
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejectexxxxd from " +
e.toString());
}
});
OnceWork work = Async.work(10000, executor, a, d);
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
pool.execute(() -> {
while (true) {
try {
if (work.hasTimeout()) {
System.out.println("超时");
}
if (work.isCancelled()) {
System.out.println("取消成功");
}
@ -95,8 +106,12 @@ class Case15 {
}
System.out.println("cost:" + (SystemClock.now() - now));
int count = 1;
while (build.getWorkResult().getEx() == null) {
//同步等待result数据写入
if (count++ > 800) {
break;
}
}
System.out.println("输出H节点的结果----" + build.getWorkResult());
/* 输出:
@ -116,7 +131,7 @@ class Case15 {
//用于存放模拟的对象防止GC回收用List做对象引用
private final List<byte[]> list = new LinkedList<>();
private String id;
private final String id;
private int i = 0;
@ -127,13 +142,51 @@ class Case15 {
@Override
public String action(String param, Map<String, WorkerWrapper<?, ?>> allWrappers) {
if ("F".equals(id)) {
System.out.println("wrapper(id=" + id + ") is working");
while (true) {
System.out.println("wrapper(id=" + id + ") is working");
System.out.println("I am alive" + i++);
/*
第一种问题内存溢出OOM由系统取消任务执行H的结果为{result=null, resultState=DEFAULT, ex=null}因为没有跑到H所以H的结果为null
取消成功结束成功
*/
byte[] buf = new byte[1024 * 1024];
list.add(buf);
/*
第二种问题存在异常H的结果为WorkResult{result=null, resultState=EXCEPTION, ex=java.lang.ArithmeticException: / by zero}
结束成功
*/
/*if(i==20000){
int a=1/0;
}*/
/*
第三种问题啥也不做就是等待结果执行超时WorkResult{result=null, resultState=TIMEOUT, ex=null}AsyncTool会在超时时发出中断指令停止运行
超时结束成功
*/
/*try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
//如果将下面的语句注释那么任务将永远不会结束
throw new RuntimeException("被中断");
}*/
}
}
if ("H".equals(id)) {
/**
* 最后一个节点是否会被回调
*
* 第一种问题下不会回调
* 第二种问题下不会回调
* 第三种问题下不会回调
*/
System.out.println("H被回调");
}
return id;
}

View File

@ -0,0 +1,199 @@
package v15.cases;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.OnceWork;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 示例模拟线程池资源不够用的情况
*
* @author create by kyle
*/
class Case16 {
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
30L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
System.out.println("Task " + r.toString() +
" rejected from " +
e.toString());
if (!e.isShutdown()) {
r.run();
}
}
});
private static WorkerWrapperBuilder<?, ?> builder(String id) {
return WorkerWrapper.<String, String>builder()
.id(id)
.param(UUID.randomUUID().toString())
.worker(new MyWorker(id))
.callback((new ICallback<String, String>() {
@Override
public void begin() {
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
/* System.out.println("(id=" + id + ") callback "
+ (success ? "success " : "fail ")
+ ", workResult is " + workResult);*/
}
}))
.allowInterrupt(true);
}
public static void main(String[] args) {
//任务数量
final int count = 40;
//超时时间
int timeout = 10000;
List<OnceWork> workList = new ArrayList<>(count);
//每个任务约23个节点
for (int i = 0; i < count; i++) {
WorkerWrapper<?, ?> a = builder("A").build();
WorkerWrapper<?, ?> d;
WorkerWrapper<?, ?> k;
WorkerWrapper<?, ?> n;
WorkerWrapper<?, ?> q;
WorkerWrapper<?, ?> t;
WorkerWrapper<?, ?> w;
WorkerWrapper<?, ?> build = builder("H")
.depends(
builder("F")
.depends(builder("B").depends(a).build())
.depends(builder("C").depends(a).build())
.build(),
builder("G")
.depends(builder("E")
.depends(d = builder("D").build())
.build())
.build(),
builder("I")
.depends(builder("J")
.depends(k = builder("K").build())
.build())
.build(),
builder("L")
.depends(builder("M")
.depends(n = builder("N").build())
.build())
.build(),
builder("O")
.depends(builder("P")
.depends(q = builder("Q").build())
.build())
.build(),
builder("R")
.depends(builder("S")
.depends(t = builder("T").build())
.build())
.build(),
builder("U")
.depends(builder("V")
.depends(w = builder("W").build())
.build())
.build()
)
.build();
OnceWork work = Async.work(timeout, executor, a, d, k, n, q, t, w);
workList.add(work);
}
while (true) {
long finishCount = workList.stream().filter(OnceWork::isFinish).count();
if (finishCount == count) {
break;
}
}
for (OnceWork work : workList) {
try {
System.out.println("cost:" + (work.getFinishTime() - work.getStartTime()));
} catch (IllegalStateException e) {
}
}
long cancelCount = workList.stream().filter(onceWork -> onceWork.isCancelled() || onceWork.isWaitingCancel()).count();
long timeoutCount = workList.stream().filter(OnceWork::hasTimeout).count();
long finishCount = workList.stream().filter(OnceWork::isFinish).count();
System.out.println("取消数量" + cancelCount);
System.out.println("超时数量" + timeoutCount);
System.out.println("完成数量" + finishCount);
}
private static class MyWorker implements IWorker<String, String> {
//用于存放模拟的对象防止GC回收用List做对象引用
private final List<byte[]> list = new LinkedList<>();
private final String id;
private final int i = 0;
public MyWorker(String id) {
this.id = id;
}
@Override
public String action(String param, Map<String, WorkerWrapper<?, ?>> allWrappers) {
if ("F".equals(id)) {
while (true) {
/*
第一种问题内存溢出OOM由系统取消任务执行H的结果为{result=null, resultState=DEFAULT, ex=null}因为没有跑到H所以H的结果为null
取消成功结束成功
*/
/*byte[] buf = new byte[1024 * 1024];
list.add(buf);*/
/*
第二种问题存在异常H的结果为WorkResult{result=null, resultState=EXCEPTION, ex=java.lang.ArithmeticException: / by zero}
结束成功
*/
/*if(i==20000){
int a=1/0;
}*/
/*
第三种问题啥也不做就是等待结果执行超时WorkResult{result=null, resultState=TIMEOUT, ex=null}AsyncTool会在超时时发出中断指令停止运行
超时结束成功
*/
/*try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
//如果将下面的语句注释那么任务将永远不会结束
throw new RuntimeException("被中断");
}*/
//模拟有任务不退出的情况
System.out.println(param + " running");
Thread.yield();
}
}
return id;
}
}
}