微服务并行计算框架asyncTool使用说明
一、为什么要使用并行计算框架1 复杂的微服务系统间调用经常会有这样的调用场景:app(或web前端)调用后台的一个接口,该接口接到该请求后,需要调用其他多个微服务来获取数据,最终汇总一个最终结果返回给用户。譬如用户请求“我的订单”,后台在收到请求后,就需要去调用用户详情rpc、商品详情rpc、库存rpc、优惠券rpc等等很多个服务。有些服务是可以并行去请求的,但有些服务是依赖于某个服务的返回值的(
一、为什么要使用并行计算框架
1 复杂的微服务系统间调用
经常会有这样的调用场景:app(或web前端)调用后台的一个接口,该接口接到该请求后,需要调用其他多个微服务来获取数据,最终汇总一个最终结果返回给用户。
譬如用户请求“我的订单”,后台在收到请求后,就需要去调用用户详情rpc、商品详情rpc、库存rpc、优惠券rpc等等很多个服务。有些服务是可以并行去请求的,但有些服务是依赖于某个服务的返回值的(如查库存、优惠券,就依赖于商品详情回复到达后才能去请求)。
2 工作流式的任务编排
譬如在数据清洗领域,经常会有这样的需求,从多个数据源分别拉取数据,做第一步清洗,之后等某一步完成、或某几步都完成、或至少某几步完成,进行下一步任务。整个流程有明显的依赖顺序,以及任意可能存在的阻塞、异常、超时等情况。
如何将整个流程进行编排并让其按照设定顺序执行,并能合理处理异常情况,是一个并行框架所要有的功能。
所以一个并行框架拥有的功能简单来说,至少应具备下图的这种顺序编排能力,常见流程如下图所示:
3 并行计算提高性能
例如生成一个订单 串行计算所需要时间:
改为并行计算只需要一秒,如下图所示:
二、并行计算框架
并行计算框架:asyncTool
软件简介
解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架,可以任意组合各线程的执行顺序,带全链路执行结果回调。多线程编排一站式解决方案。来自于京东主 App 后台。
该框架目前正在京东 App 后台接受苛刻、高并发、海量用户等复杂场景业务的检验测试,随时会根据实际情况发布更新和 bugFix。
详细介绍如下:
https://gitee.com/jd-platform-opensource/asyncTool
三、详细使用
3.1 结合微服务使用示例
场景,比如查询某个商品详情需要查询满减、立减、团购三个服务的信息后组成商品详情,如下图所示,其中满减、立减、团购是可以并行计算的。
3.2.1 立减服务代码示例
@Slf4j
@Component
public class LijianWorker implements IWorker<Long, String>, ICallback<Long, String> {
@Autowired
private LijianRemoteService lijianRemoteService;
@Override
public String action(Long object, Map<String, WorkerWrapper> allWrappers) {
try {
long start = System.currentTimeMillis();
Thread.sleep(5000);
UUserVo user = lijianRemoteService.getOpenUserById(1, "123456789");
log.info("user:{}",user);
long end =System.currentTimeMillis();
log.info("立减运行时长:{}", end - start);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "下单立减10元!!!";
}
@Override
public String defaultValue() {
return "default value:下单立减10元!!!";
}
@Override
public void begin() {
System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, Long param, WorkResult<String> workResult) {
System.out.println("立减 worker 的结果是:" + workResult.getResult());
}
}
3.2.2 满减服务代码示例
@Slf4j
@Component
public class ManjianWorker implements IWorker<Long, String>, ICallback<Long, String> {
@Autowired
private RemoteManjianService remoteManjianService;
@Override
public String action(Long object, Map<String, WorkerWrapper> allWrappers) {
try {
long start = System.currentTimeMillis();
log.info("入参:{}", object);
Thread.sleep(3000);
UUserVo user = remoteManjianService.getOpenUserById(object.intValue(), "123456789");
log.info("user:{}",user);
long end =System.currentTimeMillis();
log.info("满减运行时长:{}", end - start);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "三件满减下单立减30元!!!";
}
@Override
public String defaultValue() {
return "default value:三件满减下单立减30元!!!";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, Long param, WorkResult<String> workResult) {
System.out.println("满减 worker 的结果是:" + workResult.getResult());
}
}
**3.2.3 团购服务代码示例**
@Slf4j
@Component
public class TuangouWorker implements IWorker<Long, String>, ICallback<Long, String> {
@Autowired
private RemoteTuangouService remoteTuangouService;
@Override
public String action(Long object, Map<String, WorkerWrapper> allWrappers) {
try {
long start = System.currentTimeMillis();
Thread.sleep(6000);
UUserVo user = remoteTuangouService.getOpenUserById(1, "123456789");
log.info("user:{}",user);
long end =System.currentTimeMillis();
log.info("团购运行时长:{}", end - start);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "下团购仅售1000元!!!";
}
@Override
public String defaultValue() {
return "default value: 团购仅售1000元!!!";
}
@Override
public void begin() {
System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, Long param, WorkResult<String> workResult) {
System.out.println("团购 worker 的结果是:" + workResult.getResult());
}
}
3.2.4 商品详情示例
@Slf4j
@Component
public class ItemWorker implements IWorker<Item, Item>, ICallback<Item, Item> {
@Override
public Item action(Item object, Map<String, WorkerWrapper> allWrappers) {
log.info("-----------------");
log.info("立减的查询结果是: " + allWrappers.get("one").getWorkResult());
log.info("满减的查询结果是: " + allWrappers.get("two").getWorkResult());
log.info("团购的查询结果是: " + allWrappers.get("third").getWorkResult());
String lijian = (String) allWrappers.get("one").getWorkResult().getResult();
String manjian = (String) allWrappers.get("two").getWorkResult().getResult();
String tuangou = (String) allWrappers.get("third").getWorkResult().getResult();
object.setLijian(lijian);
object.setManjian(manjian);
object.setTuangou(tuangou);
return object;
}
@Override
public Item defaultValue() {
return new Item();
}
@Override
public void begin() {
System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, Item param, WorkResult<Item> workResult) {
System.out.println("商品展示的结果是:" + workResult.getResult());
}
}
3.2.5 组合并行计算
写法一:
@Service
public class ItemServiceImpl implements ItemService {
@Autowired
private ItemWorker itemWorker;
@Autowired
private LijianWorker lijianWorker;
@Autowired
private ManjianWorker manjianWorker;
@Autowired
private TuangouWorker tuangouWorker;
@Override
public void queryItemDetails() throws ExecutionException, InterruptedException {
long itemID = 8102587462444444L;
Item item =new Item();
item.setItemName("小米手机 12 ");
WorkerWrapper<Long, String> lijianWrapper = new WorkerWrapper.Builder<Long, String>()
.worker(lijianWorker)
.param(itemID)
// .next(itemWrapper)
// .depend(itemWrapper)
.callback(lijianWorker)
.id("one")
.build();
WorkerWrapper<Long, String> manjianWrapper = new WorkerWrapper.Builder<Long, String>()
.worker(manjianWorker)
.param(itemID)
// .depend(itemWrapper)
// .next(itemWrapper)
.callback(manjianWorker)
.id("two")
.build();
WorkerWrapper<Long, String> tuangouWrapper = new WorkerWrapper.Builder<Long, String>()
.worker(tuangouWorker)
.param(itemID)
// .depend(itemWrapper)
// .next(itemWrapper)
.callback(tuangouWorker)
.id("third")
.build();
WorkerWrapper<Item, Item> itemWrapper = new WorkerWrapper.Builder<Item, Item>()
.worker(itemWorker)
.param(item)
.callback(itemWorker)
.depend(lijianWrapper, manjianWrapper,tuangouWrapper)
.id("four")
.build();
Async.beginWork(10000, lijianWrapper, manjianWrapper,tuangouWrapper);
// Thread.sleep(10000);
System.out.println(itemWrapper.getWorkResult().getResult());
Async.shutDown();
}
方法二:
@Override
public void queryItemDetailsReverse() throws ExecutionException, InterruptedException {
long itemID = 8102587462444444L;
Item item =new Item();
item.setItemName("小米手机 12 ");
WorkerWrapper<Item, Item> itemWrapper = new WorkerWrapper.Builder<Item, Item>()
.worker(itemWorker)
.param(item)
.callback(itemWorker)
// .depend(lijianWrapper, manjianWrapper,tuangouWrapper)
.id("four")
.build();
WorkerWrapper<Long, String> lijianWrapper = new WorkerWrapper.Builder<Long, String>()
.worker(lijianWorker)
.param(itemID)
.next(itemWrapper)
// .depend(itemWrapper)
.callback(lijianWorker)
.id("one")
.build();
WorkerWrapper<Long, String> manjianWrapper = new WorkerWrapper.Builder<Long, String>()
.worker(manjianWorker)
.param(itemID)
// .depend(itemWrapper)
.next(itemWrapper)
.callback(manjianWorker)
.id("two")
.build();
WorkerWrapper<Long, String> tuangouWrapper = new WorkerWrapper.Builder<Long, String>()
.worker(tuangouWorker)
.param(itemID)
// .depend(itemWrapper)
.next(itemWrapper)
.callback(tuangouWorker)
.id("third")
.build();
Async.beginWork(10000, lijianWrapper, manjianWrapper,tuangouWrapper);
// Thread.sleep(10000);
System.out.println(itemWrapper.getWorkResult().getResult());
Async.shutDown();
}
运行结果:
更多推荐
所有评论(0)