前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ListeningExecutorService的使用

ListeningExecutorService的使用

作者头像
IT云清
发布2019-01-22 15:09:45
3.7K0
发布2019-01-22 15:09:45
举报
文章被收录于专栏:IT云清IT云清

由于普通的线程池,返回的Future,功能比较单一;Guava 定义了 ListenableFuture接口并继承了JDK concurrent包下的Future 接口,ListenableFuture 允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用。

1.使用方法如下:

  • 1.创建线程池
  • 2.装饰线程池
  • 3.任务处理
  • 4.回调函数处理
  • 5.所有任务完成后处理

场景模拟: 导入一张1211条数据的Excel表格:

  • 1.每条数据处理较慢
  • 2.处理完后需要汇总数据
  • 3.处理汇总成功的数据

2.代码示例如下:

2.1接口和调用
代码语言:javascript
复制
package com.java4all.test11;

import org.python.google.common.collect.ImmutableList;
import org.python.google.common.util.concurrent.*;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * Author: yunqing
 * Date: 2018/9/19
 * Description:
 */
@RestController
@RequestMapping(value = "testThread")
public class TestThread {


    /**线程池*/
    static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    /**
     * 数据处理
     * @return
     * @throws Exception
     */
    @RequestMapping(value = "parse",method = RequestMethod.GET)
    public String parse() throws Exception{
        List<String> result = new ArrayList<>();
        List<String> list = new ArrayList<>();
        
        //模拟原始数据
        for(int i = 0; i < 1211;i ++){
            list.add(i+"-");
            System.out.println("添加原始数据:"+i);
        }

        int size = 50;//切分粒度,每size条数据,切分一块,交由一条线程处理
        int countNum = 0;//当前处理到的位置
        int count = list.size()/size;//切分块数
        int threadNum = 0;//使用线程数
        if(count*size != list.size()){
            count ++;
        }

        final CountDownLatch countDownLatch = new CountDownLatch(count);

        //使用Guava的ListeningExecutorService装饰线程池
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

        while (countNum < count*size){
            //切割不同的数据块,分段处理
            threadNum ++;
            countNum += size;
            MyCallable myCallable = new MyCallable();
            myCallable.setList(ImmutableList.copyOf(
                    list.subList(countNum-size,list.size() > countNum ? countNum : list.size())));

            ListenableFuture listenableFuture = executorService.submit(myCallable);

            //回调函数
            Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
                //任务处理成功时执行
                @Override
                public void onSuccess(List<String> list) {
                    countDownLatch.countDown();
                    System.out.println("第h次处理完成");
                    result.addAll(list);
                }

                //任务处理失败时执行
                @Override
                public void onFailure(Throwable throwable) {
                    countDownLatch.countDown();
                    System.out.println("处理失败:"+throwable);
                }
            });
            
        }

        //设置时间,超时了直接向下执行,不再阻塞
        countDownLatch.await(3,TimeUnit.SECONDS);

        result.stream().forEach(s -> System.out.println(s));
        System.out.println("------------结果处理完毕,返回完毕,使用线程数量:"+threadNum);

        return "处理完了";
    }
}
2.2任务处理
代码语言:javascript
复制
package com.java4all.test11;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Author: yunqing
 * Date: 2018/9/19
 * Description:任务处理逻辑
 */
public class MyCallable implements Callable{

    private List<String> list ;
    @Override
    public Object call() throws Exception {
        List<String> listReturn = new ArrayList<>();
        //模拟对数据处理,然后返回
        for(int i = 0;i < list.size();i++){
            listReturn.add(list.get(i)+":处理时间:"+System.currentTimeMillis()+"---:处理线程:"+Thread.currentThread());
        }

        return listReturn;
    }

    public void setList(List<String> list) {
        this.list = list;
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年09月19日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.使用方法如下:
  • 2.代码示例如下:
    • 2.1接口和调用
      • 2.2任务处理
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档