加上异步执行的入口逻辑

This commit is contained in:
wuweifeng10 2019-12-27 16:18:19 +08:00
parent d2a4e7ee3e
commit 8278337a4a
5 changed files with 56 additions and 47 deletions

View File

@ -4,6 +4,7 @@ package com.tianyalei.async.callback;
import com.tianyalei.async.worker.WorkResult; import com.tianyalei.async.worker.WorkResult;
/** /**
* 默认回调类如果不设置的话会默认给这个回调
* @author wuweifeng wrote on 2019-11-19. * @author wuweifeng wrote on 2019-11-19.
*/ */
public class DefaultCallback<T, V> implements ICallback<T, V> { public class DefaultCallback<T, V> implements ICallback<T, V> {

View File

@ -0,0 +1,21 @@
package com.tianyalei.async.callback;
import com.tianyalei.async.group.WorkerWrapper;
import java.util.List;
/**
* @author wuweifeng wrote on 2019-12-27
* @version 1.0
*/
public class DefaultGroupCallback implements IGroupCallback {
@Override
public void success(List<WorkerWrapper> workerWrappers) {
}
@Override
public void failure(List<WorkerWrapper> workerWrappers, Exception e) {
}
}

View File

@ -1,12 +1,20 @@
package com.tianyalei.async.callback; package com.tianyalei.async.callback;
import com.tianyalei.async.group.WorkerWrapper;
import java.util.List; import java.util.List;
/** /**
* 如果是异步执行整组的话可以用这个组回调不推荐使用
* @author wuweifeng wrote on 2019-11-19. * @author wuweifeng wrote on 2019-11-19.
*/ */
public interface IGroupCallback { public interface IGroupCallback {
void success(List<?> result); /**
* 成功后可以从wrapper里去getWorkResult
void failure(Exception e); */
void success(List<WorkerWrapper> workerWrappers);
/**
* 失败了也可以从wrapper里去getWorkResult
*/
void failure(List<WorkerWrapper> workerWrappers, Exception e);
} }

View File

@ -1,6 +1,8 @@
package com.tianyalei.async.executor; package com.tianyalei.async.executor;
import com.tianyalei.async.callback.DefaultGroupCallback;
import com.tianyalei.async.callback.IGroupCallback;
import com.tianyalei.async.group.WorkerWrapper; import com.tianyalei.async.group.WorkerWrapper;
import java.util.Arrays; import java.util.Arrays;
@ -11,6 +13,7 @@ import java.util.concurrent.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 类入口可以根据自己情况调整core线程的数量
* @author wuweifeng wrote on 2019-12-18 * @author wuweifeng wrote on 2019-12-18
* @version 1.0 * @version 1.0
*/ */
@ -50,6 +53,25 @@ public class Async {
beginWork(timeout, COMMON_POOL, workerWrapper); beginWork(timeout, COMMON_POOL, workerWrapper);
} }
/**
* 异步执行,直到所有都完成,或失败后发起回调
*/
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
if (groupCallback == null) {
groupCallback = new DefaultGroupCallback();
}
IGroupCallback finalGroupCallback = groupCallback;
CompletableFuture.runAsync(() -> {
try {
beginWork(timeout, COMMON_POOL, workerWrapper);
finalGroupCallback.success(Arrays.asList(workerWrapper));
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
}
/** /**
* 总共多少个执行单元 * 总共多少个执行单元
*/ */

View File

@ -9,7 +9,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
* 最终要执行时都要放到一个group里group集合会并发所有的Wrapper * 暂时用不上
* @author wuweifeng wrote on 2019-11-19. * @author wuweifeng wrote on 2019-11-19.
*/ */
public class WorkerGroup { public class WorkerGroup {
@ -53,31 +53,6 @@ public class WorkerGroup {
return this; return this;
} }
/**
* 添加需要串行执行的worker集合一个wrapper可能只有一个worker也可能是个要串行的worker集合
*
* @param workerWrapper workerWrapper
*/
public WorkerGroup addWrapper(WorkerWrapper workerWrapper) {
if (workerWrapper == null) {
throw new NullPointerException("workerWrapper cannot be null");
}
workerWrapperList.add(workerWrapper);
return this;
}
/**
* 添加一个需要并行执行的worker
*
* @param iWorker iWorker
*/
public <T, V> WorkerGroup addWrapper(IWorker<T, V> iWorker, T param, ICallback<T, V> iCallback) {
synchronized (this) {
WorkerWrapper<?, ?> workerWrapper = new WorkerWrapper<>(iWorker, param, iCallback);
workerWrapperList.add(workerWrapper);
}
return this;
}
public WorkerGroup addWrappers(List<WorkerWrapper<?, ?>> workerWrappers) { public WorkerGroup addWrappers(List<WorkerWrapper<?, ?>> workerWrappers) {
if (workerWrappers == null) { if (workerWrappers == null) {
@ -94,24 +69,6 @@ public class WorkerGroup {
return addWrappers(Arrays.asList(workerWrappers)); return addWrappers(Arrays.asList(workerWrappers));
} }
/**
* 添加一个不需要回调的worker
*
* @param iWorker async.worker
*/
public <T, V> WorkerGroup addWrapper(IWorker<T, V> iWorker, T param) {
return this.addWrapper(iWorker, param, null);
}
/**
* 添加一个不需要回调的worker
*
* @param iWorker async.worker
*/
public <T, V> WorkerGroup addWrapper(IWorker<T, V> iWorker) {
return this.addWrapper(iWorker, null);
}
/** /**
* 返回当前worker的数量用于决定启用的线程数量 * 返回当前worker的数量用于决定启用的线程数量
* *