一、为什么要使用并行计算框架

1 复杂的微服务系统间调用
经常会有这样的调用场景:app(或web前端)调用后台的一个接口,该接口接到该请求后,需要调用其他多个微服务来获取数据,最终汇总一个最终结果返回给用户。

譬如用户请求“我的订单”,后台在收到请求后,就需要去调用用户详情rpc、商品详情rpc、库存rpc、优惠券rpc等等很多个服务。有些服务是可以并行去请求的,但有些服务是依赖于某个服务的返回值的(如查库存、优惠券,就依赖于商品详情回复到达后才能去请求)。

2 工作流式的任务编排
譬如在数据清洗领域,经常会有这样的需求,从多个数据源分别拉取数据,做第一步清洗,之后等某一步完成、或某几步都完成、或至少某几步完成,进行下一步任务。整个流程有明显的依赖顺序,以及任意可能存在的阻塞、异常、超时等情况。

如何将整个流程进行编排并让其按照设定顺序执行,并能合理处理异常情况,是一个并行框架所要有的功能。

所以一个并行框架拥有的功能简单来说,至少应具备下图的这种顺序编排能力,常见流程如下图所示:
在这里插入图片描述
3 并行计算提高性能
例如生成一个订单 串行计算所需要时间:
在这里插入图片描述
改为并行计算只需要一秒,如下图所示:
在这里插入图片描述

二、并行计算框架

并行计算框架:asyncTool

软件简介

解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架,可以任意组合各线程的执行顺序,带全链路执行结果回调。多线程编排一站式解决方案。来自于京东主 App 后台。

该框架目前正在京东 App 后台接受苛刻、高并发、海量用户等复杂场景业务的检验测试,随时会根据实际情况发布更新和 bugFix。

详细介绍如下:

https://gitee.com/jd-platform-opensource/asyncTool

三、详细使用

3.1 结合微服务使用示例

场景,比如查询某个商品详情需要查询满减、立减、团购三个服务的信息后组成商品详情,如下图所示,其中满减、立减、团购是可以并行计算的。
在这里插入图片描述

3.2.1 立减服务代码示例
@Slf4j
@Component
public class LijianWorker implements IWorker<Long, String>, ICallback<Long, String> {
    @Autowired
    private LijianRemoteService lijianRemoteService;

    @Override
    public String action(Long object, Map<String, WorkerWrapper> allWrappers) {
        try {
            long start = System.currentTimeMillis();
            Thread.sleep(5000);
            UUserVo user = lijianRemoteService.getOpenUserById(1, "123456789");
            log.info("user:{}",user);
            long end =System.currentTimeMillis();
            log.info("立减运行时长:{}", end - start);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "下单立减10元!!!";
    }


    @Override
    public String defaultValue() {
        return "default value:下单立减10元!!!";
    }

    @Override
    public void begin() {
        System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, Long param, WorkResult<String> workResult) {
        System.out.println("立减 worker 的结果是:" + workResult.getResult());
    }

}

3.2.2 满减服务代码示例
@Slf4j
@Component
public class ManjianWorker implements IWorker<Long, String>, ICallback<Long, String> {
    @Autowired
    private RemoteManjianService remoteManjianService;
    @Override
    public String action(Long object, Map<String, WorkerWrapper> allWrappers) {
        try {
            long start = System.currentTimeMillis();
            log.info("入参:{}", object);
            Thread.sleep(3000);
            UUserVo user = remoteManjianService.getOpenUserById(object.intValue(), "123456789");
            log.info("user:{}",user);
            long end =System.currentTimeMillis();
            log.info("满减运行时长:{}", end - start);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "三件满减下单立减30元!!!";
    }

    @Override
    public String defaultValue() {
        return "default value:三件满减下单立减30元!!!";
    }

    @Override
    public void begin() {
        //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }


    @Override
    public void result(boolean success, Long param, WorkResult<String> workResult) {
        System.out.println("满减 worker 的结果是:" + workResult.getResult());
    }

}

**3.2.3 团购服务代码示例**

@Slf4j
@Component
public class TuangouWorker implements IWorker<Long, String>, ICallback<Long, String> {
    @Autowired
    private RemoteTuangouService remoteTuangouService;

    @Override
    public String action(Long object, Map<String, WorkerWrapper> allWrappers) {
        try {
            long start = System.currentTimeMillis();
            Thread.sleep(6000);
            UUserVo user = remoteTuangouService.getOpenUserById(1, "123456789");
            log.info("user:{}",user);
            long end =System.currentTimeMillis();
            log.info("团购运行时长:{}", end - start);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "下团购仅售1000元!!!";
    }


    @Override
    public String defaultValue() {
        return "default value: 团购仅售1000元!!!";
    }

    @Override
    public void begin() {
        System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, Long param, WorkResult<String> workResult) {
        System.out.println("团购 worker 的结果是:" + workResult.getResult());
    }

}
3.2.4 商品详情示例
@Slf4j
@Component
public class ItemWorker implements IWorker<Item, Item>, ICallback<Item, Item> {

    @Override
    public Item action(Item object, Map<String, WorkerWrapper> allWrappers) {
        log.info("-----------------");
        log.info("立减的查询结果是: " + allWrappers.get("one").getWorkResult());
        log.info("满减的查询结果是: " + allWrappers.get("two").getWorkResult());
        log.info("团购的查询结果是: " + allWrappers.get("third").getWorkResult());

        String lijian = (String) allWrappers.get("one").getWorkResult().getResult();
        String manjian = (String) allWrappers.get("two").getWorkResult().getResult();
        String tuangou = (String) allWrappers.get("third").getWorkResult().getResult();
        object.setLijian(lijian);
        object.setManjian(manjian);
        object.setTuangou(tuangou);
        return object;
    }

    @Override
    public Item defaultValue() {
        return new Item();
    }

    @Override
    public void begin() {
        System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, Item param, WorkResult<Item> workResult) {
        System.out.println("商品展示的结果是:" + workResult.getResult());
    }
}
3.2.5 组合并行计算

写法一:

@Service
public class ItemServiceImpl implements ItemService {
    @Autowired
    private ItemWorker itemWorker;
    @Autowired
    private LijianWorker lijianWorker;
    @Autowired
    private ManjianWorker manjianWorker;
    @Autowired
    private TuangouWorker tuangouWorker;

    @Override
    public void queryItemDetails() throws ExecutionException, InterruptedException {
        long itemID = 8102587462444444L;
        Item item =new Item();
        item.setItemName("小米手机 12 ");

        WorkerWrapper<Long, String> lijianWrapper = new WorkerWrapper.Builder<Long, String>()
                .worker(lijianWorker)
                .param(itemID)
//                .next(itemWrapper)
//                .depend(itemWrapper)
                .callback(lijianWorker)
                .id("one")
                .build();

        WorkerWrapper<Long, String> manjianWrapper = new WorkerWrapper.Builder<Long, String>()
                .worker(manjianWorker)
                .param(itemID)
//                .depend(itemWrapper)
//                .next(itemWrapper)
                .callback(manjianWorker)
                .id("two")
                .build();

        WorkerWrapper<Long, String> tuangouWrapper = new WorkerWrapper.Builder<Long, String>()
                .worker(tuangouWorker)
                .param(itemID)
//                .depend(itemWrapper)
//                .next(itemWrapper)
                .callback(tuangouWorker)
                .id("third")
                .build();

        WorkerWrapper<Item, Item> itemWrapper =  new WorkerWrapper.Builder<Item, Item>()
                .worker(itemWorker)
                .param(item)
                .callback(itemWorker)
                .depend(lijianWrapper, manjianWrapper,tuangouWrapper)
                .id("four")
                .build();

        Async.beginWork(10000, lijianWrapper, manjianWrapper,tuangouWrapper);
//        Thread.sleep(10000);
        System.out.println(itemWrapper.getWorkResult().getResult());
        Async.shutDown();
    }

方法二:

 @Override
    public void queryItemDetailsReverse() throws ExecutionException, InterruptedException {
        long itemID = 8102587462444444L;
        Item item =new Item();
        item.setItemName("小米手机 12 ");
        WorkerWrapper<Item, Item> itemWrapper =  new WorkerWrapper.Builder<Item, Item>()
                .worker(itemWorker)
                .param(item)
                .callback(itemWorker)
//                .depend(lijianWrapper, manjianWrapper,tuangouWrapper)
                .id("four")
                .build();

        WorkerWrapper<Long, String> lijianWrapper = new WorkerWrapper.Builder<Long, String>()
                .worker(lijianWorker)
                .param(itemID)
                .next(itemWrapper)
//                .depend(itemWrapper)
                .callback(lijianWorker)
                .id("one")
                .build();

        WorkerWrapper<Long, String> manjianWrapper = new WorkerWrapper.Builder<Long, String>()
                .worker(manjianWorker)
                .param(itemID)
//                .depend(itemWrapper)
                .next(itemWrapper)
                .callback(manjianWorker)
                .id("two")
                .build();

        WorkerWrapper<Long, String> tuangouWrapper = new WorkerWrapper.Builder<Long, String>()
                .worker(tuangouWorker)
                .param(itemID)
//                .depend(itemWrapper)
                .next(itemWrapper)
                .callback(tuangouWorker)
                .id("third")
                .build();

        Async.beginWork(10000, lijianWrapper, manjianWrapper,tuangouWrapper);
//        Thread.sleep(10000);
        System.out.println(itemWrapper.getWorkResult().getResult());
        Async.shutDown();
    }

运行结果:
在这里插入图片描述

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐