Java中CompletableFuture批量调用远程接口并且操作返回值

此代码的业务需求场景

原文链接:https://blog.csdn.net/w605283073/article/details/101399427
用户在小程序前端进行订阅模板消息操作,后台将用户每次的订阅请求记录下来。同一个openid和模板id只发送一次消息。
采用异步批量请求的方式请求三方远程接口,对于微服务之间同样适用。
获取远程接口的返回值(按照请求顺序返回),判断调用是否成功;调用成功则更新本地数据库。

    /**
     * Splits {@code dataList} into consecutive partitions of at most {@code size} elements,
     * applies {@code function} to every partition asynchronously, waits for all of them and
     * returns the concatenated results in partition (i.e. request) order.
     *
     * @param dataList        the data to process; {@code null} or empty yields an empty list
     * @param size            maximum number of elements per partition, must be greater than 0
     * @param executorService executor that runs the partitions; when {@code null} the default
     *                        executor of {@link CompletableFuture#supplyAsync(java.util.function.Supplier)} is used
     * @param function        maps one partition to its results; a {@code null} or empty result
     *                        contributes nothing to the returned list
     * @param <T>             input element type
     * @param <V>             result element type
     * @return a new mutable list holding the results of all partitions, in partition order
     * @throws IllegalArgumentException if {@code size} is not greater than 0
     * @throws RuntimeException         wrapping the failure if any partition fails or the wait is interrupted
     */
    public static <T, V> List<V> partitionCall2ListAsync(List<T> dataList,
                                                         int size,
                                                         ExecutorService executorService,
                                                         Function<List<T>, List<V>> function) {

        if (CollectionUtils.isEmpty(dataList)) {
            return new ArrayList<>(0);
        }
        // Zero is rejected as well, so the message must not talk about negative values only.
        Preconditions.checkArgument(size > 0, "size must be greater than 0");

        List<CompletableFuture<List<V>>> completableFutures = Lists.partition(dataList, size)
                .stream()
                .map(eachList -> executorService == null
                        ? CompletableFuture.supplyAsync(() -> function.apply(eachList))
                        : CompletableFuture.supplyAsync(() -> function.apply(eachList), executorService))
                .collect(Collectors.toList());

        CompletableFuture<Void> allFinished = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
        try {
            allFinished.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // All futures are complete here, so join() does not block. Flattening in a single pass
        // keeps partition order and avoids re-copying the accumulated list for every partition.
        return completableFutures.stream()
                .map(CompletableFuture::join)
                .filter(CollectionUtils::isNotEmpty)
                .flatMap(List::stream)
                .collect(Collectors.toCollection(ArrayList::new));
    }

项目中实际代码

public class SubscribeMessageServiceImpl {

	ExecutorService executorService = Executors.newFixedThreadPool(10);

	private AtomicInteger atomicInteger = new AtomicInteger(0);

	private int total = 30;

	@Override
	public void sendSubscribeMessageToUser() {
		final LambdaQueryWrapper<SubscribeMessageDO> wrapper = new LambdaQueryWrapper<>();
//		每次从数据库取1000
		wrapper.eq(SubscribeMessageDO::getSendState, 2).eq(SubscribeMessageDO::getNoticeAllowanceFlag, 1).last("limit 1000");

		final List<SubscribeMessageDO> doList = list(wrapper);
		Stopwatch stopwatch = Stopwatch.createStarted();
		// 分批执行
		int size = 3;
		final List<SubscribeMessageSendVo> doList1 = partitionCall2ListAsync(doList, 3, executorService, this::someCall);
		Stopwatch stop = stopwatch.stop();
		System.err.println("执行时间: "+stop.elapsed(TimeUnit.SECONDS)+" 秒");

		// 正好几轮
		int turns;
		if (total % size == 0) {
			turns = total / size;
		} else {
			turns = total / size + 1;
		}
		System.err.println("共调用了{}次: "+turns);

		for (SubscribeMessageSendVo subscribeMessageSendVo : doList1) {
			final SubscribeMessageDO subscribeMessageDO = subscribeMessageSendVo.getSubscribeMessageDO();
			if (subscribeMessageSendVo.getCode() == 200 && subscribeMessageSendVo.getData().getErrcode() == 0) {
				final UpdateWrapper<SubscribeMessageDO> updateWrapper = new UpdateWrapper<>();
				updateWrapper.eq("id", subscribeMessageDO.getId()).set("send_state", 1).set("send_time", new Date());
				update(updateWrapper);
			}
		}
	}

	public static <T, V> List<V> partitionCall2ListAsync(List<T> dataList,
														 int size,
														 ExecutorService executorService,
														 Function<List<T>, List<V>> function
	) {
		if (CollectionUtils.isEmpty(dataList)) {
			return new ArrayList<>(0);
		}
		Preconditions.checkArgument(size > 0, "size must not be a minus");

		List<CompletableFuture<List<V>>> completableFutures = Lists.partition(dataList, size)
				.stream()
				.map(eachList -> {
					if (executorService == null) {
						return CompletableFuture.supplyAsync(() -> function.apply(eachList));
					} else {
						return CompletableFuture.supplyAsync(() -> function.apply(eachList), executorService);
					}
				})
				.collect(Collectors.toList());

		CompletableFuture<Void> allFinished = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
		try {
			allFinished.get();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		return completableFutures.stream()
				.map(CompletableFuture::join)
				.filter(CollectionUtils::isNotEmpty)
				.reduce(new ArrayList<V>(), ((list1, list2) -> {
					List<V> resultList = new ArrayList<>();
					if (CollectionUtils.isNotEmpty(list1)) {
						resultList.addAll(list1);
					}
					if (CollectionUtils.isNotEmpty(list2)) {
						resultList.addAll(list2);
					}
					return resultList;
				}));
	}

	/**
	 * 请求调用三方接口并且转换成自己的对象
	 * @param eachList 每次请求的数据
	 * @return 返回自己的对象
	 */
	private List<SubscribeMessageSendVo> someCall(List<SubscribeMessageDO> eachList) {
		log.info("当前-->{},strList.size:{}", atomicInteger.incrementAndGet(), eachList.size());

		final List<SubscribeMessageSendVo> subscribeMessageSendVoList = new ArrayList<>();
		for (SubscribeMessageDO subscribeMessageDO : eachList) {
	
			SubscribeMessageSendVo subscribeMessageSendVo = new SubscribeMessageSendVo();
			try {
				subscribeMessageSendVo = contentCenterService.subscribeMessageSend(subscribeMessageDO );
			} catch (Exception e) {
				e.printStackTrace();
			}
			subscribeMessageSendVo.setSubscribeMessageDO(subscribeMessageDO);
			subscribeMessageSendVoList.add(subscribeMessageSendVo);
		}

		return subscribeMessageSendVoList;
	}
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐