消息中间件作用:异步解耦、流量消峰
注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。
http://192.168.1.6:15672 默认账号:guest / guest
RabbitMQ官方教程:https://www.rabbitmq.com/getstarted.html
当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用交换机(Exchange)来发送
创建连接 ——> 获取连接 ——–>创建通道 —–> 声明队列 —->发送消息
消费端,通过监听方式,拉取队列对应的消息。
queueDeclare
队列声明
channel
通道
应答模式ACK
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.zx.rabbitmqdemo.simpleQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProductMessage {
//声明一个队列
private static final String QUEUE_NAME = "test_rabbitmq_simple";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置地址
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setHost("192.168.1.6");
//设置虚拟主机
connectionFactory.setVirtualHost("/");
//获取连接
Connection connection = connectionFactory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "生产者发送消息发送到队列:"+QUEUE_NAME+"中 ";
/* 发布消息: exchange:交换机 queue_name:队列名 props:消息路由头等的其他属性 body:消息体,二进制数组 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.close();
connection.close();
}
}
package com.zx.rabbitmqdemo.simpleQueue;
import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener;
import java.io.UnsupportedEncodingException;
public class ConsumerMessage {
//声明一个队列
private static final String QUEUE_NAME = "test_rabbitmq_simple";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置地址
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setHost("192.168.1.6");
//设置虚拟主机
connectionFactory.setVirtualHost("/");
//获取连接
Connection connection = connectionFactory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//TODO 工作模式,需要进行手动应答,谁应答块,谁消费消息多
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//自动应答 简单队列模式
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
/*Consumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String msg = new String(body,"utf-8"); System.out.println(msg); } } ; // 3.监听队列 channel.basicConsume(QUEUE_NAME, true, consumer);*/
}
}
Exchange
交换机[消息管道声明交换机,如果存在就创建一个交换机,并且把该管道绑定到该交换机]
RoutingKey
路由key
、Exchange
交换机
p:product 、 x:exchange、多个队列 ; queue 绑定 exchange
Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列【默认交换机类型】
注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失
public class RabbitmqUtil {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置地址
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setHost("192.168.1.6");
//设置虚拟主机
connectionFactory.setVirtualHost("/");
//获取连接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
/** * 发布/订阅 publish / subscribe */
public class ProductMessage {
private static final String EXCHANGE_NAME = "my_fanout";
public static void main(String[] args) throws Exception{
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机,type = fanout,如果交换机不存在,声明的同时也绑定交换机.
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//创建消息
String msg = "exchange type fanout rabbitmq 发布订阅";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
//关闭通道
channel.close();
connection.close();
}
}
public class ConsumerMessage01 {
private static final String EXCHANGE_NAME = "my_fanout";
private static final String QUEUE_SMS = "sms_queue_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//消费者申明队列
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_SMS, false, false, false, null);
//消费者绑定队列
channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,"");
//消费者监听队列
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//TODO 工作模式,需要进行手动应答,谁应答块,谁消费消息多
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_SMS, true, deliverCallback, consumerTag -> {
});
}
}
public class ProductMessage {
//声明一个队列
private static final String QUEUE_NAME = "test_rabbitmq_simple";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置地址
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setHost("192.168.1.6");
//设置虚拟主机
connectionFactory.setVirtualHost("/");
//获取连接
Connection connection = connectionFactory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "生产者发送消息发送到队列:"+QUEUE_NAME+"中 ";
/* 发布消息: exchange:交换机 queue_name:队列名 props:消息路由头等的其他属性 body:消息体,二进制数组 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.close();
connection.close();
}
}
路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
exchange_type = direct
路由模式,exchange 会根据routingkey 进行投递消息到队列中。
一个队列可以绑定多个routingkey
/** * 发布/订阅 publish / subscribe */
public class ProductMessage {
private static final String EXCHANGE_NAME = "my_direct";
private static final String ROUTING_KEY = "routing_key";
public static void main(String[] args) throws Exception{
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机,type = fanout,如果交换机不存在,声明的同时也绑定交换机.
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//创建消息
String msg = "exchange type direct rabbitmq 路由策略routingkey";
//发送消息
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,msg.getBytes());
//关闭通道
channel.close();
connection.close();
}
}
public class ConsumerMessage01 {
private static final String EXCHANGE_NAME = "my_direct";
private static final String QUEUE_SMS = "sms_queue_exchange_direct";
private static final String ROUTING_KEY = "routing_key";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//消费者申明队列
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_SMS, false, false, false, null);
//消费者绑定队列 绑定routingkey ,可以绑定多个
//channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,ROUTING_KEY_SMS);
channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,ROUTING_KEY);
//消费者监听队列
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//TODO 工作模式,需要进行手动应答,谁应答块,谁消费消息多
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_SMS, true, deliverCallback, consumerTag -> {
});
}
}
public class ConsumerMessage02 {
private static final String EXCHANGE_NAME = "my_direct";
private static final String ROUTING_KEY = "routing_key";
private static final String QUEUE_EMAIL = "email_queue_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//消费者申明队列
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_EMAIL, false, false, false, null);
//消费者绑定队列
//channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,ROUTING_KEY);
channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,"");
//消费者监听队列
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//TODO 工作模式,需要进行手动应答,谁应答块,谁消费消息多
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_EMAIL, true, deliverCallback, consumerTag -> {
});
}
}
consumer02没有绑定routingkey,consumer01 绑定routingkey 。product申明交换机的同时指定routingkey
此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息
生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
符号#
:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*
:只能匹配一个词lazy. 可以匹配lazy.irs或者lazy.cor
**
* 发布/订阅 publish / subscribe
*/
public class ProductMessage {
private static final String EXCHANGE_NAME = "my_topic";
private static final String ROUTING_KEY = "routing_key_los.sms";
public static void main(String[] args) throws Exception{
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机,type = fanout, direct topic如果交换机不存在,声明的同时也绑定交换机.
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//创建消息
String msg = "exchange type topic rabbitmq routing_key_los.sms";
//发送消息
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,msg.getBytes());
//关闭通道
channel.close();
connection.close();
}
}
public class ConsumerMessage01 {
private static final String EXCHANGE_NAME = "my_topic";
private static final String QUEUE_SMS = "sms_queue_exchange_topic";
private static final String ROUTING_KEY = "routing_key_los.#";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//消费者申明队列
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_SMS, false, false, false, null);
//消费者绑定队列 绑定routingkey ,可以绑定多个
channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,ROUTING_KEY);
//消费者监听队列
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//TODO 工作模式,需要进行手动应答,谁应答块,谁消费消息多
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_SMS, true, deliverCallback, consumerTag -> {
});
}
}
public class ConsumerMessage02 {
private static final String EXCHANGE_NAME = "my_topic";
private static final String ROUTING_KEY = "routing_key_los.#";
private static final String QUEUE_EMAIL = "email_queue_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//消费者申明队列
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_EMAIL, false, false, false, null);
//消费者绑定队列
channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,ROUTING_KEY);
//消费者监听队列
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//TODO 工作模式,需要进行手动应答,谁应答块,谁消费消息多
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_EMAIL, true, deliverCallback, consumerTag -> {
});
}
}
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
public class ProductMessage {
//声明一个队列
private static final String QUEUE_NAME = "test_rabbitmq_simple_tx";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置地址
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setHost("192.168.1.6");
//设置虚拟主机
connectionFactory.setVirtualHost("/");
//获取连接
Connection connection = connectionFactory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//申明一个队列
// durable: 如果为true,消息持久化,服务器重启后消息还存在
//exclusive:如果声明独占队列(仅限于此连接),则为true
//autoDelete:如果声明的是自动删除队列,则为true(服务器将在不再使用时将其删除)
//arguments:队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
channel.txSelect();//开启事物
String msg = "生产者发送消息发送到队列:" + QUEUE_NAME + "中 ";
/* 发布消息: exchange:交换机 queue_name:队列名 props:消息路由头等的其他属性 body:消息体,二进制数组 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
//TODO 发送完消息之后 ,出现int i = 1/0的错误 ,消息会发送到rabbitmq服务器中
int i = 1 / 0;
channel.txCommit();//提交事物
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();//回滚事物
} finally {
channel.close();
connection.close();
}
}
}
发送消息确认:用来确认生产者 producer
将消息发送到 broker
,broker
上的交换机 exchange
再投递给队列 queue
的过程中,消息是否成功投递。
producer
到 rabbitmq broker
有一个 confirmCallback
确认模式。
exchange
到 queue
投递失败有一个 returnCallback
退回模式。
Callback
来确保消的100%送达。
配置文件中配置开启confirmcallback / return callback 模式
消费端如果出现业务上的异常,比如int i = 1/0
,消费端默认会进行重试。RabbitMQ服务器上的消息没有被消费端消费。补偿机制是队列服务器(RabbitMQ服务器)发送的。
@RabbitListener 注解.底层使用AOP进行拦截,只要该方法没有抛出异常。会自动提交事物,RabbitMQ会删除消息。如果被AOP异常通知拦截,补货异常信息,会自动实现补偿机制,一致补偿到不抛出异常,该消息一致会缓存在RabbitMQ服务器上缓存。
修改补偿机制,默认间隔5s重试.可以在配置文件中配置重试时间间隔和重试次数.
listener:
simple:
retry:
####开启消费者重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
重试了配置的重试次数之后,就放弃消息重试,如果程序还在报异常,需要我们把消息转入到死信队列对,或者不用后续处理,RabbitMQ会把该消息删除。
利用RabbitMQ的重试机制,去重复调用第三方接口。比如:消费者消费消息抛出异常处理的原理.
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
生产者
@Component
public class FanoutConfig {
/** * 定义死信队列相关信息 */
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/** * 死信队列 交换机标识符 */
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/** * 死信队列交换机绑定键标识符 */
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
// 邮件队列
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
// 短信队列
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
// fanout 交换机
private String EXCHANGE_NAME = "fanoutExchange";
// 1.定义邮件队列
@Bean
public Queue fanOutEamilQueue() {
// 将普通队列绑定到死信队列交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
return queue;
}
// 2.定义短信队列
@Bean
public Queue fanOutSmsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
// 2.定义交换机
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
// 3.队列与交换机绑定邮件队列
@Bean
Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}
// 4.队列与交换机绑定短信队列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
/** * 配置死信队列 * * @return */
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
}
消费者配置
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId);
JSONObject jsonObject = JSONObject.parseObject(msg);
Integer timestamp = jsonObject.getInteger("timestamp");
try {
int result = 1 / timestamp;
System.out.println("result:" + result);
// 通知mq服务器删除该消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
// // 丢弃该消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
@Component
public class DeadConsumer {
@RabbitListener(queues = "dead_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("死信邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
MQ解决分布式事务三个重要概念
生产者 一定确保消息投递到MQ服务器(使用)
补偿队列
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/100742.html原文链接:https://javaforall.cn