消息中间件基于队列模型实现异步/同步传输数据 作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理
可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
客户端发送请求到达服务器端,服务器端实现会员注册业务逻辑, 1.insertMember() --插入会员数据 1s 2.sendSms()----发送登陆短信提醒 3s 3.sendCoupons()----发送新人优惠券 3s 总共响应需要6s时间,可能会导致客户端阻塞6s时间,对用户体验 不是很好。
多线程与MQ方式实现异步?
互联网项目: 客户端 安卓/IOS
服务器端:php/java 最好使用mq实现异步
用户向数据库中插入一条数据之后,在单独开启一个线程异步发送短信和优惠操作。 客户端只需要等待1s时间 优点:适合于小项目 实现异步 缺点:有可能会消耗服务器cpu资源资源
先向数据库中插入一条会员数据,让后再向MQ中投递一个消息,MQ服务器端在将消息推送给消费者异步解耦处理发送短信和优惠券。
MQ可以实现异步/解耦/流量削峰问题; 多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。
Producer 生产者:投递消息到MQ服务器端; Consumer 消费者:从MQ服务器端获取消息处理业务逻辑; Broker MQ服务器端 Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题 Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表 Message 生产者投递消息报文:json
多线程版本mq; 基于网络通讯版本mq netty实现
public class GtfThreadMQ {
private static LinkedBlockingQueue<String> queue=new LinkedBlockingQueue<>();
public static void main(String[] args) {
//生产者线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
queue.put("userid");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者").start();
//消费者线程
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("1");
while (true){
String poll = queue.poll();
if (poll != null) {
System.out.println(poll);
}
}
}
},"消费者").start();
}
}
消费者netty客户端与nettyServer端MQ服务器端保持长连接,MQ服务器端保存 消费者连接。 生产者netty客户端发送请求给nettyServer端MQ服务器端,MQ服务器端在将该 消息内容发送给消费者。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。 RabitMQ官方网站: https://www.rabbitmq.com/
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5 </version>
</dependency>
</dependencies>
public class GetCon {
/**
* 获取连接
* @return
*/
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.配置Host
connectionFactory.setHost("127.0.0.1");
//3.设置Port
connectionFactory.setPort(5672);
//4.设置账户和密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
//5.设置VirtualHost
connectionFactory.setVirtualHost("gtf_virtualhost");
return connectionFactory.newConnection();
}
}
/**
* 生产者
*/
public class Product {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = GetCon.getConnection();
Channel channel = connection.createChannel();
String msg="1";
// channel.basicPublish(null,"gtf_queues",null,msg.getBytes());
channel.basicPublish("","gtf_queues",null,msg.getBytes("UTF-8"));
channel.close();
connection.close();
}
}
public class Consumer {
private static final String QUEUE_NAME = "gtf_queues";
public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
// 1.创建连接
Connection connection = GetCon.getConnection();
// 2.设置通道
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msg);
// 消费者完成 消费该消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
RabbitMQ如何保证消息不丢失
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
采用工作队列 在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。 channel.basicQos(1);
Direct exchange(直连交换机) Fanout exchange(扇型交换机) Topic exchange(主题交换机) Headers exchange(头交换机) /Virtual Hosts—区分不同的团队 ----队列 存放消息 ----交换机 路由消息存放在那个队列中 类似于nginx —路由key 分发规则
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。
原理:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
@Component
public class RabbitMQConfig {
/**
* 定义交换机
*/
private String EXCHANGE_SPRINGBOOT_NAME = "/gtf_ex";
/**
* 短信队列
*/
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
/**
* 邮件队列
*/
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
/**
* 配置smsQueue
*
* @return
*/
@Bean
public Queue smsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
/**
* 配置emailQueue
*
* @return
*/
@Bean
public Queue emailQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
/**
* 配置fanoutExchange
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
}
// 绑定交换机 sms
@Bean
public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(smsQueue).to(fanoutExchange);
}
// 绑定交换机 email
@Bean
public Binding bindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(emailQueue).to(fanoutExchange);
}
}
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: admin
####密码
password: 123456
### 地址
virtual-host: /gtf_virtualhost
redis:
host: 127.0.0.1
# password: 123456
port: 6379
死信队列产生的背景 RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
死信队列和普通队列区别不是很大 普通与死信队列都有自己独立的交换机和路由key、队列和消费者。 区别: 1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到 普通队列中缓存起来,普通队列对应有自己独立普通消费者。 2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费 的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机 对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。
1.30分钟订单超时设计 A. Redis过期key : B. 死信延迟队列实现: 采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后 就会将该消息转移到死信备胎消费者实现消费。 备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下 则会开始回滚库存操作。
A.消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试? 该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。 B. 消费者获取消息后,因为代码问题抛出数据异常,是否需要重试? 该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。 可以将日志存放起来,后期通过定时任务或者人工补偿形式。 如果是重试多次还是失败消息,需要重新发布消费者版本实现消费 可以使用死信队列
总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。