mirror of
https://gitee.com/jd-platform-opensource/asyncTool.git
synced 2024-12-19 03:30:30 +08:00
fix: 将检查任务放到哈希时间轮,减少一个检查线程的使用
This commit is contained in:
parent
ea41a63394
commit
a430e19206
@ -117,17 +117,16 @@ public class Async {
|
||||
Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! "));
|
||||
final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout);
|
||||
group.addWrapper(workerWrappers);
|
||||
final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId);
|
||||
ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService);
|
||||
final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId, executorServiceWrapper);
|
||||
|
||||
//有多少个开始节点就有多少个线程,依赖任务靠被依赖任务的线程完成工作
|
||||
workerWrappers.forEach(wrapper -> {
|
||||
if (wrapper == null) {
|
||||
return;
|
||||
}
|
||||
executorServiceWrapper.addThreadSubmit(new TaskCallable(wrapper, timeout, group, executorServiceWrapper));
|
||||
onceWork.addThreadSubmit(new TaskCallable(wrapper, timeout, group, onceWork));
|
||||
});
|
||||
executorServiceWrapper.startCheck(onceWork);
|
||||
return onceWork;
|
||||
}
|
||||
|
||||
@ -333,29 +332,29 @@ public class Async {
|
||||
|
||||
private final WorkerWrapper<?, ?> wrapper;
|
||||
|
||||
private final ExecutorServiceWrapper executorServiceWrapper;
|
||||
|
||||
private final WorkerWrapper workerWrapper;
|
||||
|
||||
public TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorServiceWrapper) {
|
||||
private final OnceWork onceWork;
|
||||
|
||||
public TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, OnceWork onceWork) {
|
||||
this.wrapper = wrapper;
|
||||
this.group = group;
|
||||
this.timeout = timeout;
|
||||
this.executorServiceWrapper = executorServiceWrapper;
|
||||
this.onceWork = onceWork;
|
||||
this.workerWrapper = null;
|
||||
}
|
||||
|
||||
public <V, T> TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorService, WorkerWrapper<?, ?> workerWrapper) {
|
||||
public <V, T> TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, OnceWork onceWork, WorkerWrapper<?, ?> workerWrapper) {
|
||||
this.wrapper = wrapper;
|
||||
this.group = group;
|
||||
this.timeout = timeout;
|
||||
this.executorServiceWrapper = executorService;
|
||||
this.onceWork = onceWork;
|
||||
this.workerWrapper = workerWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BigDecimal call() throws Exception {
|
||||
wrapper.work(executorServiceWrapper, this.workerWrapper, timeout, group);
|
||||
wrapper.work(onceWork, this.workerWrapper, timeout, group);
|
||||
return BigDecimal.ZERO;
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
package com.jd.platform.async.executor;
|
||||
|
||||
import com.jd.platform.async.openutil.timer.Timeout;
|
||||
import com.jd.platform.async.openutil.timer.TimerTask;
|
||||
import com.jd.platform.async.worker.OnceWork;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
@ -26,33 +28,26 @@ public class ExecutorServiceWrapper {
|
||||
allThreadSubmit.add(executorService.submit(callable));
|
||||
}
|
||||
|
||||
public void startCheck(final OnceWork.Impl onceWork) {
|
||||
executorService.execute(new ThreadCheckRunable(onceWork, this));
|
||||
public void startCheck(final OnceWork onceWork) {
|
||||
PollingCenter.getInstance().checkGroup(new ThreadCheckRunable(onceWork, this), 3000);
|
||||
}
|
||||
|
||||
private static class ThreadCheckRunable implements Runnable {
|
||||
private static class ThreadCheckRunable implements TimerTask {
|
||||
|
||||
private final OnceWork.Impl onceWork;
|
||||
private final OnceWork onceWork;
|
||||
|
||||
private final ExecutorServiceWrapper executorServiceWrapper;
|
||||
|
||||
public ThreadCheckRunable(OnceWork.Impl onceWork, ExecutorServiceWrapper executorServiceWrapper) {
|
||||
public ThreadCheckRunable(OnceWork onceWork, ExecutorServiceWrapper executorServiceWrapper) {
|
||||
this.onceWork = onceWork;
|
||||
this.executorServiceWrapper = executorServiceWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
//任务结束就退出检查
|
||||
if (onceWork.isFinish()) {
|
||||
break;
|
||||
} else if (executorServiceWrapper.getAllThreadSubmit().size() > 0) {
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
//任务结束就退出检查
|
||||
if (!onceWork.isFinish()) {
|
||||
if (executorServiceWrapper.getAllThreadSubmit().size() > 0) {
|
||||
boolean isException = false;
|
||||
boolean isCancelld = false;
|
||||
boolean isDone = false;
|
||||
@ -68,9 +63,11 @@ public class ExecutorServiceWrapper {
|
||||
} catch (InterruptedException e) {
|
||||
//中断等
|
||||
e.printStackTrace();
|
||||
System.out.println("出现中断" + e);
|
||||
isException = true;
|
||||
} catch (ExecutionException e) {
|
||||
//内存溢出等
|
||||
System.out.println("出现内存溢出等" + e);
|
||||
e.printStackTrace();
|
||||
isException = true;
|
||||
} catch (TimeoutException e) {
|
||||
@ -87,7 +84,6 @@ public class ExecutorServiceWrapper {
|
||||
|| onceWork.isWaitingCancel())) {
|
||||
onceWork.pleaseCancel();
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
if (isDone) {
|
||||
System.out.println("部分任务已经在线程池完成");
|
||||
@ -96,12 +92,14 @@ public class ExecutorServiceWrapper {
|
||||
onceWork.check();
|
||||
}
|
||||
} else {
|
||||
//FIXME 高强度检查会不会造成检查线程过多?
|
||||
onceWork.check();
|
||||
}
|
||||
} else {
|
||||
System.out.println("任务已完成");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -50,11 +50,11 @@ public class PollingCenter {
|
||||
|
||||
// ========== fields and methods ==========
|
||||
|
||||
public void checkGroup(WorkerWrapperGroup.CheckFinishTask task) {
|
||||
public void checkGroup(TimerTask task) {
|
||||
checkGroup(task, 0);
|
||||
}
|
||||
|
||||
public void checkGroup(WorkerWrapperGroup.CheckFinishTask task, long daley) {
|
||||
public void checkGroup(TimerTask task, long daley) {
|
||||
timer.newTimeout(task, daley, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
package com.jd.platform.async.worker;
|
||||
|
||||
import com.jd.platform.async.executor.Async;
|
||||
import com.jd.platform.async.executor.ExecutorServiceWrapper;
|
||||
import com.jd.platform.async.executor.PollingCenter;
|
||||
import com.jd.platform.async.executor.timer.SystemClock;
|
||||
import com.jd.platform.async.wrapper.WorkerWrapper;
|
||||
@ -150,6 +152,10 @@ public interface OnceWork {
|
||||
return new AsFuture(this, sleepCheckInterval);
|
||||
}
|
||||
|
||||
void check();
|
||||
|
||||
void addThreadSubmit(Async.TaskCallable taskCallable);
|
||||
|
||||
// class
|
||||
|
||||
class AsFuture implements Future<Map<String, WorkerWrapper<?, ?>>> {
|
||||
@ -281,9 +287,12 @@ public interface OnceWork {
|
||||
|
||||
protected final WorkerWrapperGroup group;
|
||||
|
||||
public Impl(WorkerWrapperGroup group, String workId) {
|
||||
private final ExecutorServiceWrapper executorServiceWrapper;
|
||||
|
||||
public Impl(WorkerWrapperGroup group, String workId, ExecutorServiceWrapper executorServiceWrapper) {
|
||||
super(workId);
|
||||
this.group = group;
|
||||
this.executorServiceWrapper = executorServiceWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -336,9 +345,16 @@ public interface OnceWork {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void check() {
|
||||
//发起检查,看看所有是否取消完毕
|
||||
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
|
||||
executorServiceWrapper.startCheck(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addThreadSubmit(Async.TaskCallable taskCallable) {
|
||||
executorServiceWrapper.addThreadSubmit(taskCallable);
|
||||
}
|
||||
|
||||
}
|
||||
@ -395,6 +411,15 @@ public interface OnceWork {
|
||||
public void pleaseCancel() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void check() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addThreadSubmit(Async.TaskCallable taskCallable) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "(it's empty work)";
|
||||
|
@ -7,9 +7,9 @@ import com.jd.platform.async.exception.CancelException;
|
||||
import com.jd.platform.async.exception.EndsNormallyException;
|
||||
import com.jd.platform.async.exception.SkippedException;
|
||||
import com.jd.platform.async.executor.Async;
|
||||
import com.jd.platform.async.executor.ExecutorServiceWrapper;
|
||||
import com.jd.platform.async.executor.PollingCenter;
|
||||
import com.jd.platform.async.executor.timer.SystemClock;
|
||||
import com.jd.platform.async.worker.OnceWork;
|
||||
import com.jd.platform.async.worker.ResultState;
|
||||
import com.jd.platform.async.worker.WorkResult;
|
||||
import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
|
||||
@ -150,7 +150,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
* @param group wrapper组
|
||||
* @throws IllegalStateException 当wrapper正在building状态时被启动,则会抛出该异常。
|
||||
*/
|
||||
public void work(ExecutorServiceWrapper executorService,
|
||||
public void work(OnceWork.Impl executorService,
|
||||
long remainTime,
|
||||
WorkerWrapperGroup group) {
|
||||
work(executorService, null, remainTime, group);
|
||||
@ -277,7 +277,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
* @param remainTime 剩余时间。
|
||||
* @throws IllegalStateException 当wrapper正在building状态时被启动,则会抛出该异常。
|
||||
*/
|
||||
public void work(ExecutorServiceWrapper executorService,
|
||||
public void work(OnceWork work,
|
||||
WorkerWrapper<?, ?> fromWrapper,
|
||||
long remainTime,
|
||||
WorkerWrapperGroup group
|
||||
@ -300,7 +300,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
final Consumer<Boolean> __function__callbackResultOfFalse_beginNext =
|
||||
(success) -> {
|
||||
__function__callbackResult.accept(success);
|
||||
beginNext(executorService, now, remainTime, group);
|
||||
beginNext(work, now, remainTime, group);
|
||||
};
|
||||
final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult$false_beginNext =
|
||||
(fastFail_isTimeout, fastFail_exception) -> {
|
||||
@ -387,7 +387,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper);
|
||||
switch (judge.getDependenceAction()) {
|
||||
case TAKE_REST:
|
||||
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
|
||||
work.check();
|
||||
return;
|
||||
case FAST_FAIL:
|
||||
if (setState(state, STARTED, ERROR)) {
|
||||
@ -471,7 +471,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
* <p/>
|
||||
* 本方法不负责校验状态。请在调用前自行检验
|
||||
*/
|
||||
protected void beginNext(ExecutorServiceWrapper executorService, long now, long remainTime, WorkerWrapperGroup group) {
|
||||
protected void beginNext(OnceWork onceWork, long now, long remainTime, WorkerWrapperGroup group) {
|
||||
//花费的时间
|
||||
final long costTime = SystemClock.now() - now;
|
||||
final long nextRemainTIme = remainTime - costTime;
|
||||
@ -490,7 +490,7 @@ public abstract class WorkerWrapper<T, V> {
|
||||
} finally {
|
||||
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
|
||||
if (next != null) {
|
||||
next.work(executorService, this, nextRemainTIme, group);
|
||||
next.work(onceWork, this, nextRemainTIme, group);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -498,8 +498,8 @@ public abstract class WorkerWrapper<T, V> {
|
||||
else {
|
||||
try {
|
||||
group.addWrapper(nextWrappers);
|
||||
nextWrappers.forEach(next -> executorService.addThreadSubmit(
|
||||
new Async.TaskCallable(next, nextRemainTIme, group, executorService, this)
|
||||
nextWrappers.forEach(next -> onceWork.addThreadSubmit(
|
||||
new Async.TaskCallable(next, nextRemainTIme, group, onceWork, this)
|
||||
));
|
||||
setState(state, AFTER_WORK, SUCCESS);
|
||||
} finally {
|
||||
|
@ -143,8 +143,7 @@ class Case15 {
|
||||
public String action(String param, Map<String, WorkerWrapper<?, ?>> allWrappers) {
|
||||
if ("F".equals(id)) {
|
||||
while (true) {
|
||||
System.out.println("wrapper(id=" + id + ") is working");
|
||||
System.out.println("I am alive:" + i++);
|
||||
System.out.print(id + " I am alive:" + i++ + ".");
|
||||
/*
|
||||
第一种问题,内存溢出OOM,由系统取消任务执行,H的结果为{result=null, resultState=DEFAULT, ex=null},因为没有跑到H,所以H的结果为null
|
||||
|
||||
|
@ -24,15 +24,12 @@ class Case16 {
|
||||
|
||||
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
|
||||
30L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10),
|
||||
new RejectedExecutionHandler() {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
|
||||
System.out.println("Task " + r.toString() +
|
||||
" rejected from " +
|
||||
e.toString());
|
||||
if (!e.isShutdown()) {
|
||||
r.run();
|
||||
}
|
||||
(r, e) -> {
|
||||
System.out.println("Task " + r.toString() +
|
||||
" rejected from " +
|
||||
e.toString());
|
||||
if (!e.isShutdown()) {
|
||||
r.run();
|
||||
}
|
||||
});
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user