前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >批量处理工具类

批量处理工具类

作者头像
LiosWong
发布2021-12-13 13:56:12
3950
发布2021-12-13 13:56:12
举报
文章被收录于专栏:后端沉思录后端沉思录

业务开发中,时常会批量执行任务,例如批量同时调用4个http接口或者rpc接口,这类业务代码执行具有通用性,为了提高开发效率、可复用性、可扩展性,简化代码,抽象出通用的工具类,方便开发同学使用。使用者只关心入参、具体任务执行、以及任务执行结果、线程池,并不关心批量处理的过程。

任务处理流程图

代码

代码语言:javascript
复制
public class BatchQuery {

    /**
     * 并行且异步处理结果
     *
     * @param tasks    任务列表
     * @param p        参数
     * @param handle   具体业务处理
     * @param complete 完成处理逻辑
     * @param executor 线程池
     * @param <T>
     * @param <P>
     * @param <R>
     */
    public static <T, P, R> void asyncQueryHandleAsync(List<T> tasks, P p, Function<T, P, R> handle,
        BiConsumer<R, Throwable> complete, Executor executor) {

        Objects.requireNonNull(p);

        Optional.ofNullable(tasks).ifPresent(task -> {

            val cfs = task.stream()
                .map(t ->
                    CompletableFuture.supplyAsync(
                        () -> handle.apply(t, p), executor).whenCompleteAsync(complete)
                ).toArray(CompletableFuture[]::new);

            //等待总任务完成
            CompletableFuture.allOf(cfs).join();
        });
    }

    /**
     * 并行且同步处理结果
     *
     * @param tasks    任务列表
     * @param p        参数
     * @param handle   具体业务处理
     * @param complete 完成处理逻辑
     * @param executor 线程池
     * @param <T>
     * @param <P>
     * @param <R>
     */
    public static <T, P, R> void asyncQueryHandleSync(List<T> tasks, P p, Function<T, P, R> handle,
        BiConsumer<R, Throwable> complete, Executor executor) {

        Objects.requireNonNull(p);

        Optional.ofNullable(tasks).ifPresent(task -> {

            val cfs = task.stream()
                .map(t ->
                    CompletableFuture.supplyAsync(
                        () -> handle.apply(t, p), executor).whenComplete(complete)
                ).toArray(CompletableFuture[]::new);

            //等待总任务完成
            CompletableFuture.allOf(cfs).join();
        });
    }
}


@FunctionalInterface
public interface Function<T, P, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @param p
     * @return the function result
     */
    R apply(T t, P p);
}

使用示例

代码语言:javascript
复制
 List<OrderDTO> orderDTOS = Lists.newArrayList();
        try {
            BatchQuery.asyncQueryHandleSync(BIZ_TYPE_LIST, onTheWayOrderSwitchProcessCtx.getOnTheWayOrderSwitchParam(),
                (bizTypeEnum, orderSwitchParam) ->
                    standardApiSupport
                        .getDriverOrderApiByBizType(bizTypeEnum.getValue(), CONFIG)
                        .queryOrderList
                            (
                                OrderConverter
                                    .buildDriverOrderReqDTO(currentStartTime, currentEndTime, bizTypeEnum,
                                        orderSwitchParam)
                            ), (r, ex) -> {
                    if (Objects.isNull(ex) && JudgeIsSuccessUtil.judgeDataNotNull(r)) {
                        orderDTOS.addAll(r.getDataList());
                    }
                }, RPC_SEARCH_EXECUTOR_SERVICE);
        } catch (Exception e) {
            throw new ApiException();
        }

使用者需要传入具体的任务,指定线程池,以及共同参数P,P的存在具有合理性,往往任务会使用共同的参数,因此自定义了函数式接口Function,以及具体的处理handle,handle里可以做差异化处理,执行结果会在complete中拿到,做具体的业务处理,可以大大减少重复代码。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 任务处理流程图
  • 代码
  • 使用示例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档