首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息队列

消息队列

作者头像
橘子君丶
发布2023-03-06 13:39:09
2.6K0
发布2023-03-06 13:39:09
举报
文章被收录于专栏:springBoot3.0springBoot3.0

关于消息队列

???? 文章简介:Kafka ???? 创作目的:消息队列 ☀️ 今日天气:天气很好 ???? 每日一言:“所行皆坦途 所求皆如愿。”


kafka常用于构建TB级别的异步消息系统

首先谈到对于框架的含义 :

Java 框架由一系列可重用的预编写代码组成,它们起着模板的作用,开发人员可以根据需要通过填充自定义代码来创建应用。

框架创建后可反复使用,这样开发人员即可以在一定的结构上来编写应用,而无需从头开始手动创建。

Java 框架中可以包含预定义类(例如对象类别)和函数,用于处理、输入和管理硬件设备,以及与系统软件进行交互。当然,具体的框架内容要取决于框架的类型、Java 开发人员的技能水平、他们所要完成的工作以及自己的偏好。

框架(Framework )的本质:对特定的类或方法进行封装的集成

如果我们不使用框架是否还能解决类似的问题呢 ?
答案是:

可以的,比如Kafka框架。在我们不使用Kafka的情况下,我们也能通过Java自带的API:BlockingQueue解决阻塞队列、实现消息系统或解决类似的问题、 ![6NUAJZIC_RJ`5NI4TDRZS.png](http://blog-dm-01.oss-cn-hangzhou.aliyuncs.com/articles/5509dbc218cd3763fa8bdd4298d9f36f.png)

关于Kafka使用的冷知识 现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。 方案:将kafka的日志文件全部删除,再次启动即可。 建议:不要暴力关闭kafka,建议通过在命令行执行kafka-server-stop命令来关闭它。 其他:将来在Linux上部署kafka之后,采用后台运行的方式,就会避免这样的问题

那么什么是阻塞队列呢 ?

阻塞队列—BlockingQueue(Java自带的API)

image20230101144635210.png
image20230101144635210.png
生产者&消费者

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:

存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品。互相等待,从而发生死锁。

image20230101150313447.png
image20230101150313447.png

上代码:

public class ProducerConsumerTest {
   public static void main(String[] args) {
      CubbyHole c = new CubbyHole();
      Producer p1 = new Producer(c, 1);
      Consumer c1 = new Consumer(c, 1);
      p1.start();
      c1.start();
   }
}
class CubbyHole {
   private int contents;
   private boolean available = false;
   public synchronized int get() {
      while (available == false) {
         try {
            wait();
         }
         catch (InterruptedException e) {
         }
      }
      available = false;
      notifyAll();
      return contents;
   }
   public synchronized void put(int value) {
      while (available == true) {
         try {
            wait();
         }
         catch (InterruptedException e) {
         }
      }
      contents = value;
      available = true;
      notifyAll();
   }
}

class Consumer extends Thread {
   private CubbyHole cubbyhole;
   private int number;
   public Consumer(CubbyHole c, int number) {
      cubbyhole = c;
      this.number = number;
   }
   public void run() {
      int value = 0;
         for (int i = 0; i < 10; i++) {
            value = cubbyhole.get();
            System.out.println("消费者 #" + this.number+ " got: " + value);
         }
    }
}

class Producer extends Thread {
   private CubbyHole cubbyhole;
   private int number;

   public Producer(CubbyHole c, int number) {
      cubbyhole = c;
      this.number = number;
   }

   public void run() {
      for (int i = 0; i < 10; i++) {
         cubbyhole.put(i);
         System.out.println("生产者 #" + this.number + " put: " + i);
         try {
            sleep((int)(Math.random() * 100));
         } catch (InterruptedException e) { }
      }
   }
}

运行结果:

消费者 #1 got: 0
生产者 #1 put: 0
生产者 #1 put: 1
消费者 #1 got: 1
生产者 #1 put: 2
消费者 #1 got: 2
生产者 #1 put: 3
消费者 #1 got: 3
生产者 #1 put: 4
消费者 #1 got: 4
生产者 #1 put: 5
消费者 #1 got: 5
生产者 #1 put: 6
消费者 #1 got: 6
生产者 #1 put: 7
消费者 #1 got: 7
生产者 #1 put: 8
消费者 #1 got: 8
生产者 #1 put: 9
消费者 #1 got: 9

同等案例(二)

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @program: BlockingQueue
 * @description:
 * @author: DM
 * @create: 2023
 **/

public class BlockingQueueTests {
    public static void main(String[] args) {
        //因为数组实现所以要求设定队列容量
        BlockingQueue queue=new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();//启动生产者线程

        new Thread(new Consumer(queue)).start();//三个消费者并发消费数据
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();


    }
}

//生成者
class Producer implements Runnable{
    //传入阻塞队列
    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue){
        this.queue=queue;
    }

    @Override
    public void run() {
        try {
            for (int i=0;i<100;i++){
                //不管企业和组件中间都有间隔
                Thread.sleep(20);//停顿20毫秒
                queue.put(i);
                System.out.println(Thread.currentThread().getName()+"生产:"+queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

//消费者
class Consumer implements Runnable{
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue){
        this.queue=queue;
    }

    @Override
    public void run() {
        try {
            while (true){
                //不管企业和组件中间都有间隔
                Thread.sleep(new Random().nextInt(1000));//停顿0~1000毫秒
                //使用数据
                queue.take();
                System.out.println(Thread.currentThread().getName()+"消费:"+queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

运行结果:

Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-1消费:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
//中间为生产者生产与消费者消费的过程(由于生产数据量相对较长所以省略)
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-3消费:8
Thread-2消费:7
Thread-1消费:6
Thread-2消费:5
Thread-3消费:4
Thread-1消费:3
Thread-2消费:2
Thread-3消费:1
Thread-1消费:0

Process finished with exit code 130

BlockingQueue实现类

BlockingQueue常见的有下面5个实现类,主要是应用场景不同。

  • ArrayBlockingQueue 基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
  • LinkedBlockingQueue 基于链表实现的阻塞队列,默认是无界队列,创建可以指定容量大小
  • SynchronousQueue 一种没有缓冲的阻塞队列,生产出的数据需要立刻被消费
  • PriorityBlockingQueue 实现了优先级的阻塞队列,基于数据显示,是无界队列
  • DelayQueue 实现了延迟功能的阻塞队列,基于PriorityQueue实现的,是无界队列

BlockingQueue源码解析

BlockingQueue的5种子类实现方式大同小异,这次就以最常用的ArrayBlockingQueue做源码解析。

ArrayBlockingQueue类属性

先看一下ArrayBlockingQueue类里面有哪些属性:

// 用来存放数据的数组
final Object[] items;

// 下次取数据的数组下标位置
int takeIndex;

// 下次放数据的数组下标位置
int putIndex;

// 当前已有元素的个数
int count;

// 独占锁,用来保证存取数据安全
final ReentrantLock lock;

// 取数据的条件
private final Condition notEmpty;

// 放数据的条件
private final Condition notFull;

ArrayBlockingQueue中4组存取数据的方法实现也是大同小异,本次以put和take方法进行解析。

put方法源码解析

image20230101153423466.png
image20230101153423466.png

无论是放数据还是取数据都是从队头开始,逐渐往队尾移动。

// 放数据,如果队列已满,就一直阻塞,直到有其他线程从队列中取走数据
public void put(E e) throws InterruptedException {
    // 校验元素不能为空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
      // 加锁,加可中断的锁
    lock.lockInterruptibly();
    try {
        // 如果队列已满,就一直阻塞,直到被唤醒
        while (count == items.length)
            notFull.await();
          // 如果队列未满,就往队列添加元素
        enqueue(e);
    } finally {
          // 结束后,别忘了释放锁
        lock.unlock();
    }
}

// 实际往队列添加数据的方法
private void enqueue(E x) {
    // 获取数组
    final Object[] items = this.items;
    // putIndex 表示本次插入的位置
    items[putIndex] = x;
    // ++putIndex 计算下次插入的位置
    // 如果本次插入的位置,正好等于队尾,下次插入就从 0 开始
    if (++putIndex == items.length)
        putIndex = 0;
      // 元素数量加一
    count++;
    // 唤醒因为队列空等待的线程
    notEmpty.signal();
}

源码中有个有意思的设计,添加元素的时候如果已经到了队尾,下次就从队头开始添加,相当于做成了一个循环队列。

像下面这样:

image20230101151835057.png
image20230101151835057.png

4.3 take方法源码

// 取数据,如果队列为空,就一直阻塞,直到有其他线程往队列中放数据
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
      // 加锁,加可中断的锁
    lock.lockInterruptibly();
    try {
        // 如果队列为空,就一直阻塞,直到被唤醒
        while (count == 0)
            notEmpty.await();
        // 如果队列不为空,就从队列取数据
        return dequeue();
    } finally {
          // 结束后,别忘了释放锁
        lock.unlock();
    }
}

// 实际从队列取数据的方法
private E dequeue() {
      // 获取数组
    final Object[] items = this.items;
    // takeIndex 表示本次取数据的位置,是上一次取数据时计算好的
    E x = (E) items[takeIndex];
    // 取完之后,就把队列该位置的元素删除
    items[takeIndex] = null;
    // ++takeIndex 计算下次拿数据的位置
    // 如果本次取数据的位置,正好是队尾,下次就从 0 开始取数据
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 元素数量减一
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒被队列满所阻塞的线程
    notFull.signal();
    return x;
}

总结

  1. ArrayBlockingQueue基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
  2. ArrayBlockingQueue底层采用循环队列的形式,保证数组位置可以重复使用。
  3. ArrayBlockingQueue存取都采用ReentrantLock加锁,保证线程安全,在多线程环境下也可以放心使用。
  4. 使用ArrayBlockingQueue的时候,预估好队列长度,保证生产者和消费者速率相匹配。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-01-01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于消息队列
    • 首先谈到对于框架的含义 :
      • 阻塞队列—BlockingQueue(Java自带的API)
        • BlockingQueue实现类
          • BlockingQueue源码解析
            • ArrayBlockingQueue类属性
            • put方法源码解析
            • 4.3 take方法源码
            • 总结
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档