前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Java并发编程三】多线程案例(手撕单例模式,阻塞队列,定时器,线程池)

【Java并发编程三】多线程案例(手撕单例模式,阻塞队列,定时器,线程池)

作者头像
小皮侠
发布2024-10-17 08:30:08
1140
发布2024-10-17 08:30:08
举报
文章被收录于专栏:Java学习从基础到就业

多线程案例

1.单例模式

单例模式是校招中最常考的设计模式之一。

啥是设计模式?

设计模式好比象棋中的 " 棋谱 "。 红方当头炮 , 黑方马来跳 . 针对红方的一些走法 , 黑方应招的时候有一些固定的套路. 按照套路来走局势就不会吃亏 . 软件开发中也有很多常见的 " 问题场景 ". 针对这些问题场景 , 大佬们总结出了一些固定的套路 . 按照这个套路来实现代码, 能提高程序的整体下限 。

单例模式能保证某个类在程序中只存在唯一一份实例 , 而不会创建出多个实例。

单例模式具体的实现方式 , 有非常多种,本篇文章主要讲述“饿汉模式”和“懒汉模式”两种方法。

饿汉模式

饿汉模式即类加载的同时, 创建实例。

下面是饿汉模式的代码实现:

代码语言:javascript
复制
class Singleton {
    private static Singleton instance = new Singleton();
    private Singleton() {}
    public static Singleton getInstance() {
        return instance;
   }
}
懒汉模式

懒汉模式在类加载的时候不创建实例,第一次使用的时候才创建实例。

懒汉模式的代码实现(单线程版本):

代码语言:javascript
复制
class Singleton {
    private static Singleton instance = null;
    private Singleton() {}
    public static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
       }
        return instance;
   }
}

上述懒汉模式的代码实现是线程不安全的。

线程安全问题发生在首次创建实例时. 如果在多个线程中同时调用 getInstance 方法 , 就可能导致创建出多个实例。一旦实例已经创建好了, 后面在多线程环境调用 getInstance 就不再有线程安全问题了 ( 不再修改instance 了 )

加上 synchronized 可以改善这里的线程安全问题。

代码语言:javascript
复制
class Singleton {
    private static Singleton instance = null;
    private Singleton() {}
    public synchronized static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
       }
        return instance;
   }
}

上述代码还可以进行改进,加锁和解锁是一件开销比较高的事情. 而懒汉模式的线程不安全只是发生在首次创建实例的时候.因此后续使用的时候, 不必再进行加锁了.

同时为了避免 “内存可见性” 导致读取的 instance 出现偏差, 于是在第一行补充上 volatile。

在加锁的基础上, 做出了进一步改动:

  • 使用双重 if 判定, 降低锁竞争的频率.
  • 给 instance 加上了 volatile.
代码语言:javascript
复制
class Singleton {
  private static volatile Singleton instance = null;
  private Singleton() {}
  public static Singleton getInstance() {
    if (instance == null) {//判断 instance 实例是否创建出来,如果有,说明已经是单例了,这个 if 是防频繁加锁解锁的
      synchronized (Singleton.class) {
     if (instance == null) {//多个线程获得这把锁,只有第一个获得锁的对象才会创建实例对象,这个 if 是防多线程的
       instance = new Singleton();
       }
     }
   }
    return instance;
 }
}

2.阻塞队列

阻塞队列是一种特殊的队列 . 也遵守 " 先进先出 " 的原则 .

阻塞队列能是一种线程安全的数据结构 , 并且具有以下特性 :

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.

Java标准库中内置了阻塞队列,我们可以直接使用标准库中的BlockingQueue。

  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
代码语言:javascript
复制
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞. 
String elem = queue.take()。

阻塞队列的一个典型应用场景就是 " 生产者消费者模型 ". 这是一种非常典型的开发模型 .

生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.

  1. 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力 .
  2. 阻塞队列也能使生产者和消费者之间 解耦 .
阻塞队列的实现
  1. 通过 "循环队列" 的方式来实现.
  2. 使用 synchronized 进行加锁控制.
  3. put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
  4. take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
代码语言:javascript
复制
public class BlockingQueue {
    private int[] items = new int[1000];
    private volatile int size = 0;
    private int head = 0;
    private int tail = 0;
    public void put(int value) throws InterruptedException {
        synchronized (this) {
            // 此处最好使用 while.
            // 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
            // 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
            // 就只能继续等待
            while (size == items.length) {
                wait();
           }
            items[tail] = value;
            tail = (tail + 1) % items.length;
            size++;
            notifyAll();
       }
   }
    public int take() throws InterruptedException {
        int ret = 0;
        synchronized (this) {
            while (size == 0) {
                wait();
           }
            ret = items[head];
            head = (head + 1) % items.length;
            size--;
            notifyAll();
       }
        return ret;
   }
    public synchronized int size() {
        return size;
   }
    // 测试代码
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue = new BlockingQueue();
        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    int value = blockingQueue.take();
                    System.out.println(value);
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
           }
       }, "消费者");
        customer.start();
        Thread producer = new Thread(() -> {
            Random random = new Random();
            while (true) {
                try {
                    blockingQueue.put(random.nextInt(10000));
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
           }
       }, "生产者");
        producer.start();
        customer.join();
        producer.join();
   }
}

3.定时器

定时器也是软件开发中的一个重要组件. 类似于一个 "闹钟". 达到一个设定的时间之后, 就执行某个指定好的代码。比如网络通信中, 如果对方 500ms 内没有返回数据, 则断开连接尝试重连。比如一个 Map, 希望里面的某个 key 在 3s 之后过期(自动删除)。类似于这样的场景就需要用到定时器。

标准库中的定时器
  • 标准库中提供了一个 Timer 类. Timer 类的核心方法为 schedule 。
  • schedule 包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行 (单位为毫秒)。
代码语言:javascript
复制
Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        System.out.println("hello");
   }
}, 3000);
实现自己的定时器

首先要了解实现一个定时器需要哪些构成:

  1. 一个带优先级的阻塞队列
  2. 队列中的每个元素是一个 Task 对象.
  3. Task 中带有一个时间属性, 队首元素就是即将
  4. 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行

为啥要带优先级呢? 因为阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带 优先级的队列就可以高效的把这个 delay 最小的任务找出来。

完整代码如下:

代码语言:javascript
复制
/**
* 定时器的构成:
* 一个带优先级的阻塞队列
* 队列中的每个元素是一个 Task 对象.
* Task 中带有一个时间属性, 队首元素就是即将
* 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
*/
public class Timer {
    static class Task implements Comparable<Task> {
        private Runnable command;
        private long time;
        public Task(Runnable command, long time) {
            this.command = command;
            // time 中存的是绝对时间, 超过这个时间的任务就应该被执行
            this.time = System.currentTimeMillis() + time;
       }
        public void run() {
            command.run();
       }
        @Override
        public int compareTo(Task o) {
            // 谁的时间小谁排前面
            return (int)(time - o.time);
       }
   }
    // 核心结构
    private PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue();
    // 存在的意义是避免 worker 线程出现忙等的情况
    private Object mailBox = new Object();
    class Worker extends Thread{
        @Override
        public void run() {
            while (true) {
                try {
                    Task task = queue.take();
                    long curTime = System.currentTimeMillis();
                    if (task.time > curTime) {
                        // 时间还没到, 就把任务再塞回去
                        queue.put(task);
                        synchronized (mailBox) {
                            // 指定等待时间 wait
                            mailBox.wait(task.time - curTime);
                       }
                   } else {
                        // 时间到了, 可以执行任务
                        task.run();
                   }
               } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
               }
           }
       }
   }
    public Timer() {
        // 启动 worker 线程
        Worker worker = new Worker();
        worker.start();
   }
    // schedule 原意为 "安排"
    public void schedule(Runnable command, long after) {
        Task task = new Task(command, after);
        queue.offer(task);
        synchronized (mailBox) {
            mailBox.notify();
       }
   }
    public static void main(String[] args) {
        Timer timer = new Timer();
        Runnable command = new Runnable() {
            @Override
            public void run() {
                System.out.println("我来了");
                timer.schedule(this, 3000);
           }
       };
        timer.schedule(command, 3000);
   }
}

4.线程池

线程池是现代编程中非常常见的池化技术的一种,线程池最大的好处就是减少每次启动、销毁线程的损耗。

标准库中的线程池

Java标准库中提供了一个类用来创建线程池,使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池. 返回值类型为 ExecutorService,通过 ExecutorService.submit 可以注册一个任务到线程池中。

Executors 创建线程池的几种方式

  1. newFixedThreadPool: 创建固定线程数的线程池
  2. newCachedThreadPool: 创建线程数目动态增长的线程池.
  3. newSingleThreadExecutor: 创建只包含单个线程的线程池.
  4. newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer.

Executors 本质上是 ThreadPoolExecutor 类的封装.

代码语言:javascript
复制
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello");
   }
});
实现一个自己的线程池
  1. 核心操作为 submit, 将任务加入线程池中
  2. 使用 Worker 类描述一个工作线程. 使用 Runnable 描述一个任务.
  3. 使用一个 BlockingQueue 组织所有的任务
  4. 每个 worker 线程要做的事情: 不停的从 BlockingQueue 中取任务并执行.
  5. 指定一下线程池中的最大线程数 maxWorkerCount; 当当前线程数超过这个最大值时, 就不再新增线程了.

具体实现代码如下:

代码语言:javascript
复制
class Worker extends Thread {
    private LinkedBlockingQueue<Runnable> queue = null;
    public Worker(LinkedBlockingQueue<Runnable> queue) {
        super("worker");
        this.queue = queue;
   }
    @Override
    public void run() {
        // try 必须放在 while 外头, 或者 while 里头应该影响不大
        try {
            while (!Thread.interrupted()) {
                Runnable runnable = queue.take();
                runnable.run();
           }
       } catch (InterruptedException e) {
       }
   }
}
public class MyThreadPool {
    private int maxWorkerCount = 10;
    private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue();
    public void submit(Runnable command) {
        if (workerList.size() < maxWorkerCount) {
            // 当前 worker 数不足, 就继续创建 worker
            Worker worker = new Worker(queue);
            worker.start();
       }
        // 将任务添加到任务队列中
        queue.put(command);
   }
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool myThreadPool = new MyThreadPool();
        myThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("吃饭");
           }
       });
      Thread.sleep(1000);
   }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-10-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多线程案例
    • 1.单例模式
      • 2.阻塞队列
        • 3.定时器
          • 4.线程池
          相关产品与服务
          容器服务
          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档