专栏首页我要变牛使用线程池时一定要注意的五个点

使用线程池时一定要注意的五个点

一、使用线程池在流量突发期间能够平滑地服务降级

很多场景下应用程序必须能够处理一系列传入请求,简单的处理方式是通过一个线程顺序的处理这些请求,如下图:

单线程策略的优势和劣势都非常明显:

优势:设计和实现简单;劣势:这种方式会带来处理效率的问题,单线程的处理能力是有限,不能发挥多核处理器优势。

在这种场景下我们就需要考虑并发,一个简单的并发策略就是Thread-Per-Message模式,即为每个请求使用一个新的线程。

Thread-Per-Message策略的优势和劣势也非常明显:

优势:设计和实现比较简单,能够同时处理多个请求,提升响应效率;

劣势:主要在两个方面

1.资源消耗 引入了在串行执行中所没有的开销,包括线程创建和调度,任务处理,资源分配和回收以及频繁上下文切换所需的时间和资源。2.安全

  • 攻击者可以通过一次进行大量请求使系统瘫痪并且拒绝服务 (DoS),从而导致系统立即不响应而不是平滑地退出。
  • 从安全角度来看,一个组件可能由于连续的错误而耗尽所有资源,因此使所有其他组件无法获得资源。

有没有一种方式可以并发执行又可以克服Thread-Per-Message的问题?

采用线程池的策略,线程池通过控制并发执行的工作线程的最大数量来解决Thread-Per-Message带来的问题。可见下图,请求来临时先放入线程池的队列

线程池可以接受一个RunnableCallable<T>任务,并将其存储在临时队列中,当有空闲线程时可以从队列中拿到一个任务并执行。

反例(使用 Thread-Per-Message 策略)

class Helper {
    public void handle(Socket socket) {
        // do something
    }
}

final class RequestHandler {
    private final Helper helper = new Helper();

    //......
    private RequestHandler(int port) throws IOException {
        //do something
    }

    public void handleRequest() {
        new Thread(new Runnable() {
            public void run() {
                try {
                    helper.handle(server.accept());
                } catch (IOException e) {
                    // Forward to handler
                }
            }
        }).start();
    }
}

正例(使用 线程池 策略)

class Helper {
    public void handle(Socket socket) {
        // do something
    }
}

final class RequestHandler {
    private final Helper helper = new Helper();
    private final ServerSocket server;
    private final ExecutorService exec;

    private RequestHandler(int port, int poolSize) throws IOException {
        server = new ServerSocket(port);
        exec = Executors.newFixedThreadPool(poolSize);
    }

    public static RequestHandler newInstance(int poolSize) throws IOException {
        return new RequestHandler(0, poolSize);
    }

    public void handleRequest() {
        Future<?> future = exec.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    helper.handle(server.accept());
                } catch (IOException e) {
                    // Forward to handler
                }
            }
        });
    }
    // ... Other methods such as shutting down the thread pool
    // and task cancellation ...
}

JAVA 中(JDK 1.5+)线程池的种类:

  • newFixedThreadPool()
  • newCachedThreadPool()
  • newSingleThreadExecutor()
  • newScheduledThreadPool() 线程池的详细使用方法可参见Java API文档

二、不要在有界线程池中执行相互依赖的任务

程序不能使用来自有界线程池的线程来执行依赖于线程池中其他任务的任务。

有两个场景:

  1. 当线程池中正在执行的线程阻塞在依赖于线程池中其他任务的完成上,这样就会出现称为线程饥饿(threadstarvation)死锁的死锁形式。
  2. 线程饥饿死锁还会发生在当前执行的任务向线程池提交其他任务并等待这些任务完成的时候,然而此时线程池缺乏一次容纳所有任务的能力。

要缓解上面两个场景产生的问题有两个简单的办法:

  1. 扩大线程池中的线程数,以容纳更多的任务,但 决定一个线程池合适的大小可能是困难的甚至不可能的。
  2. 线程池中的队列改为无界,由于系统资源有限,无界队列只能说是尽可能容纳任务 但饥饿死锁的现象无法消除。

真正解决此类方法还是需要梳理线程池执行业务流程,不要在有界线程池中执行相互依赖的任务,防止出现竞争和死锁。

三、确保提交到线程池的任务可中断

向线程池提交的任务需要支持中断。从而保证线程可以中断,线程池可以关闭。线程池支持 java.util.concurrent.ExecutorService.shutdownNow() 方法,该方法尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务的列表。

但是 shutdownNow() 除了尽力尝试停止处理主动执行的任务之外不能保证一定能够停止。例如,典型的实现是通过Thread.interrupt()来停止,因此任何未能响应中断的任务可能永远不会终止,也就造成线程池无法真正的关闭。

反例:

public final class Worker implements Runnable { // Thread‐safe class
    private AtomBoolean flag = new AtomBoolean(true);

    public Worker() throws IOException {
        //do something
    }

    // Only one thread can use the socket at a particular time
    @Override
    public void run() {
        try {
            while (flag.get()) {
                // do something
            }
        } catch (IOException ie) {
            // Forward to handler
        }
    }

    public void shutdown() {
        this.flag.set(false);
    }
}

正例:

public final class Worker implements Runnable { // Thread‐safe class
    public Worker() throws IOException {
        //do something
    }

    // Only one thread can use the socket at a particular time
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // do something
            }
        } catch (IOException ie) {
            // Forward to handler
        }
    }
}

四、确保在线程池中执行的任务不能悄无声息地失败

线程池中的所有任务必须提供机制,如果它们异常终止,则需要通知应用程序.

如果不这样做不会导致资源泄漏,但由于池中的线程仍然被会重复使用,使故障诊断非常困难或不可能。

在应用程序级别处理异常的最好方法是使用异常处理。异常处理可以执行诊断操作,清理和关闭Java虚拟机,或者只是记录故障的详细信息。

也就是说在线程池里执行的任务也需要能够抛出异常并被捕获处理。

任务恢复或清除操作可以通过重写 java.util.concurrent.ThreadPoolExecutor 类的 afterExecute() 钩子来执行。

当任务通过执行其 run() 方法中的所有语句并且成功结束任务,或者由于异常而导致任务停止时,将调用此钩子。

可以通过自定义 ThreadPoolExecutor 服务来重载 afterExecute() 钩子。

还可以通过重载 terminated() 方法来释放线程池获取的资源,就像一个finally块。

反例:

final class PoolService {
    private final ExecutorService pool = Executors.newFixedThreadPool(10);

    public void doSomething() {
        pool.execute(new Task());
    }
}

final class Task implements Runnable {
    @Override
    public void run() {
        // do something
        throw new NullPointerException();
    }
}

任务意外终止时作为一个运行时异常,无法通知应用程序。此外,它缺乏恢复机制。因此,如果Task抛出一个NullPointerException ,异常将被忽略。

正例:

class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    // ... Constructor ...
    public CustomThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
                }
    @Override
    public void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            // Exception occurred, forward to handler
        }
            // ... Perform task‐specific cleanup actions
        }
    @Override
    public void terminated() {
        super.terminated();
        // ... Perform final clean‐up actions
    }
}

另外一种方式是使用 ExecutorService.submit() 方法(代替 execute() 方法)将任务提交到线程池并获取 Future 对象。

当通过 ExecutorService.submit() 提交任务时,抛出的异常并未到达未捕获的异常处理机制,因为抛出的异常被认为是返回状态的一部分,因此被包装在ExecutionException ,并由Future.get() 返回。

Future<?> future = threadPool.submit(new Task());
try {
    future.get();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // Reset interrupted status
} catch (ExecutionException e) {
    Throwable exception = e.getCause();
    // Forward to exception reporter
}

五、确保在使用线程池时重新初始化ThreadLocal变量

java.lang.ThreadLocal 类提供线程内的本地变量。根据Java API

这些变量与其它正常变量不同,每个线程访问(通过其get或set方法)都有其属于各自线程的,独立初始化的变量拷贝。ThreadLocal实例通常是一些希望将状态与线程(例如,用户ID或事务ID)相关联的类中的私有静态字段。

ThreadLocal对象需要关注那些对象被线程池中的多个线程执行的类。

线程池缓存技术允许线程重用以减少线程创建开销,或者当创建无限数量的线程时可以降低系统的可靠性。

当 ThreadLocal 对象在一个线程中被修改,随后变得可重用时,在重用的线程上执行的下一个任务将能看到该线程上执行过的上一个任务修改的ThreadLocal 对象的状态。

所以要在使用线程池时重新初始化的ThreadLocal对象实例。

反例:

public enum Day {
    MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY;
}

public final class Diary {
    private static final ThreadLocal<Day> days = new ThreadLocal<Day>() {
        // Initialize to Monday
        protected Day initialValue() {
            return Day.MONDAY;
        }
    };
    private static Day currentDay() {
        return days.get();
    }
    public static void setDay(Day newDay) {
        days.set(newDay);
    }
    // Performs some thread‐specific task
    public void threadSpecificTask() {
        // Do task ...
    }
}

public final class DiaryPool {
    final int numOfThreads = 2; // Maximum number of threads allowed in pool
    final Executor exec;
    final Diary diary;

    DiaryPool() {
        exec = (Executor) Executors.newFixedThreadPool(numOfThreads);
        diary = new Diary();
    }

    public void doSomething1() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                diary.setDay(Day.FRIDAY);
                diary.threadSpecificTask();
            }
        });
    }

    public void doSomething2() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                diary.threadSpecificTask();
            }
        });
    }

    public static void main(String[] args) {
        DiaryPool dp = new DiaryPool();
        dp.doSomething1(); // Thread 1, requires current day as Friday
        dp.doSomething2(); // Thread 2, requires current day as Monday
        dp.doSomething2(); // Thread 3, requires current day as Monday
    }
}

DiaryPool类创建了一个线程池,它可以通过一个共享的无界的队列来重用固定数量的线程。

在任何时候,不超过numOfThreads个线程正在处理任务。如果在所有线程都处于活动状态时提交其他任务,则 它们在队列中等待,直到线程可用。

当线程循环时,线程的线程局部状态仍然存在。

下表显示了可能的执行顺序:

时间

任务

线程池

提交方法

日期

1

t1

1

doSomething1()

星期五

2

t2

2

doSomething2()

星期一

3

t3

1

doSomething3()

星期五

在这个执行顺序中,期望从doSomething2() 开始的两个任务( t 2和t 3 doSomething2() 将当天视为星 期一。然而,因为池线程1被重用,所以t 3观察到星期五。

解决方案(try-finally条款)

符合规则的方案removeDay() 方法添加到Diary类,并在try‐finally 块中的实现doSomething1() 类的doSomething1() 方法的语句。finally 块通过删除当前线程中的值来恢复threadlocal类型的days对象的初始状态。

public final class Diary {
    // ...
    public static void removeDay() {
        days.remove();
    }
}

public final class DiaryPool {
    // ...
    public void doSomething1() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Diary.setDay(Day.FRIDAY);
                    diary.threadSpecificTask();
                } finally {
                    Diary.removeDay(); // Diary.setDay(Day.MONDAY)
                    // can also be used
                }
            }
        });
    }
// ...
}

如果threadlocal变量再次被同一个线程读取,它将使用initialValue()方法重新初始化 ,除非任务已经明确设置了变量的值。这个解决方案将维护的责任转移到客户端( DiaryPool ),但是当Diary类不能被修改时是一个好的选择。

解决方案(beforeExecute())

使用一个自定义ThreadPoolExecutor 来扩展 ThreadPoolExecutor 并覆盖beforeExecute() 方法。beforeExecute() 方法在 Runnable 任务在指定线程中执行之前被调用。该方法在线程 “t” 执行任务 “r” 之前重新初始化 threadlocal 变量。

class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                    long keepAliveTime, TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void beforeExecute(Thread t, Runnable r) {
        if (t == null || r == null) {
            throw new NullPointerException();
        }
        Diary.setDay(Day.MONDAY);
        super.beforeExecute(t, r);
    }
}

public final class DiaryPool {
    // ...
    DiaryPool() {
        exec = new CustomThreadPoolExecutor(NumOfthreads, NumOfthreads,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        diary = new Diary();
    }
    // ...
}

本文分享自微信公众号 - 你呀不牛(notNiu),作者:不牛

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-05-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 线程池使用FutureTask时候需要注意的一点事

    线程池使用FutureTask的时候如果拒绝策略设置为了 DiscardPolicy和 DiscardOldestPolicy并且在被拒绝的任务的F...

    加多
  • 使用线程池你需要注意这几点

    在程序开发中,我们会用各种池化技术来缓存创建昂贵的对象,比如线程池、连接池、内存池等。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用。...

    用户2781897
  • 多线程场景下使用 ArrayList,这几点一定要注意!

    ArrayList 不是线程安全的,这点很多人都知道,但是线程不安全的原因及表现,怎么在多线程情况下使用ArrayList,可能不是很清楚,这里总结一下。

    JAVA葵花宝典
  • 多线程场景下使用 ArrayList,这几点一定要注意!

    ArrayList 不是线程安全的,这点很多人都知道,但是线程不安全的原因及表现,怎么在多线程情况下使用ArrayList,可能不是很清楚,这里总结一下。

    PHP开发工程师
  • 【答疑解惑】头文件使用过程中需要注意的几个点

    头文件c/c++程序十分常见,java 中换成了import。我们经常用到它,正因为如此,一些细节东西容易被忽略。今天我们就讲讲: 头文件的作用有三: 1、...

    程序员互动联盟
  • 使用ES6的fetch API读取数据时要注意的一个和cookie相关的坑

    When I am doing a test of comparison between Stateful and Stateless BSP applicat...

    Jerry Wang
  • 没有分析过线程池源码 ,谁给你勇气去面试

    作者:若丨寒 https://www.jianshu.com/p/bc978c220da6

    程序员小强
  • Android开发笔记(七十六)线程池管理

    在前面的《Android开发笔记(四十八)Thread类实现多线程》,我们介绍了线程类Thread的使用,可是缺乏线程的统一管理,这会产生如下问题: 1、...

    用户4464237
  • 金秋十月,读阿里JAVA开发手册有感而发

    最近重温阿里巴巴Java开发手册这本书,思考了什么样的代码是好代码,给大家分享一下我的想法,有哪里不对,欢迎指出,感激不尽。

    捡田螺的小男孩
  • 最近学习了 HTTP 连接池

    6.1大促值班发现的一个问题,一个rpc接口在0~2点用户下单高峰的时候表现rt高(超过1s,实际上针对性优化过的接口rt超过这个值也是有问题的,通常rpc接口...

    芋道源码
  • Quartz Spring与Spring Task总结

    Spring对Quartz作了一个封装,同时,Spring自己也提供了一个任务定时器(spring-task),现把它总结一下。 对于Quartz,我们使...

    bear_fish
  • Java线程池深度揭秘

    Executor 是一个接口(主要用于定义规范),定义了 execute 方法,用于接收 Runnable 对象。

    一猿小讲
  • 2行代码搞定一个定时器!

    spring中 @Scheduled & @EnableScheduling 这2个注解,可以用来快速开发定时器,使用特别的简单。

    路人甲Java
  • 12.ThreadPoolExecutor线程池原理及其execute方法

    jdk1.7.0_79   对于线程池大部分人可能会用,也知道为什么用。无非就是任务需要异步执行,再者就是线程需要统一管理起来。对于从线程池中获取线程,大部分...

    用户1148394
  • 别问了,我真的不喜欢这个注解!

    我之前写过一些关于线程池的文章,然后有同学去翻了一圈,发现我没有写过一篇关于 @Async 注解的文章,于是他来问我:

    why技术
  • 任务调度框架Quartz原理简介

    Quartz是OpenSymphony开源组织的一个Java开源项目, 在2009被Terracotta收购。Quartz官网

    SmileNicky
  • 菜鸟的进阶之路:了解使用多线程

    关于多进程和多线程,教科书上最经典的一句话是“进程是资源分配的最小单位,线程是CPU调度的最小单位”,一般来说进程是独立的而同一进程中的线程是共享的,但是开一个...

    码农小胖哥
  • RabbitMQ交换器Exchange介绍与实践

    有了Rabbit的基础知识之后(基础知识详见:深入解读RabbitMQ工作原理及简单使用),本章我们重点学习一下Rabbit里面的exchange(交换器)的知...

    Java中文社群-磊哥
  • Python 并发编程:PoolExec

    使用多线程(threading)和多进程(multiprocessing)完成常规的并发需求,在启动的时候 start、join 等步骤不能省,复杂的需要还要用...

    py3study

扫码关注云+社区

领取腾讯云代金券