哈喽,新年好呀。我是狗哥,鸽了两周,今天回归。这是 Java 面试及源码剖析的第四篇,其他篇章的链接在这里有兴趣的小伙伴可以看看:
2、工作三年,小胖连 HashMap 源码都没读过?真的菜!
消息队列在日常工作中用得特别多。目前市面上比较常用的 MQ 消息队列中间件有 RabbitMQ、Kafka、RocketMQ 等。根据业务需求,有时还可用 Redis 做轻量的消息队列。
它的应用场景有很多,比如秒杀、记录日志等等。
秒杀就很常见了,当同一时间有大量的请求进来。如果不适用消息队列,有可能会把服务器打挂。就算不挂也会造成响应超时等问题。有了消息队列,我们可以把请求都放到消息队列里面排队处理。如果长度超过最大可承载数量,那我们选择抛弃当前用户请求。提示客户 "排队中",这样更友好。
记录日志也有对应的场景。在没消息队列前,我们是客户端进来请求,顺便记录日志。它是一个同步的行为,这会占用服务器响应的时间。而使用消息队列没我们可以在请求结束时,把日志扔到队列里面,由消费者处理,服务器直接返回请求结果。
相信大家都知道,对于一个新的框架、中间件。用起来是非常简单的,看半小时相信你就能用起来了。「但如果让你手写一个简单的消息队列,你能写出来么?」
❝狗哥用的 RabbitMQ 比较多,它是一个老牌的开源消息中间件。支持标准的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议),使用 Erlang 语言开发,支持集群部署,和多种客户端语言混合调用,它支持的主流开发语言有以下这些:Java、.NET、Ruby、Python、PHP、JavaScript and Node、Objective-C and Swift、Rust、Scala 以及 Go。 ❞
RabbitMQ 中有三个重要的角色:
它的优点是:
下图就是它的工作流程:
消息队列
它一共有四种消息类型:
首先是「简单版」,必须有三个角色。消费者、生产者以及代理。只需借助 java 的 LinkedList 类即可。
import java.util.LinkedList;
import java.util.Queue;
public class SimpleQueue {
// 定义消息队列
private static Queue< String > queue = new LinkedList< >();
public static void main(String[] args) {
producer(); // 调用生产者
consumer(); // 调用消费者
}
// 生产者
public static void producer() {
// 添加消息
queue.add("first message.");
queue.add("second message.");
queue.add("third message.");
}
// 消费者
public static void consumer() {
while (!queue.isEmpty()) {
// 消费消息
System.out.println(queue.poll());
}
}
}
运行结果:可以看出消息是以先进先出的顺序消费的。
运行结果
加下来是「带延迟功能」的消息队列,这就必须需要借助 java 的 DelayQueue 类以及 Delayed 接口了。
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class SimpleDelayQueue {
// 延迟消息队列
private static DelayQueue delayQueue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
producer(); // 调用生产者
consumer(); // 调用消费者
}
// 生产者
public static void producer() {
// 添加消息
delayQueue.put(new MyDelay(1000, "消息1"));
delayQueue.put(new MyDelay(3000, "消息2"));
}
// 消费者
public static void consumer() throws InterruptedException {
System.out.println("开始执行时间:" +
DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()) {
System.out.println(delayQueue.take());
}
System.out.println("结束执行时间:" +
DateFormat.getDateTimeInstance().format(new Date()));
}
/**
* 自定义延迟队列
*/
static class MyDelay implements Delayed {
// 延迟截止时间(单位:毫秒)
long delayTime = System.currentTimeMillis();
private String msg;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
/**
* 初始化
* @param delayTime 设置延迟执行时间
* @param msg 执行的消息
*/
public MyDelay(long delayTime, String msg) {
this.delayTime = (this.delayTime + delayTime);
this.msg = msg;
}
// 获取剩余时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 队列里元素的排序依据
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return this.msg;
}
}
}
运行结果:可以看出消息 1、消息 2 都实现了延迟执行的功能。
运行结果
本文聊了消息队列的使用场景、还介绍了我最常用的 RabbitMQ 的特性。同时还手动实现了简单版和带延迟功能的消息队列。它在我们工作中还是非常常用的,面试中问得也多。特别是诸如:聊聊你最常用的消息队列?如何手写一个消息队列等问题。