diff --git a/.gitignore b/.gitignore index a2a3040..66d02f6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ target/ !**/src/main/** !**/src/test/** + ### STS ### .apt_generated .classpath diff --git a/README.md b/README.md index 87ca5f9..bc4ea54 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ 有对区块链感兴趣的,可以参考项目作者另一个[GVP项目](https://gitee.com/tianyalei/md_blockchain),java区块链底层入门。 -有对高并发场景下,被热key打爆存储层,秒杀等场景中热数据本地缓存、热数据(刷子用户)限流等需要热key探测的,可关注[京东热key探测缓存框架](http://https://gitee.com/jd-platform-opensource/hotkey)。热key框架正在灰度内测期,已上线3000台服务器进行灰度。 +有对高并发场景下,被热key打爆存储层,秒杀等场景中热数据本地缓存、热数据(刷子用户)限流等需要热key探测的,可关注[京东热key探测缓存框架](http://https://gitee.com/jd-platform-opensource/hotkey)。热key框架正在灰度内测期,各项指标优异。 ## 并行常见的场景 diff --git a/src/main/java/com/jd/platform/async/callback/ICallback.java b/src/main/java/com/jd/platform/async/callback/ICallback.java index b77189d..6d1d8b2 100755 --- a/src/main/java/com/jd/platform/async/callback/ICallback.java +++ b/src/main/java/com/jd/platform/async/callback/ICallback.java @@ -6,15 +6,21 @@ import com.jd.platform.async.worker.WorkResult; /** * 每个执行单元执行完毕后,会回调该接口

* 需要监听执行结果的,实现该接口即可 + * * @author wuweifeng wrote on 2019-11-19. */ +@FunctionalInterface public interface ICallback { - void begin(); + /** + * 任务开始的监听 + */ + default void begin() { + + } /** * 耗时操作执行完毕后,就给value注入值 - * */ void result(boolean success, T param, WorkResult workResult); } diff --git a/src/main/java/com/jd/platform/async/callback/IWorker.java b/src/main/java/com/jd/platform/async/callback/IWorker.java index d7a16dd..ffe000a 100755 --- a/src/main/java/com/jd/platform/async/callback/IWorker.java +++ b/src/main/java/com/jd/platform/async/callback/IWorker.java @@ -1,25 +1,30 @@ package com.jd.platform.async.callback; -import com.jd.platform.async.wrapper.WorkerWrapper; - import java.util.Map; +import com.jd.platform.async.wrapper.WorkerWrapper; + /** * 每个最小执行单元需要实现该接口 + * * @author wuweifeng wrote on 2019-11-19. */ +@FunctionalInterface public interface IWorker { /** * 在这里做耗时操作,如rpc请求、IO等 * - * @param object - * object + * @param object object + * @param allWrappers 任务包装 */ V action(T object, Map allWrappers); /** * 超时、异常时,返回的默认值 + * * @return 默认值 */ - V defaultValue(); + default V defaultValue() { + return null; + } } diff --git a/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java b/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java index 12052f1..e65dd85 100644 --- a/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java +++ b/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java @@ -31,13 +31,10 @@ public class SystemClock { } private void scheduleClockUpdating() { - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "System Clock"); - thread.setDaemon(true); - return thread; - } + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable, "System Clock"); + thread.setDaemon(true); + return thread; }); scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS); } diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 50f1c2b..4ee7e6b 100755 --- a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -281,7 +281,6 @@ public class WorkerWrapper { //上游都finish了,进行自己 fire(); beginNext(poolExecutor, now, remainTime); - return; } } diff --git a/src/test/java/depend/LambdaTest.java b/src/test/java/depend/LambdaTest.java new file mode 100644 index 0000000..42c1bb2 --- /dev/null +++ b/src/test/java/depend/LambdaTest.java @@ -0,0 +1,74 @@ +package depend; + +import java.util.Map; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +/** + * @author sjsdfg + * @since 2020/6/14 + */ +public class LambdaTest { + public static void main(String[] args) throws Exception { + WorkerWrapper, String> workerWrapper2 = new WorkerWrapper.Builder, String>() + .worker((WorkResult result, Map allWrappers) -> { + System.out.println("par2的入参来自于par1: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return result.getResult().getName(); + }) + .callback((boolean success, WorkResult param, WorkResult workResult) -> + System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult))) + .id("third") + .build(); + + WorkerWrapper, User> workerWrapper1 = new WorkerWrapper.Builder, User>() + .worker((WorkResult result, Map allWrappers) -> { + System.out.println("par1的入参来自于par0: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user1"); + }) + .callback((boolean success, WorkResult param, WorkResult workResult) -> + System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult))) + .id("second") + .next(workerWrapper2) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker((String object, Map allWrappers) -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user0"); + }) + .param("0") + .id("first") + .next(workerWrapper1, true) + .callback((boolean success, String param, WorkResult workResult) -> + System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult))) + .build(); + + //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参。V1.2前写法,需要手工给 + //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可.参考dependnew包下代码 + WorkResult result = workerWrapper.getWorkResult(); + WorkResult result1 = workerWrapper1.getWorkResult(); + workerWrapper1.setParam(result); + workerWrapper2.setParam(result1); + + Async.beginWork(3500, workerWrapper); + + System.out.println(workerWrapper2.getWorkResult()); + Async.shutDown(); + } +}