前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞懂Spring-AMQP

一文搞懂Spring-AMQP

作者头像
爱撒谎的男孩
发布2020-03-11 14:53:18
1K0
发布2020-03-11 14:53:18
举报
文章被收录于专栏:码猿技术专栏码猿技术专栏

文章目录

  1. 1. RabbitAdmin
  2. 2. MessageConvert
  3. 3. 生产者
    1. 3.1. TTL(消息或者队列)
      1. 3.1.1. 消息TTL
      2. 3.1.2. 队列TTL
    2. 3.2. 消息ack和nack
    3. 3.3. 消息Return
  4. 4. 消费者
    1. 4.1. 消息异步监听
    2. 4.2. 消费端的并发
    3. 4.3. 消费端限流(流量削峰)
    4. 4.4. 消息ack
    5. 4.5. 消息重回队列
    6. 4.6. 死信队列
  5. 5. 事务【不推荐】
  6. 6. 项目地址
  7. 7. 更多文章请移步公众号

RabbitAdmin

  • 队列,交换器的声明创建、删除、清空
  • 创建:

123456

@Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory ) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; }

  • 简单使用:创建一个direct交换器,绑定了一个队列

123456789

@Test public void test1(){ DirectExchange direct_1 = new DirectExchange("direct_1", true, false); rabbitAdmin.declareExchange(direct_1); Queue direct_q_1 = new Queue("direct_q_1", true); rabbitAdmin.declareQueue(direct_q_1); Binding binding = BindingBuilder.bind(direct_q_1).to(direct_1).with("direct"); rabbitAdmin.declareBinding(binding); }

MessageConvert

  • 消息转换器,在发送消息和接收消息的时候将消息内容转换成指定的格式。
  • 默认的消息转换器是SimpleMessageConverter,此转换器的功能就是将发送的消息体转换成字节数组(Object,String,Serializable),rabbitTemplate中会用到消息转换器的方法如下:

12345678910111213141516171819

void convertAndSend(Object message) throws AmqpException;void convertAndSend(String routingKey, Object message) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;Object receiveAndConvert() throws AmqpException;Object receiveAndConvert(String queueName) throws AmqpException;

  • 实现自己的消息转换器后调用rabbitTemplate的API(public void setMessageConverter(MessageConverter messageConverter))设置即可。
  • 在与SpringBoot整合时,可以注入自己的消息转换器,amqp提供了Jackson2JsonMessageConverter,使用JackSon将消息内容转换为json字符串,配置如下:

1234567891011121314151617181920212223

/** * 注入JackSon的MessageConverter,用于收发消息的格式化成json数据 * @param ObjectMapper 这个是jackson自动注入的,详情请看JacksonAutoConfiguration */ @Bean public Jackson2JsonMessageConverter messageConverter(ObjectMapper ){ return new Jackson2JsonMessageConverter(objectMapper); }/*** 重新注入RabbitTemplate,并且设置相关属性*/@Bean @Primary public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter, CustomConfirmCallBack confirmCallBack, CustomReturnCallBack returnCallBack){ RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMandatory(true); //设置消息转换器 template.setMessageConverter(messageConverter); template.setReturnCallback(returnCallBack); template.setConfirmCallback(confirmCallBack); return template; }

生产者

TTL(消息或者队列)

  • TTL表示消息或者队列的生命周期,在消息发送或者队列创建的时候可以设置消息的存活时间,如果此条消息或者队列中的所有消息到达指定的时候还是没有被消费,那么消息将会被清空或者存入死信队列中。
消息TTL
  • 在发送消息的时候指定的TTL,(MessageProperties)API如下:
    • public void setExpiration(String expiration):单位毫秒
队列TTL
  • 在创建队列的时候指定过期时间,在创建Queue的时候需要指定过期时间(x-message-ttl),设置在arguments

消息ack和nack

  • 消息确认机制,生产者发送消息可能因为网络、交换机不存在等其他问题导致消息投递失败,消息ack机制可以在消息投递之后针对失败或者成功做一些业务的处理。
  • 只要消息发送到exchange,ConfirmCallback回调的ack=true,而returncallback是能否发送到队列的回调函数
  • 监听步骤:
    • 设置connectionFacotry的发布确认模式为ConfirmType.CORRELATED,代码如下:

    12//设置消息发送ack,默认noneconnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    • 自定义RabbitTemplate.ConfirmCallback的实现类,重写其中的方法,如下:

    1234567891011121314151617181920212223/** * @Description 消息确认回调,在消息发出之后回调 * @Author CJB * @Date 2020/2/21 15:36 */@Componentpublic class MyConfirmCallback implements RabbitTemplate.ConfirmCallback { /** * * @param correlationData 发送消息时携带的参数,在业务上能够唯一识别,比如主键id等 * @param ack 消息是否发送成功的标志,true成功,false失败 * @param cause 消息发送失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println(correlationData.getId()+"---->"+ack+"--->"+cause); //消息投递失败执行逻辑,比如消息入库,设置失败标记等操作 if (!ack){ System.err.println("消息投递失败"); } }}

    • 在RabbitTemplate中设置

    1template.setConfirmCallback(myConfirmCallback);

消息Return

  • 用于处理一些路由不可达的消息,比如发送消息时指定的路由投递不到相应的队列,此时Return Listener就会监听到这些消息进行处理
  • 实现步骤:
    • 设置ConnectionFactorypublisherReturns为true

    12//设置开启发布消息的Return监听 connectionFactory.setPublisherReturns(true);

    • 设置RabbitTemplate的`mandatory为true,或者mandatory-expression执行的结果为true

    1template.setMandatory(true);

    • 自定义实现RabbitTemplate.ReturnCallback的类,重写其中的方法,如下:

    123456789101112131415161718192021222324/** * @Description ReturnListener的监听,处理发送消息时路由不可达的消息 * @Author CJB * @Date 2020/2/21 17:04 */@Componentpublic class MyReturnCallBack implements RabbitTemplate.ReturnCallback { /** * 在消息路由不可达会回调此方法,用于处理这些消息,比如记录日志,消息补偿等等操作 * @param message 投递的消息 * @param replyCode 响应的状态吗 * @param replyText 响应的文本 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.err.println("message:"+new String(message.getBody())); System.err.println("replyCode:"+replyCode); System.err.println("replyText:"+replyText); System.err.println("exchange:"+exchange); System.err.println("routingKey:"+routingKey); }}

    • 在RabbitTemplate中设置

    1template.setReturnCallback(myReturnCallBack);

消费者

消息异步监听

  • 异步监听消息需要设置一个监听器,一旦监听的队列中有消息发送,此监听器将会起作用。
  • 步骤如下:
    • 注入SimpleMessageListenerContainer

    12345678910111213141516171819@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //添加监听的队列 container.addQueueNames("queue1"); //设置消费者ack消息的模式,默认是自动,此处设置为手动 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置消费者的并发数量 container.setConcurrentConsumers(1); //设置单个消费请求能够处理的消息条数,默认250 container.setPrefetchCount(250); //设置最大的并发数量 container.setMaxConcurrentConsumers(10); //设置消费者的tag的生成策略,队列的名字+"_"+UUID container.setConsumerTagStrategy(queue -> queue+"_"+ UUID.randomUUID().toString()); //设置消息监听器 container.setMessageListener(customMessageListener1()); return container; }

    • 自定义一个消息监听器MessageListener的实现类,此处有两个接口:
      • MessageListener:实现该接口,重写其中的方法,不过此种的实现没有channel对象
      • ChannelAwareMessageListener: 其中重写的方法除了Message对象,还提供了Channel对象,用于手动ack等操作。

      12345678910111213141516171819202122232425262728/** * 自定义Message监听器 * @return */ @Bean public MessageListener customMessageListener(){ return msg-> System.err.println("消费者:"+new String(msg.getBody())); } @Bean public ChannelAwareMessageListener customMessageListener1(){ return (msg,chanel)->{ long deliveryTag = msg.getMessageProperties().getDeliveryTag(); try{ System.err.println("message:"+new String(msg.getBody())); System.err.println("properties:"+ deliveryTag); //.....执行系列的逻辑 //逻辑顺利执行完成之后执行ack chanel.basicAck(deliveryTag,false); }catch (Exception ex){ //记录日志等操作 //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中 chanel.basicNack(deliveryTag,false,false); } }; }

消费端的并发

  • 默认一个队列只有一个消费者监听,但是我们可以同时设置多个消费者监听这个消息,提高消息消费的效率。
  • SimpleMessageListenerContainer中的两个属性可以完成设置,如下:
    • concurrentConsumers:消费者的数量,默认1
    • maxConcurrentConsumers:最大消费者的数量

消费端限流(流量削峰)

  • 假设rabbitmq服务器有上万条信息没有处理,当开启一个消费端的话,那么就有可能出现服务器卡死的情况。
  • Rabbitmq提供了一种qos(服务质量保证)功能,即在非确认消息的前提下(手动确认消息),如果一定数目的消息(基于consumer或者channel的qos的设置)未被确认前(没有ack或者nack),不进行消费新的消息。
  • amqp实现如下:
    • SimpleMessageListener中有一个属性prefetchCount,该属性用来限制消费端的同时处理的请求,默认是250,使用spring AMQP直接设置即可,与SpringBoot整合,配置如下:

12345

spring: rabbitmq: listener: simple: prefetch: 1

消息ack

  • 默认是自动ack的,即是在接收到这条消息之后无论有没有正确消费,这条消息都会从队列中删除。当然可以设置手动ack,即是在消费者接收消息,正确处理完成之后,手动确认ack,那么此条消息才会从队列中删除。
  • API(Channel类):
    • void basicAck(long deliveryTag, boolean multiple):ack消息
      • deliveryTag:Message中的属性
      • multiple:是否批量ack消息
    • void basicNack(long deliveryTag, boolean multiple, boolean requeue):nack消息
      • requeue:是否重回队列,如果设置了重回队列,那么这条消息会被重新进入队列中的最后一条消息,如果设置了false并且此队列设置了死信队列,那么将会被放入死信队列中。
  • 实现步骤:
    • 设置消费者的确认模式为手动确认,使用的是SimpleMessageListenerContainer的API

    12//设置消费者ack消息的模式,默认是自动,此处设置为手动 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    • 消息异步监听的实现类是ChannelAwareMessageListener,通过自己的业务逻辑判断何时需要ack何时需要nack

    12345678910111213141516171819@Bean public ChannelAwareMessageListener customMessageListener1(){ return (msg,chanel)->{ long deliveryTag = msg.getMessageProperties().getDeliveryTag(); try{ System.err.println("message:"+new String(msg.getBody())); System.err.println("properties:"+ deliveryTag); //.....执行系列的逻辑 //逻辑顺利执行完成之后执行ack chanel.basicAck(deliveryTag,false); }catch (Exception ex){ //记录日志等操作 //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中 chanel.basicNack(deliveryTag,false,false); } }; }

消息重回队列

  • 重回队列的机制即是消息在nack之后如果设置了重回队列,那么此条消息将会被重新放入到此队列中的最后一条,之后将会被重新投递到消费端消费。
  • 重回队列的机制并不支持使用,如果是业务逻辑上的异常导致消息重回队列,那么重新消费也是没有多大意义。在实际的工作上可以采用补偿机制解决。
  • 设置重回队列如下:
    • SimpleMessageListenerContainer中设置默认的行为如下:

    12//设置不重回队列,默认为true,即是消息被拒绝或者nack或者监听器抛出异常之后会重新返回队列 container.setDefaultRequeueRejected(false);

    • 在nack消息的时候有一个requeue的属性设置,如下:

    12//消息执行出现异常,nack,requeue=false设置不重回队列,如果设置了死信队列,那么将会到死信队列中chanel.basicNack(deliveryTag,false,false);

死信队列

  • 消息变成死信的情况如下(前提:消息所在队列设置了死信队列):
    • 消息被拒绝(nack/reject)并且requeue=false(不设置重回队列)
    • 消息的TTL过期
    • 队列达到最大长度
  • 死信队列在rabbitmq中其实是一个exchange,只是普通的交换机和队列。
  • 想要消息被拒绝或者过期之后能够回到死信队列中,需要在队列声明的时候添加一个x-dead-letter-exchange,指定死信的交换机

12345678910111213

String exchange="a_exchange"; String queueName="a_queue"; TopicExchange topicExchange = new TopicExchange(exchange, true, true); Map<String,Object> arguments=new HashMap<>(); //指定死信队列,dlx-exchange是死信交换机 arguments.put("x-dead-letter-exchange", "dlx-exchange");//设置死信队列的路由键,需要根据这个路由键找到对应的队列arguments.put("x-dead-letter-routing-key", "dlx-key"); Queue queue = new Queue(queueName, true, false, false, arguments); Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("test.#"); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareBinding(binding);

事务【不推荐】

  • rabbitmq默认是没有开启事务的,提交和发送消息甚至业务逻辑中间涉及到数据库操作都不在同一个事务中。
  • amqp如何设置事务:
    • 关闭生产的消息确认(ack),当然默认是不开启的,投递消息的确认和事务是不能同时存在的
    • 设置RabbitTemplate中的setChannelTransacted方法为true,表示使用事务。
    • 定义事务管理器RabbitTransactionManager,实现了PlatformTransactionManager,这个事务管理器的事务只针对rabbitmq消息的发送和获取,对数据库的事务无效

    123456@Bean public PlatformTransactionManager transactionManager(){ RabbitTransactionManager manager = new RabbitTransactionManager(); manager.setConnectionFactory(connectionFactory()); return manager; }

    • 同步发送和消费消息的事务,使用@Transactional注解(无需声明RabbitTransactionManager,直接使用数据源的事务即可完成数据和mq消息的事务),如下:

    1234567891011121314@Transactional public void sendMsg(String msg){ //接收消息 Message message1 = rabbitTemplate.receive("a_queue"); System.err.println(new String(message1.getBody())); String queueName="direc_q_1"; String exchangeName="direct_1"; String routingKey="direct"; Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(new MessageProperties()).build(); rabbitTemplate.send(exchangeName,routingKey,message,new CorrelationData(UUID.randomUUID().toString())); //此处出现异常,事务将会回滚 System.out.println(1/0); }

  • 异步消费消息使用的是监听器,此时就需要在SimpleMessageListenerContainer中设置,如下: 1234//开启事务 container.setChannelTransacted(true); //设置事务管理器 container.setTransactionManager(transactionManager());

项目地址

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-03-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitAdmin
  • MessageConvert
  • 生产者
    • TTL(消息或者队列)
      • 消息TTL
      • 队列TTL
    • 消息ack和nack
      • 消息Return
      • 消费者
        • 消息异步监听
          • 消费端的并发
            • 消费端限流(流量削峰)
              • 消息ack
                • 消息重回队列
                  • 死信队列
                  • 事务【不推荐】
                  • 项目地址
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档