前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Concurrent包之ExecutorService(执行器服务)

Concurrent包之ExecutorService(执行器服务)

作者头像
姜同学
发布2022-10-27 16:34:33
2630
发布2022-10-27 16:34:33
举报
文章被收录于专栏:姜同学姜同学

概述

线程池的意义:为了减少服务器端大量线程的创建和销毁,做到线程的复用。

  1. 线程池创建之后,线程池为空,没有任何线程
  2. 当有请求的时候,会在线程池中创建一个线程(核心线程)去处理这个请求
  3. 核心线程使用完毕之后不会被销毁,而是继续等待下一个请求
  4. 核心线程没有达到上限是,新来的请求会继续创建线程池的核心线程并使用
  5. 当核心线程被全部占用,新来的请求会被放入工作队列(阻塞队列)中
  6. 当阻塞队列也满了的时候新的请求会开启线程池的临时线程
  7. 临时线程被归还之后并不会立即销毁,而是存活指定的时间
  8. 临时线程即使处于空闲状态,也不会去处理工作队列里的请求,工作队列的里请求只能被核心线程处理
  9. 线程池中线程全部被占用之后,新来的请求会被拒绝执行处理器拒绝。

自定义线程池

代码语言:javascript
复制
package com.jmy.thredpool;

import java.util.concurrent.*;

public class MyExecutorService {
    public static void main(String[] args) {
        // 创建执行器服务
        /*
                              int corePoolSize,  核心线程数
                              int maximumPoolSize,  最大线程数 = 核心线程 + 临时线程
                              long keepAliveTime,    临时线程存活时间
                              TimeUnit unit,          时间单位
                              BlockingQueue<Runnable> workQueue  工作队列类型
         */
        ExecutorService es = new ThreadPoolExecutor(
                ,
                ,
                ,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // 自定义拒绝逻辑
                        System.out.println("请求被拒绝");
                    }
                }
        );

        /*
        7个请求被核心线程执行 8个请求放入工作队列 8个请求被临时线程执行 最后2个被拒绝
         */
        for (int i = ; i < ; i++) {
            es.execute(new MyThread());
        }

    }
}

class MyThread implements Runnable {

    @Override
    public void run() {
        System.out.println("Start...");

        try {
            Thread.sleep();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("end...");
    }
}

JDK提供的线程池

代码语言:javascript
复制
package com.jmy.thredpool;

import java.util.concurrent.*;

/*
jdk提供的两个线程池
 */
public class ExecutorServiceByJdk {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
        }
        源码分析:
            核心线程数为0
            临时线程数为2的31次方减1 可认为无限
            临时线程存活时间60秒
            工作队列为同步队列
        ------------------------------
        大池子小队列:适用于短连接,高并发的短任务场景
         */
        ExecutorService ex = Executors.newCachedThreadPool();

        /*
        public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
        }
        源码分析:
            核心线程数自定义
            没有临时线程
            工作队列为阻塞式链式队列可认为容量无限
        ------------------------------------------
        小池子大队列:适用于长连接任务,例如百度网盘
         */
        ExecutorService executorService = Executors.newFixedThreadPool();

        // Callable 只能使用线程池执行
        Future<String> future = ex.submit(new CThread());
        System.out.println(future.get());
    }
}

// 实现Callable接口实现run方法可以得到返回值,泛型指定返回值类型
class CThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        return "执行成功";
    }
}

定时执行器服务

代码语言:javascript
复制
package com.jmy.thredpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/*
定时调度执行器服务
 */
public class ScheduleExecutorServiceDemo01 {
    public static void main(String[] args) {
        // 创建定时调度执行器服务 核心线程数为5
        ScheduledExecutorService ses = Executors.newScheduledThreadPool();

        // 延时任务 5秒后执行一次ScThread线程 且只执行一次
        ses.schedule(new ScThread(), , TimeUnit.SECONDS);
        /*
                public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, Runable 线程
                                                  long initialDelay, 延时shijian
                                                  long period,  间隔时间
                                                  TimeUnit unit);
                从上一个线程开始时计时 每隔5秒执行一次
         */
        ses.scheduleAtFixedRate(new ScThread(),,,TimeUnit.SECONDS);
        
        /*
         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
                从上一个线程结束开始计时,每5秒执行一次
         */
        ses.scheduleWithFixedDelay(new ScThread(),,,TimeUnit.SECONDS);
    }
}

class ScThread implements Runnable {

    @Override
    public void run() {
        System.out.println("start...");
    }
}

分叉合并

代码语言:javascript
复制
package com.jmy.thredpool;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/*
  分叉合并
 */
public class ForkJoinPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(pool.submit(new Sum(, 5555555555L)).get());
    }
}

class Sum extends RecursiveTask<Long>{

    private long start;
    private long end;

    public Sum(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // 分叉合并逻辑定义
    @Override
    protected Long compute() {
        if (end - start <= ) {
            long sum = ;
            for (long i = start; i < end; i++) {
                sum += i;
            }

            return sum;
        } else {
            long mid = (start - end)/;
            Sum left = new Sum(start,mid);
            Sum right = new Sum(mid+,end);
            // 分叉过程
            left.fork();
            right.fork();
            // 合并
            return left.join() + right.join();
        }

    }
}
代码语言:javascript
复制
1. Fork分叉:将一个大的任务拆分成多个小的任务
2. Join合并:将拆分的小的任务的结果进行汇总
3.数据量越大,分叉合并相对循环的效率就越高。如果数据量比较少,循环的效率
反而高于分叉合并
4. 分叉合并将任务拆分之后能够有效的提高CPU的利用率
5.分叉合并考虑到慢任务带来的问题,采取了"work-stealing"(工作窃取)策略。
即当一个核上的所有的任务执行完成之后,这个核并不会空闲下来,而是会随机
扫描一个核,然后从这个核的任务队列尾端"偷"一个任务回来执行
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-06T,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 自定义线程池
  • JDK提供的线程池
  • 定时执行器服务
  • 分叉合并
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档