前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池工作窃取实例

线程池工作窃取实例

作者头像
code4it
发布2018-09-17 15:00:23
8530
发布2018-09-17 15:00:23
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要来展示一下简版的work stealing线程池的实现。

Executors

Executors默认提供了几个工厂方法

代码语言:javascript
复制
/**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

思路

ForkJoinPool主要用到的是双端队列,不过这里我们粗糙的实现的话,也可以不用到deque。

代码语言:javascript
复制
public class WorkStealingChannel<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class);

    BlockingDeque<T>[] managedQueues;

    AtomicLongMap<Integer> stat = AtomicLongMap.create();

    public WorkStealingChannel() {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int queueCount = nCPU / 2 + 1;
        managedQueues = new LinkedBlockingDeque[queueCount];
        for(int i=0;i<queueCount;i++){
            managedQueues[i] = new LinkedBlockingDeque<T>();
        }
    }

    public void put(T item) throws InterruptedException {
        int targetIndex = Math.abs(item.hashCode() % managedQueues.length);
        BlockingQueue<T> targetQueue = managedQueues[targetIndex];
        targetQueue.put(item);
    }

    public T take() throws InterruptedException {
        int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length);
        int idx = rdnIdx;
        while (true){
            idx = idx % managedQueues.length;
            T item = null;
            if(idx == rdnIdx){
                item = managedQueues[idx].poll();
            }else{
                item = managedQueues[idx].pollLast();
            }
            if(item != null){
                LOGGER.info("take ele from queue {}",idx);
                stat.addAndGet(idx,1);
                return item;
            }
            idx++;
            if(idx == rdnIdx){
                break;
            }
        }

        //走完一轮没有,则随机取一个等待
        LOGGER.info("wait for queue:{}",rdnIdx);
        stat.addAndGet(rdnIdx,1);
        return managedQueues[rdnIdx].take();
    }

    public AtomicLongMap<Integer> getStat() {
        return stat;
    }
}

测试实例

代码语言:javascript
复制
public class WorkStealingDemo {

    static final WorkStealingChannel<String> channel = new WorkStealingChannel<>();

    static volatile boolean running = true;

    static class Producer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    channel.put(UUID.randomUUID().toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    String value = channel.take();
                    System.out.println(value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void stop(){
        running = false;
        System.out.println(channel.getStat());
    }

    public static void main(String[] args) throws InterruptedException {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int consumerCount = nCPU / 2 + 1;
        for (int i = 0; i < nCPU; i++) {
            new Producer().start();
        }

        for (int i = 0; i < consumerCount; i++) {
            new Consumer().start();
        }

        Thread.sleep(30*1000);
        stop();
    }
}

输出

代码语言:javascript
复制
{0=660972, 1=660613, 2=661537, 3=659846, 4=659918}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-09-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Executors
  • 思路
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档