前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >站在架构的角度思考线程池的设计和原理

站在架构的角度思考线程池的设计和原理

作者头像
架构探险之道
发布2023-03-04 11:07:21
4440
发布2023-03-04 11:07:21
举报
文章被收录于专栏:架构探险之道架构探险之道

站在架构的角度思考线程池的设计和原理

  • 简介
  • 线程池
    • 线程池设计猜想
  • 源码分析
    • execute
    • addWorker
    • Worker.start()
    • runWorker
    • getTask
  • 线程池配置
    • 线程数量设置
    • 线程池初始化设置
    • 线程池队列设置
    • 线程池管理
  • 阻塞队列 BlockingQueue
  • BlockingQueue的核心方法
  • LinkedBlockingQueue
    • 队列的优点

简介

线程使用上的问题

代码语言:javascript
复制
new Thread().start();
  • 线程的频繁创建和销毁
  • 线程的数量过多,会造成 CPU 资源的开销
    • 线程上下文频繁切换,消耗 CPU 资源

那么如何实现线程的复用呢?

池化技术常见场景及核心思想

  • 连接池、对象池、内存池、线程池
  • 池化技术的核心:(资源)复用,降低创建和销毁动作带来的额外性能损耗

线程池

核心能力

  • 提前创建一系列的线程,保存在这个线程池中。
  • 有任务要执行的时候,从线程池中取出线程来执行
  • 没有任务的时候,线程放回线程池

线程池设计猜想

目标:实现线程的重复使用.

分解:

  • 如何线程的复用?
    • 那如何让线程执行新的任务呢?也就是说,任务怎么给他执行?[共享内存]-> List.add()
    • 线程一直处于运行状态,合理吗?
    • 有任务来的时候,执行
    • 没有任务的时候,阻塞
    • 让线程实现复用的唯一的方法,就是让线程不结束。
  • 结论:通过阻塞队列的方式,来实现线程池中线程的复用。

线程池的实现原理的过程推演

源码分析

线程池中的核心线程是延迟初始化的。

  • 先初始化核心线程。
  • 调用阻塞队列的方法,把task存进去。(offer() -> true/false)
    • 如果true ,说明当前的请求量不大, 核心线程就可以搞定。
    • false,增加工作线程(非核心线程)
  • 如果添加失败,说明当前的工作线程数量达到了最大的线程数,直接调用拒绝策略。

execute

执行异步任务

代码语言:javascript
复制
 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
     // 判断当前工作线程数是否小于核心线程数(延迟初始化)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))  //添加工作线程的同时,执行command
                return;
            c = ctl.get();
        }
     // workQueue.offer 添加到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
     // 如果阻塞队列满了,则添加工作线程(扩容的线程)
        else if (!addWorker(command, false))
            reject(command);  //拒绝策略
    }

addWorker

添加工作线程

代码语言:javascript
复制
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
      // 自旋,通过原子操作 cas 来增加活跃线程数量
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS 操作更新活跃线程数,通过原子操作来增加线程数量
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        
     // 初始化工作线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 构建一个工作线程,此时还没有启动.
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); //添加到一个容器中。
                        int s = workers.size();
                        if (s > largestPoolSize) 
                            largestPoolSize = s; // 重新更新largestPoolSize
                        workerAdded = true; //添加成功
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); //启动线程
                    workerStarted = true;
                }
            }
        } finally { //失败, 回滚。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker.start()

Worker 本身也是个线程,内部包裹了个线程 java.util.concurrent.ThreadPoolExecutor.Worker

代码语言:javascript
复制
 private final class Worker
        extends AbstractQueuedSynchronizer //Worker继承了AQS -> 实现了互斥锁
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

runWorker

执行 Worker 内线程,通过外部类方法,自身作为入参,执行 Worker java.util.concurrent.ThreadPoolExecutor#runWorker

代码语言:javascript
复制
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //while 循环保证当前线程不结束,直到task为null
            while (task != null || (task = getTask()) != null) {
                //表示当前线程正在运行一个任务,如果其他地方要 shutdown().你必须要等我执行完成。
                w.lock(); //Worker继承了AQS -> 实现了互斥锁
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt(); //是否应该触发中断
                try {
                    beforeExecute(wt, task); //空的实现
                    Throwable thrown = null;
                    try {
                        task.run(); //执行task.run
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); ; //空的实现
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask

队列中取出待执行任务

代码语言:javascript
复制
 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {  //cas 自旋
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池已经结束状态 或是任务队列为空,直接返回null. 需要清理掉所有的工作线程
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            // 是否允许超时(allowCoreThreadTimeOut=true) 或 如果当前的工作线程数量大于核心线程数
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c)) //cas 减少工作线程数量
                    return null; //表示要销毁当前工作线程.
                continue;
            }
            
            //获取任务的过程
            try {
                Runnable r = timed ?
                    // 如果允许超时或是工作线程大于核心线程数,则尝试取出元素,否则等待线程存活时间(keepAliveTime)后取出
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();  //工作线程小于核心线程数 且 队列为空,直接取出,如果阻塞队列没有任务,当前工作线程会阻塞在这里
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) { //中断异常
                timedOut = false;
            }
        }
    }

线程池配置

线程数量设置

任务类型

  • 读写任务多,IO 密集型 CPU :2 * N+1
  • 内存计算任务多,CPU密集型: N +1

利用率和线程数设置

  • CPU 利用率不高:设置多个核心线程影响不大
  • CPU利用率很高,不宜设置过多线程树,会增加上下文切换。

线程池初始化设置

  • 按需构造

默认情况下,即使核心线程最初只是在新任务需要时才创建和启动的,也可以使用方法 prestartCoreThread()或 prestartAllCoreThreads() 对其进行动态重写。

  • 创建新线程

使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

  • 保持活动时间

如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止(参见getKeepAliveTime(java.util.concurrent.TimeUnit))。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。

线程池队列设置

  • 排队
    • 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
    • 在 ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
    • 在 ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
    • 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
    • 定义和使用其他种类的 RejectedExecutionHandler 类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。
    • 直接提交: 工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集合时出现锁定。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
    • 无界队列: 使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙的情况下将新任务加入队列。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
    • 有界队列: 当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
    • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
    • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
    • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
    • 所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
    • 排队有三种通用策略
    • 被拒绝的任务当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下,execute 方法都将调用其RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:
  • 挂钩方法
    • 此类提供 protected 可重写的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。它们可用于操纵执行环境;例如,重新初始化ThreadLocal、搜集统计信息或添加日志条目。此外,还可以重写方法 terminated() 来执行 Executor 完全终止后需要完成的所有特殊处理。
    • 如果挂钩或回调方法抛出异常,则内部辅助线程将依次失败并突然终止。
  • 队列维护
    • 方法 getQueue() 允许出于监控和调试目的而访问工作队列。强烈反对出于其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 这两种方法可用于在取消大量已排队任务时帮助进行存储回收。

线程池管理

  • ExecutorService 执行器服务, 它使用可能的几个池线程之一执行每个提交的任务,为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展挂钩。但是,强烈建议程序员使用较为方便的 Executors 工厂方法来配置。
  • 线程池可以 解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行集合任务时使用的线程)的方法。每个ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
  • ThreadPoolExcutor 为一些 Executor 提供了基本的实现,这些 Executor 是由 Executors 中的工厂 newCahceThreadPool、newFixedThreadPool 和 newScheduledThreadExecutor 返回的。ThreadPoolExecutor是一个灵活的健壮的池实现,允许各种各样的用户定制。
  • Executors.newFixedThreadPool (int) * 固定大小的线程池
代码语言:javascript
复制
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • Executors.newCachedThreadPool() 无界线程池,可以进行自动线程回收,*可以缓存的线程池 ->理论上来说,有多少请求,该线程池就可以创建多

少的线程来处理。

代码语言:javascript
复制
   public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • Executors.newSingleThreadExecutor() 单个后台线程
代码语言:javascript
复制
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • 如何进行线程池管理?
    • ThreadPoolExecutor 允许你提供一个 BlockingQueue 来持有等待执行的任务。任务排队有3种基本方法:无限队列、有限队列和同步移交。
    • newFixedThreadPool 和 newSingleThreadExectuor 默认使用的是一个无限的 LinkedBlockingQueue。如果所有的工作者线程都处于忙碌状态,任务会在队列中等候。如果任务持续快速到达,超过了它们被执行的速度,队列也会无限制地增加。稳妥的策略是使用有限队列,比如 ArrayBlockingQueue 或有限的 LinkedBlockingQueue 以及 PriorityBlockingQueue。
    • 对于庞大或无限的池,可以使用 SynchronousQueue ,完全绕开队列,直接将任务由生产者交给工作者线程。
    • 可以使用 PriorityBlockingQueue 通过优先级安排任务

ThreadPoolExecutor

  • ThreadPoolExecutor 将根据 corePoolSize 和 maximumPoolSize 设置的边界自动调整池大小。
  • 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
  • 如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。
  • 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
  • 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
  • 在大多数情况下,核心和最大池大小仅基于构造方法来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//存活时间
                              TimeUnit unit,//存活时间单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列
                              ThreadFactory threadFactory,//线程工厂,用来创建工作线程的。 默认实现(自定义线程池中线程的名字)
                              RejectedExecutionHandler handler) {//拒绝执行策略 。默认实现
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 线程的创建与销毁
    • 核心池大小、最大池大小和存活时间共同管理着线程的创建与销毁。
    • 核心池的大小是目标的大小;线程池的实现试图维护池的大小;即使没有任务执行,池的大小也等于核心池的大小,并直到工作队列充满前,池都不会创建更多的线程。如果当前池的大小超过了核心池的大小,线程池就会终止它。
    • 最大池的大小是可同时活动的线程数的上限。
    • 如果一个线程已经闲置的时间超过了存活时间,它将成为一个被回收的候选者。
    • newFixedThreadPool 工厂为请求的池设置了核心池的大小和最大池的大小,而且池永远不会超时。
    • newCacheThreadPool 工厂将最大池的大小设置为 Integer.MAX_VALUE,核心池的大小设置为0,超时设置为一分钟。这样创建了无限扩大的线程池,会在需求量减少的情况下减少线程数量

阻塞队列 BlockingQueue

在ThreadPoolTaskExecutor源码中我们看到了BlockingQueuequeue = createQueue(this.queueCapacity);这样一句话用来得到一个队列,这个队列是用来存放任务的。当线程池中有空闲线程时就回去任务队列中拿任务并处理。BlockingQueue是一个阻塞并线程安全的一个队列。

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干 生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数 据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

  • SynchronousQueue UML
  • LinkedBlockingQueue UML

BlockingQueue的核心方法

  • 放入数据:
    • offer(anObject): 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
    • offer(E o, long timeout, TimeUnit unit): 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
    • put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
  • 获取数据:
    • poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
    • take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
    • drainTo(...): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

ThreadPoolTaskExecutor 的代码可以发现,其主要是使用 BlockingQueue 的一种实现 LinkedBlockingQueue 进行实现。

代码语言:javascript
复制
/**
 * Create the BlockingQueue to use for the ThreadPoolExecutor.
 * <p>A LinkedBlockingQueue instance will be created for a positive
 * capacity value; a SynchronousQueue else.
 * @param queueCapacity the specified queue capacity
 * @return the BlockingQueue instance
 * @see java.util.concurrent.LinkedBlockingQueue
 * @see java.util.concurrent.SynchronousQueue
 */
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
  if (queueCapacity > 0) {
    return new LinkedBlockingQueue<>(queueCapacity);
  }
  else {
    return new SynchronousQueue<>();
  }
}

LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立 即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从 队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理 并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

  • 生成LinkedBlockingQueue时有一个大小限制,其默认为Integer.MAX_VALUE.
  • 另外LinkedBlockingQueue不接受null值,当添加null的时候,会直接抛出NullPointerException:
代码语言:javascript
复制
//java.util.concurrent.LinkedBlockingQueue

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

队列的优点

  1. 解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2. 冗余

有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。

3. 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

4. 灵活性 & 峰值处理能力

在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为 以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。

5. 可恢复性

当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。

6. 送达保证

消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进 程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的 表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。

7. 排序保证

在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

8. 缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。

9. 理解数据流

在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。

10. 异步通信

很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-07-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 站在架构的角度思考线程池的设计和原理
    • 简介
      • 线程池
        • 线程池设计猜想
      • 源码分析
        • execute
        • addWorker
        • Worker.start()
        • runWorker
        • getTask
      • 线程池配置
        • 线程数量设置
        • 线程池初始化设置
        • 线程池队列设置
        • 线程池管理
      • 阻塞队列 BlockingQueue
        • BlockingQueue的核心方法
          • LinkedBlockingQueue
            • 队列的优点
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档