前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 并发排序

Java 并发排序

作者头像
一灰灰blog
发布2018-02-06 14:45:40
1K0
发布2018-02-06 14:45:40
举报
文章被收录于专栏:小灰灰小灰灰

利用并发来实现排序

<!-- create time: 2016-03-14 09:49:12 -->

1. 说明

本节主要结合排序的实例来演示多线程执行任务的流程,主要使用了线程池 ExecutorService , 闭锁 Futrue, 完成服务 CompletionService 以及最常见的冒泡排序算法

基本介绍

  • ExecutorService 线程池

private static ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("int-sort"), new ThreadPoolExecutor.CallerRunsPolicy());

    上面是创建一个线程池的实例,其中几个参数分别为:(来自jdk)
        
    ```java
    /**
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
     public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    ```
    
- Futrue
    
    > 就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果
    
    ```java
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • CompletionService 如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService

2. 实例

import io.netty.util.concurrent.DefaultThreadFactory;
import org.junit.Test;

import java.util.concurrent.*;

/**
 * Created by yihui on 16/3/11.
 */
public class RunnableTest {
    private static ExecutorService executorService = new ThreadPoolExecutor(5,
            10,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("sort-calculate"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 随机生成一些数
     *
     * @param size
     * @return
     */
    private int[] genNums(final int size) {
        int[] num = new int[size];

        for (int i = 0; i < size; i++) {
            num[i] = (int) (Math.random() * 1230);
        }

        return num;
    }


    // 冒泡排序
    private int[] sort(int[] num, int size) {
        if (size <= 1) {
            return num;
        }

        int tmp;
        for (int i = 0; i < size; i++) {
            for (int j = i + 1; j < size; j++) {
                if (num[i] > num[j]) {
                    tmp = num[i];
                    num[i] = num[j];
                    num[j] = tmp;
                }
            }
        }

        return num;
    }

    // 合并两个排序数组
    public int[] merge(int[] ans, int[] sub) {
        if (ans == null) {
            return sub;
        }


        int ansSize = ans.length;
        int subSize = sub.length;
        int[] result = new int[subSize + ansSize];

        for (int i =0, ansIndex=0, subIndex=0; i < ansSize + subSize; i ++) {
            if (subIndex >= subSize) {
                result[i] = ans[ansIndex ++];
                continue;
            }

            if (ansIndex >= ansSize) {
                result[i] = sub[subIndex ++];
                continue;
            }

            if (ans[ansIndex] < sub[subIndex]) {
                result[i] = ans[ansIndex ++];
            } else {
                result[i] = sub[subIndex ++];
            }
        }
        return result;
    }


    public int[] calculate(int[] numbers, int size) {
        CompletionService<int[]> completionService = new ExecutorCompletionService<int[]>(executorService);
        if (size <= 50) {
            return this.sort(numbers, size);
        }


        // 将数组分割,50个作为一组,进行排序
        int subNum = (size - 1) / 50 + 1;
        for (int i = 0; i < subNum; i++) {
            int len = 50;
            if (i == subNum - 1) {
                len = size - 50 * i;
            }
            final int[] subNumbers = new int[len];
            System.arraycopy(numbers, i * 50 + 0, subNumbers, 0, len);

            final int finalLen = len;
            Callable<int[]> runnable = new Callable<int[]>() {
                @Override
                public int[] call() throws Exception {
                    return sort(subNumbers, finalLen);
                }
            };
            completionService.submit(runnable);
        }

        int[] ans = null;


        // 开始对提交的排序任务的结果进行合并
        try{
             for (int i = 0; i < subNum; i ++) {
                 // get and remove the result
                 Future<int[]> f = completionService.take();
                 int[] tmp = f.get();
                 ans = this.merge(ans, tmp);
             }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return ans;
    }

    // 输出数组
    private void print(int[] num, int size, boolean newLine) {
        for (int i = 0; i < size; i++) {
            System.out.print(num[i] + ",");
        }
        if (newLine) {
            System.out.println();
        }
    }

    @Test
    public void tt() {
        int size = 250;
        int[] numbers = this.genNums(size);
        int[] numbers2 = new int[size];
        System.arraycopy(numbers, 0, numbers2, 0, size);

        long start = System.nanoTime();
        this.sort(numbers, size);
        long end = System.nanoTime();
        this.print(numbers, size, true);
        System.out.println("Cost is : " + (end - start) / 1000);

        this.print(numbers2, size, true);
        start = System.nanoTime();
        int[] ans = this.calculate(numbers2, size);
        end = System.nanoTime();
        this.print(ans, size, true);
        System.out.println("cost is : " + (end - start) / 1000);
    }


    // 用于测试排序算法,以及合并算法的正确性
    @Test
    public void test() {
        int size = 10;
        int[] numbers = this.genNums(size);
        int[] ans1 = this.sort(numbers, size);
        this.print(ans1, size, true);

        size += 5;
        int[] numbers2 = this.genNums(size);
        int[] ans2 = this.sort(numbers2, size);
        this.print(ans2, size, true);

        int[] ans = this.merge(ans1, ans2);
        this.print(ans, 25, true);
    }
}

3. 说明

针对上面的实例,我们重点需要关注的对象集中在 calculate方法中

执行流程:

  • 对数组进行分割,按照50个一组(最后一组可能不满足50个,所以需要额外注意一下)
  • 将子数组的排序,作为一个task扔到线程池中执行,因为要保留其返回结果,因此采用Callable 结合 CompletionService 来做,将每个task返回的结果封装到Future中,并塞入completionQueue队列中
  • 从完成队列中获取结果,合并排序
  • 完毕后返回最终的结果
// 提交task
completionService.submit(new Callable<int[]>() {
    @Override
    public int[] call() throws Exception {
        return sort(subNumbers, finalLen);
    }
});

// 获取结果
for (...) {
    // take 表示从阻塞队列中获取并移除Future  === get之后remove掉
    Futrue<int[]> futrue = completionService.take();
    int[] ans = futrue.get(); 
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 利用并发来实现排序
    • 1. 说明
      • 基本介绍
    • 2. 实例
      • 3. 说明
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档