!29 将检查任务放到哈希时间轮,减少一个检查线程的使用

Merge pull request !29 from 云开/kyle-dev
This commit is contained in:
tianyaleixiaowu 2023-04-28 06:34:40 +00:00 committed by Gitee
commit 445293e89a
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
7 changed files with 70 additions and 52 deletions

View File

@ -117,17 +117,16 @@ public class Async {
Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! ")); Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! "));
final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout); final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout);
group.addWrapper(workerWrappers); group.addWrapper(workerWrappers);
final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId);
ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService); ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService);
final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId, executorServiceWrapper);
//有多少个开始节点就有多少个线程依赖任务靠被依赖任务的线程完成工作 //有多少个开始节点就有多少个线程依赖任务靠被依赖任务的线程完成工作
workerWrappers.forEach(wrapper -> { workerWrappers.forEach(wrapper -> {
if (wrapper == null) { if (wrapper == null) {
return; return;
} }
executorServiceWrapper.addThreadSubmit(new TaskCallable(wrapper, timeout, group, executorServiceWrapper)); onceWork.addThreadSubmit(new TaskCallable(wrapper, timeout, group, onceWork));
}); });
executorServiceWrapper.startCheck(onceWork);
return onceWork; return onceWork;
} }
@ -333,29 +332,29 @@ public class Async {
private final WorkerWrapper<?, ?> wrapper; private final WorkerWrapper<?, ?> wrapper;
private final ExecutorServiceWrapper executorServiceWrapper;
private final WorkerWrapper workerWrapper; private final WorkerWrapper workerWrapper;
public TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorServiceWrapper) { private final OnceWork onceWork;
public TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, OnceWork onceWork) {
this.wrapper = wrapper; this.wrapper = wrapper;
this.group = group; this.group = group;
this.timeout = timeout; this.timeout = timeout;
this.executorServiceWrapper = executorServiceWrapper; this.onceWork = onceWork;
this.workerWrapper = null; this.workerWrapper = null;
} }
public <V, T> TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorService, WorkerWrapper<?, ?> workerWrapper) { public <V, T> TaskCallable(WorkerWrapper<?, ?> wrapper, long timeout, WorkerWrapperGroup group, OnceWork onceWork, WorkerWrapper<?, ?> workerWrapper) {
this.wrapper = wrapper; this.wrapper = wrapper;
this.group = group; this.group = group;
this.timeout = timeout; this.timeout = timeout;
this.executorServiceWrapper = executorService; this.onceWork = onceWork;
this.workerWrapper = workerWrapper; this.workerWrapper = workerWrapper;
} }
@Override @Override
public BigDecimal call() throws Exception { public BigDecimal call() throws Exception {
wrapper.work(executorServiceWrapper, this.workerWrapper, timeout, group); wrapper.work(onceWork, this.workerWrapper, timeout, group);
return BigDecimal.ZERO; return BigDecimal.ZERO;
} }

View File

@ -1,5 +1,7 @@
package com.jd.platform.async.executor; package com.jd.platform.async.executor;
import com.jd.platform.async.openutil.timer.Timeout;
import com.jd.platform.async.openutil.timer.TimerTask;
import com.jd.platform.async.worker.OnceWork; import com.jd.platform.async.worker.OnceWork;
import java.util.concurrent.*; import java.util.concurrent.*;
@ -26,33 +28,26 @@ public class ExecutorServiceWrapper {
allThreadSubmit.add(executorService.submit(callable)); allThreadSubmit.add(executorService.submit(callable));
} }
public void startCheck(final OnceWork.Impl onceWork) { public void startCheck(final OnceWork onceWork) {
executorService.execute(new ThreadCheckRunable(onceWork, this)); PollingCenter.getInstance().checkGroup(new ThreadCheckRunable(onceWork, this), 3000);
} }
private static class ThreadCheckRunable implements Runnable { private static class ThreadCheckRunable implements TimerTask {
private final OnceWork.Impl onceWork; private final OnceWork onceWork;
private final ExecutorServiceWrapper executorServiceWrapper; private final ExecutorServiceWrapper executorServiceWrapper;
public ThreadCheckRunable(OnceWork.Impl onceWork, ExecutorServiceWrapper executorServiceWrapper) { public ThreadCheckRunable(OnceWork onceWork, ExecutorServiceWrapper executorServiceWrapper) {
this.onceWork = onceWork; this.onceWork = onceWork;
this.executorServiceWrapper = executorServiceWrapper; this.executorServiceWrapper = executorServiceWrapper;
} }
@Override @Override
public void run() { public void run(Timeout timeout) throws Exception {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//任务结束就退出检查 //任务结束就退出检查
if (onceWork.isFinish()) { if (!onceWork.isFinish()) {
break; if (executorServiceWrapper.getAllThreadSubmit().size() > 0) {
} else if (executorServiceWrapper.getAllThreadSubmit().size() > 0) {
boolean isException = false; boolean isException = false;
boolean isCancelld = false; boolean isCancelld = false;
boolean isDone = false; boolean isDone = false;
@ -68,9 +63,11 @@ public class ExecutorServiceWrapper {
} catch (InterruptedException e) { } catch (InterruptedException e) {
//中断等 //中断等
e.printStackTrace(); e.printStackTrace();
System.out.println("出现中断" + e);
isException = true; isException = true;
} catch (ExecutionException e) { } catch (ExecutionException e) {
//内存溢出等 //内存溢出等
System.out.println("出现内存溢出等" + e);
e.printStackTrace(); e.printStackTrace();
isException = true; isException = true;
} catch (TimeoutException e) { } catch (TimeoutException e) {
@ -87,7 +84,6 @@ public class ExecutorServiceWrapper {
|| onceWork.isWaitingCancel())) { || onceWork.isWaitingCancel())) {
onceWork.pleaseCancel(); onceWork.pleaseCancel();
} }
break;
} else { } else {
if (isDone) { if (isDone) {
System.out.println("部分任务已经在线程池完成"); System.out.println("部分任务已经在线程池完成");
@ -96,12 +92,14 @@ public class ExecutorServiceWrapper {
onceWork.check(); onceWork.check();
} }
} else { } else {
//FIXME 高强度检查会不会造成检查线程过多
onceWork.check(); onceWork.check();
} }
} else {
System.out.println("任务已完成");
} }
} }
} }
} }

View File

@ -50,11 +50,11 @@ public class PollingCenter {
// ========== fields and methods ========== // ========== fields and methods ==========
public void checkGroup(WorkerWrapperGroup.CheckFinishTask task) { public void checkGroup(TimerTask task) {
checkGroup(task, 0); checkGroup(task, 0);
} }
public void checkGroup(WorkerWrapperGroup.CheckFinishTask task, long daley) { public void checkGroup(TimerTask task, long daley) {
timer.newTimeout(task, daley, TimeUnit.MILLISECONDS); timer.newTimeout(task, daley, TimeUnit.MILLISECONDS);
} }

View File

@ -1,5 +1,7 @@
package com.jd.platform.async.worker; package com.jd.platform.async.worker;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.ExecutorServiceWrapper;
import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.PollingCenter;
import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapper;
@ -150,6 +152,10 @@ public interface OnceWork {
return new AsFuture(this, sleepCheckInterval); return new AsFuture(this, sleepCheckInterval);
} }
void check();
void addThreadSubmit(Async.TaskCallable taskCallable);
// class // class
class AsFuture implements Future<Map<String, WorkerWrapper<?, ?>>> { class AsFuture implements Future<Map<String, WorkerWrapper<?, ?>>> {
@ -281,9 +287,12 @@ public interface OnceWork {
protected final WorkerWrapperGroup group; protected final WorkerWrapperGroup group;
public Impl(WorkerWrapperGroup group, String workId) { private final ExecutorServiceWrapper executorServiceWrapper;
public Impl(WorkerWrapperGroup group, String workId, ExecutorServiceWrapper executorServiceWrapper) {
super(workId); super(workId);
this.group = group; this.group = group;
this.executorServiceWrapper = executorServiceWrapper;
} }
@Override @Override
@ -336,9 +345,16 @@ public interface OnceWork {
} }
} }
@Override
public void check() { public void check() {
//发起检查看看所有是否取消完毕 //发起检查看看所有是否取消完毕
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
executorServiceWrapper.startCheck(this);
}
@Override
public void addThreadSubmit(Async.TaskCallable taskCallable) {
executorServiceWrapper.addThreadSubmit(taskCallable);
} }
} }
@ -395,6 +411,15 @@ public interface OnceWork {
public void pleaseCancel() { public void pleaseCancel() {
} }
@Override
public void check() {
}
@Override
public void addThreadSubmit(Async.TaskCallable taskCallable) {
// do nothing
}
@Override @Override
public String toString() { public String toString() {
return "(it's empty work)"; return "(it's empty work)";

View File

@ -7,9 +7,9 @@ import com.jd.platform.async.exception.CancelException;
import com.jd.platform.async.exception.EndsNormallyException; import com.jd.platform.async.exception.EndsNormallyException;
import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.Async; import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.ExecutorServiceWrapper;
import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.PollingCenter;
import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.OnceWork;
import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.strategy.WrapperStrategy; import com.jd.platform.async.wrapper.strategy.WrapperStrategy;
@ -150,7 +150,7 @@ public abstract class WorkerWrapper<T, V> {
* @param group wrapper组 * @param group wrapper组
* @throws IllegalStateException 当wrapper正在building状态时被启动则会抛出该异常 * @throws IllegalStateException 当wrapper正在building状态时被启动则会抛出该异常
*/ */
public void work(ExecutorServiceWrapper executorService, public void work(OnceWork.Impl executorService,
long remainTime, long remainTime,
WorkerWrapperGroup group) { WorkerWrapperGroup group) {
work(executorService, null, remainTime, group); work(executorService, null, remainTime, group);
@ -277,7 +277,7 @@ public abstract class WorkerWrapper<T, V> {
* @param remainTime 剩余时间 * @param remainTime 剩余时间
* @throws IllegalStateException 当wrapper正在building状态时被启动则会抛出该异常 * @throws IllegalStateException 当wrapper正在building状态时被启动则会抛出该异常
*/ */
public void work(ExecutorServiceWrapper executorService, public void work(OnceWork work,
WorkerWrapper<?, ?> fromWrapper, WorkerWrapper<?, ?> fromWrapper,
long remainTime, long remainTime,
WorkerWrapperGroup group WorkerWrapperGroup group
@ -300,7 +300,7 @@ public abstract class WorkerWrapper<T, V> {
final Consumer<Boolean> __function__callbackResultOfFalse_beginNext = final Consumer<Boolean> __function__callbackResultOfFalse_beginNext =
(success) -> { (success) -> {
__function__callbackResult.accept(success); __function__callbackResult.accept(success);
beginNext(executorService, now, remainTime, group); beginNext(work, now, remainTime, group);
}; };
final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult$false_beginNext = final BiConsumer<Boolean, Exception> __function__fastFail_callbackResult$false_beginNext =
(fastFail_isTimeout, fastFail_exception) -> { (fastFail_isTimeout, fastFail_exception) -> {
@ -387,7 +387,7 @@ public abstract class WorkerWrapper<T, V> {
wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper);
switch (judge.getDependenceAction()) { switch (judge.getDependenceAction()) {
case TAKE_REST: case TAKE_REST:
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); work.check();
return; return;
case FAST_FAIL: case FAST_FAIL:
if (setState(state, STARTED, ERROR)) { if (setState(state, STARTED, ERROR)) {
@ -471,7 +471,7 @@ public abstract class WorkerWrapper<T, V> {
* <p/> * <p/>
* 本方法不负责校验状态请在调用前自行检验 * 本方法不负责校验状态请在调用前自行检验
*/ */
protected void beginNext(ExecutorServiceWrapper executorService, long now, long remainTime, WorkerWrapperGroup group) { protected void beginNext(OnceWork onceWork, long now, long remainTime, WorkerWrapperGroup group) {
//花费的时间 //花费的时间
final long costTime = SystemClock.now() - now; final long costTime = SystemClock.now() - now;
final long nextRemainTIme = remainTime - costTime; final long nextRemainTIme = remainTime - costTime;
@ -490,7 +490,7 @@ public abstract class WorkerWrapper<T, V> {
} finally { } finally {
PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); PollingCenter.getInstance().checkGroup(group.new CheckFinishTask());
if (next != null) { if (next != null) {
next.work(executorService, this, nextRemainTIme, group); next.work(onceWork, this, nextRemainTIme, group);
} }
} }
} }
@ -498,8 +498,8 @@ public abstract class WorkerWrapper<T, V> {
else { else {
try { try {
group.addWrapper(nextWrappers); group.addWrapper(nextWrappers);
nextWrappers.forEach(next -> executorService.addThreadSubmit( nextWrappers.forEach(next -> onceWork.addThreadSubmit(
new Async.TaskCallable(next, nextRemainTIme, group, executorService, this) new Async.TaskCallable(next, nextRemainTIme, group, onceWork, this)
)); ));
setState(state, AFTER_WORK, SUCCESS); setState(state, AFTER_WORK, SUCCESS);
} finally { } finally {

View File

@ -143,8 +143,7 @@ class Case15 {
public String action(String param, Map<String, WorkerWrapper<?, ?>> allWrappers) { public String action(String param, Map<String, WorkerWrapper<?, ?>> allWrappers) {
if ("F".equals(id)) { if ("F".equals(id)) {
while (true) { while (true) {
System.out.println("wrapper(id=" + id + ") is working"); System.out.print(id + " I am alive" + i++ + ".");
System.out.println("I am alive" + i++);
/* /*
第一种问题内存溢出OOM由系统取消任务执行H的结果为{result=null, resultState=DEFAULT, ex=null}因为没有跑到H所以H的结果为null 第一种问题内存溢出OOM由系统取消任务执行H的结果为{result=null, resultState=DEFAULT, ex=null}因为没有跑到H所以H的结果为null

View File

@ -24,16 +24,13 @@ class Case16 {
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
30L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10), 30L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10),
new RejectedExecutionHandler() { (r, e) -> {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
System.out.println("Task " + r.toString() + System.out.println("Task " + r.toString() +
" rejected from " + " rejected from " +
e.toString()); e.toString());
if (!e.isShutdown()) { if (!e.isShutdown()) {
r.run(); r.run();
} }
}
}); });
private static WorkerWrapperBuilder<?, ?> builder(String id) { private static WorkerWrapperBuilder<?, ?> builder(String id) {