前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >偷师学艺 线程池

偷师学艺 线程池

作者头像
收心
发布2022-09-30 08:19:48
2180
发布2022-09-30 08:19:48
举报
文章被收录于专栏:Java实战博客
代码语言:javascript
复制
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.exception.ExceptionUtils;

import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
 * 线程池批量并发工具类
 *
 * @author xiongwh
 * @date 2021/08/04
 */
@Log4j2
public class ThreadPoolUtil {

    public static Builder newBuilder(int threadSize) {
        return new Builder(threadSize, null);
    }

    public static Builder newBuilder(int threadSize, String threadName) {
        return new Builder(threadSize, threadName);
    }

    public static class Builder {

        private final ThreadPoolExecutor threadPool;

        public Builder(int threadSize, String threadName) {
            if (null == threadName || "".equals(threadName)) {
                threadName = "thread-pool-";
            }
            threadName = threadName.endsWith("-") ? threadName + "thread-%d" : threadName + "-thread-%d";
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadName).build();
            this.threadPool = new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
        }

        public <T> Builder execute(List<T> list, int batchSize, Consumer<List<T>> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            if (batchSize < 1) {
                throw new RuntimeException("the parameter batchSize cannot be less than 1");
            }
            CountDownLatch downLatch = new CountDownLatch((int) Math.ceil((double) list.size() / batchSize));
            int index = 0, toIndex = 0, listSize = list.size();
            do {
                toIndex = Math.min(listSize, toIndex + batchSize);
                List<T> subList = list.subList(index * batchSize, toIndex);
                threadPool.execute(() -> {
                    try {
                        action.accept(subList);
                    } finally {
                        downLatch.countDown();
                    }
                });
                index++;
            } while (toIndex < listSize);
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                log.error("ThreadPoolUtil.execute exception:{}", ExceptionUtils.getStackTrace(e));
            }
            return this;
        }

        public <T> Builder execute(List<T> list, Consumer<T> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            CountDownLatch downLatch = new CountDownLatch(list.size());
            list.forEach(item -> threadPool.execute(() -> {
                try {
                    action.accept(item);
                } finally {
                    downLatch.countDown();
                }
            }));
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                log.error("ThreadPoolUtil.execute exception:{}", ExceptionUtils.getStackTrace(e));
            }
            return this;
        }

        public <T> Builder execute(List<T> list, Consumer<T> firstAction, Consumer<T> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            firstAction.accept(list.get(0));
            CountDownLatch downLatch = new CountDownLatch(list.size());
            list.forEach(item -> threadPool.execute(() -> {
                try {
                    action.accept(item);
                } finally {
                    downLatch.countDown();
                }
            }));
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                log.error("ThreadPoolUtil.execute exception:{}", ExceptionUtils.getStackTrace(e));
            }
            return this;
        }

        public void shutdown() {
            if (!threadPool.isShutdown()) {
                threadPool.shutdown();
            }
        }

    }

}

使用

代码语言:javascript
复制
// 创建线程池
ThreadPoolUtil.Builder threadPool = ThreadPoolUtil.newBuilder(10, "syncAcctTask");
// 使用
 threadPool.execute(newRes, threadNum - 1, list -> {
                try {
                    // 执行批量保存数据
                    boolean success = recordService.batchInsert(list);
                    if (!success) {
                        log.error("【数据同步】批量保存dh_new_account_record失败,{}", JSON.toJSONString(list));
                    }
                } catch (Exception e) {
                    log.error("【数据同步】批量保存dh_new_account_record异常,list:{},exception:{}", JSON.toJSONString(list), ExceptionUtils.getStackTrace(e));
                    // todo 使用詹帅的消息mail(当前尚未合并过来) .senJobErrorEmail("批量保存dh_new_account_record异常", e);
                }
            });
// 关闭线程池,不能少!
threadPool.shutdown();

特殊说明: 以上文章,均是我实际操作,写出来的笔记资料,不会盗用别人文章!烦请各位,请勿直接盗用!转载记得标注来源!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档