This commit is contained in:
wuweifeng10 2020-08-03 20:39:25 +08:00
commit 4619e891e2
7 changed files with 98 additions and 16 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@ target/
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath

View File

@ -7,7 +7,7 @@
有对区块链感兴趣的,可以参考项目作者另一个[GVP项目](https://gitee.com/tianyalei/md_blockchain)java区块链底层入门。
有对高并发场景下被热key打爆存储层秒杀等场景中热数据本地缓存、热数据刷子用户限流等需要热key探测的可关注[京东热key探测缓存框架](http://https://gitee.com/jd-platform-opensource/hotkey)。热key框架正在灰度内测期已上线3000台服务器进行灰度
有对高并发场景下被热key打爆存储层秒杀等场景中热数据本地缓存、热数据刷子用户限流等需要热key探测的可关注[京东热key探测缓存框架](http://https://gitee.com/jd-platform-opensource/hotkey)。热key框架正在灰度内测期各项指标优异
## 并行常见的场景

View File

@ -6,15 +6,21 @@ import com.jd.platform.async.worker.WorkResult;
/**
* 每个执行单元执行完毕后会回调该接口</p>
* 需要监听执行结果的实现该接口即可
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface ICallback<T, V> {
void begin();
/**
* 任务开始的监听
*/
default void begin() {
}
/**
* 耗时操作执行完毕后就给value注入值
*
*/
void result(boolean success, T param, WorkResult<V> workResult);
}

View File

@ -1,25 +1,30 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* 每个最小执行单元需要实现该接口
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface IWorker<T, V> {
/**
* 在这里做耗时操作如rpc请求IO等
*
* @param object
* object
* @param object object
* @param allWrappers 任务包装
*/
V action(T object, Map<String, WorkerWrapper> allWrappers);
/**
* 超时异常时返回的默认值
*
* @return 默认值
*/
V defaultValue();
default V defaultValue() {
return null;
}
}

View File

@ -31,13 +31,10 @@ public class SystemClock {
}
private void scheduleClockUpdating() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "System Clock");
thread.setDaemon(true);
return thread;
}
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable, "System Clock");
thread.setDaemon(true);
return thread;
});
scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
}

View File

@ -281,7 +281,6 @@ public class WorkerWrapper<T, V> {
//上游都finish了进行自己
fire();
beginNext(poolExecutor, now, remainTime);
return;
}
}

View File

@ -0,0 +1,74 @@
package depend;
import java.util.Map;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* @author sjsdfg
* @since 2020/6/14
*/
public class LambdaTest {
public static void main(String[] args) throws Exception {
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
.worker((WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) -> {
System.out.println("par2的入参来自于par1 " + result.getResult());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result.getResult().getName();
})
.callback((boolean success, WorkResult<User> param, WorkResult<String> workResult) ->
System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult)))
.id("third")
.build();
WorkerWrapper<WorkResult<User>, User> workerWrapper1 = new WorkerWrapper.Builder<WorkResult<User>, User>()
.worker((WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) -> {
System.out.println("par1的入参来自于par0 " + result.getResult());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User("user1");
})
.callback((boolean success, WorkResult<User> param, WorkResult<User> workResult) ->
System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult)))
.id("second")
.next(workerWrapper2)
.build();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper.Builder<String, User>()
.worker((String object, Map<String, WorkerWrapper> allWrappers) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User("user0");
})
.param("0")
.id("first")
.next(workerWrapper1, true)
.callback((boolean success, String param, WorkResult<User> workResult) ->
System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult)))
.build();
//虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参V1.2前写法需要手工给
//V1.3后不用给wrapper setParam了直接在worker的action里自行根据id获取即可.参考dependnew包下代码
WorkResult<User> result = workerWrapper.getWorkResult();
WorkResult<User> result1 = workerWrapper1.getWorkResult();
workerWrapper1.setParam(result);
workerWrapper2.setParam(result1);
Async.beginWork(3500, workerWrapper);
System.out.println(workerWrapper2.getWorkResult());
Async.shutDown();
}
}