java线程池模型

一, 线程池与普通线程

1 普通线程

Java实现多线程,常见的有以下三种方式:

  1. 1 继承Thread,重写该类的run()方法
  2. 2 实现Runnable

实现Runnable接口,并重写该接口的run()方法,该run()方法同样是线程执行体,创建Runnable实现类的实例,并以此实例作为Thread类的target来创建Thread对象,该Thread对象才是真正的线程对象。

  1. 3 使用Callable和Future接口创建线程。

具体是创建Callable接口的实现类,并实现clall()方法。并使用FutureTask类来包装Callable实现类的对象,且以此FutureTask对象作为Thread对象的target来创建线程。

2 线程池

Java线程创建时很简单的,但是假如并发的线程数量多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低线程,因为创建线程和销毁线程需要时间

所以,我们就希望有一种方法复用线程,实际上java提供的线程池模型,就是为了解决这个问题。

合理利用线程池能够带来三个好处:

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

二, 线程池的种类

Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。下面这张图完整描述了线程池的类体系结构。

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService。

常用的线程池实现类讲解:

1、newFixedThreadPool创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。

2、newCachedThreadPool创建一个可缓存的线程池。这种类型的线程池特点是:

1).工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。

2).如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。

3、newSingleThreadExecutor创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行(我觉得这点是它的特色)。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的 。

4、newScheduleThreadPool创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。(这种线程池原理暂还没完全了解透彻)

三, 测试线程池

1. newFixedThreadPool

/**
 * newFixedThreadPool
 */
public static void fixedThreadPoolTest() {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); //这个函数返回的是 ThreadPoolExecutor
    for(int i=0;i<10;i++) {
        final int index = i;
        fixedThreadPool.execute(new Runnable() {
            public void run() {
                System.out.println("current Thread name is " + Thread.currentThread().getName() + " index : " + index);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    System.out.println("10个任务已分发完毕"); //这句大概率性优先于run 里面的print 语句先执行
}

2. newCachedThreadPool

 /**
     * newCachedThreadPool
     * @throws InterruptedException
     * @
     */
 public static void newCachedThreadPoolTest() throwsInterruptedException {
        ExecutorService cachedThreadPool= Executors.newCachedThreadPool();
        for(int i=0;i<10;i++) {
 final int index= i;
//         try {
//              Thread.sleep(index *100);
//          } catch (InterruptedExceptione) {
//              e.printStackTrace();
//          }
 cachedThreadPool.execute(new Runnable(){
 public void run() {
                    System.out.println("currentThread name is " +Thread.currentThread().getName()
                            + " index: " + index);
 }
            });
 }
        Thread.sleep(200);
 //cachedThreadPool.shutdown();//.awaitTermination(10,TimeUnit.SECONDS);  //这个函数功能存疑
 System.out.println("cachedThreadPool isShutdown() is : "+ cachedThreadPool.isShutdown()); //上行不注释,则结果为false,上行不注释,则结果为true
 System.out.println("cachedThreadPool isTerminated() is : "+cachedThreadPool.isTerminated());//上上行注释不注释,结果均为false

 cachedThreadPool.execute(new Runnable(){  // cachedThreadPool shutdown 之后,这个execute() 不能之前且会报异常
 public void run() {
                System.out.println("cachedThreadPoolis not shutdown and this task can be executing. "
 + "\n current Thread name is " + Thread.currentThread().getName());
 }
        });
 }

3. newSingleThreadExecutor

/**
 * 单线程线程池,保证所有任务都按照指定顺序(FIFO, LIFO, 优先级)执行
 * @param args
 */
public static void singleThreadExecutorTest() {
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    for(int i = 0;i<10;i++) {
        final int index = i;
        singleThreadExecutor.execute(new Runnable() {
            public void run() {
                try {
                    System.out.println("current Thread name is " + Thread.currentThread().getName() + " index : " + index);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

4. newScheduleThreadPool

/**
     * 产生一个ScheduledExecutorService对象,
     * newScheduledThreadPool(3),这个对象的线程池大小为3,若任务数量大于3,
     * 任务会在一个queue里等待执行
     * scheduleWithFixedDelay 函数:第一个参数new Runnable 就是任务
     * 所谓线程池,就是能接受任务。线程池的好处是帮你调度线程,不然还得自己写调度多个线程的方法,比如周期性执行任务
     */
    public static void scheduledThreadPoolTest() {
        //ScheduledExecutorService 只是个接口,ScheduledThreadPoolExecutor 才是实现此接口的类.ScheduledThreadPoolExecutor 还继承了ThreadPoolExecutor 线程池类
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3); //这个函数返回的是ScheduledThreadPoolExecutor

//          scheduledThreadPool.schedule(new Runnable() { //schedule 是只执行一次,不是周期性的
//
//              @Override
//              public void run() {
//                  System.out.println("delay 3 seconds index is " + index);
//              }
//          }, 3, TimeUnit.SECONDS);
        System.out.println("---------------------");
        scheduledThreadPool.scheduleWithFixedDelay(new Runnable() { //scheduleWithFixedDelay 是周期性的,
        // scheduleWithFixedDelay()方法不是ThreadPool 类的是ScheduledExecutorService 接口特有的
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("current Thread name is " + Thread.currentThread().getName() + " time is : " + new Date().getTime());
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

5. newSingleThreadScheduledExecutor

public static void SingleThreadScheduledExecutorTest() {
    ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
    System.out.println("---------------------");
    scheduledThreadPool.scheduleWithFixedDelay(new Runnable() { //scheduleWithFixedDelay 是周期性的
        public void run() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("current Thread name is " + Thread.currentThread().getName() + " time is : " + new Date().getTime());
        }
    }, 1, 1, TimeUnit.SECONDS); //第二个参数1,虽然是1秒为周期,但是单线程线程池,如果上一个任务没执行完,那么会等2秒或者更多,
}  

代码参考:http://blog.csdn.net/nx188/article/details/51416237

四, 总结

没看过Spark 源码的时候,大家估计都会想,Spark使用scala开发,那么他的并发实现是不是基于scala呢?实际上,看过源码,之后你会发现,spark使用的还是java的线程模型和线程池模型。线程和线程池模型,在hbase,kafka等里使用也很广泛,为了更好的掌握,Spark,kafka,hbase这三个非常重要的大数据框架的原理源码,熟练掌握java的线程池模型,也是比不可少的。

推荐阅读:

1,SparkSql的Catalyst之图解简易版

2,Hbase源码系列之regionserver应答数据请求服务设计

3,Kafka源码系列之Broker的IO服务及业务处理

4,JAVA的网络IO模型彻底讲解

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-11-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小灰灰

Java并发学习之四种线程创建方式的实现与对比

线程创建的几种方式 在并发编程中,最基本的就是创建线程了,那么一般的创建姿势是怎样的,又都有些什么区别 一般来讲线程创建有四种方式: 继承Thread 实现R...

44880
来自专栏IT技术精选文摘

Java并发包类总览

并发容器 这些容器的关键方法大部分都实现了线程安全的功能,却不使用同步关键字(synchronized)。值得注意的是Queue接口本身定义的几个常用方法的区...

240100
来自专栏Danny的专栏

探秘BOF 和EOF

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

12930
来自专栏xingoo, 一个梦想做发明家的程序员

AngularJS 中的Promise --- $q服务详解

先说说什么是Promise,什么是$q吧。Promise是一种异步处理模式,有很多的实现方式,比如著名的Kris Kwal's Q还有JQuery的Deffe...

19990
来自专栏微信公众号:Java团长

Java多线程之Callable和Future

本篇说明的是Callable和Future,它俩很有意思的,一个产生结果,一个拿到结果。

5910
来自专栏haifeiWu与他朋友们的专栏

FutureTask源码分析

FutureTask:一个可取消的异步任务执行类,这个类提供了Future接口的基本实现,主要有以下功能:

18230
来自专栏高爽的专栏

Java线程(七):Callable和Future

       接着上一篇继续并发包的学习,本篇说明的是Callable和Future,它俩很有意思的,一个产生结果,一个拿到结果。        Calla...

20400
来自专栏吴伟祥

四种任务调度的 Java 实现 转

 java.util.Timer 了,它是最简单的一种实现任务调度的方法,下面给出一个具体的例子:

16810
来自专栏Janti

JAVA多线程高并发学习笔记(三)——Callable、Future和FutureTask

为什么要是用Callable和Future Runnable的局限性 Executor采用Runnable作为基本的表达形式,虽然Runnable的run方法能...

35650
来自专栏Java进阶之路

使用CompletionService非阻塞获取多线程返回值

Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异...

14620

扫码关注云+社区

领取腾讯云代金券