前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot与rabbitmq整合

springboot与rabbitmq整合

作者头像
sucl
发布2019-08-07 15:06:42
1.3K0
发布2019-08-07 15:06:42
举报
文章被收录于专栏:企业平台构建

之前学习了rabbitmq,对其基本的用法有了一定的认识,但对其深层次的使用必须依赖于具体的业务,就像编程语言一样,提供的基础的使用规范,但是却产生了那么多高性能高可用的框架,都是依据最基础的功能,在与思想的磨合下而产生,同样,如何使用好mq技术,并能实现各种复杂的业务,那么就必须在某种思维的模式或业务领域的促使下才能完成。

本次使用shpringboot与rabbitmq整合来实现基础的功能,大致了解下基本的用法,但是对比较具体的细节方面不可能做到面面俱到。

首先通过idea构建一个maven的项目,这个没什么多说的,添加依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

和之前的一样,消息通信主要需要连接、信道、交换机、队列,通过之前对rabbitmq的学习,我们知道,向交换机和队列其实只用申明一次,多次的申明也只是复用,那么我们可以使用spring中的bean来定义,并交由spring容器管理,而连接这些都是spring内部管理了,最终以RabbitTemplate暴露出来给我们使用,具体的连接过程是不用开发者去关注了。

首先定义一个类AmqpConfiguration,我们将会定义一些交换机、队列:

代码语言:javascript
复制
@Bean
public Queue maintainQueue(){
    return new Queue(QUEUE, true, false, false, null);
}
代码语言:javascript
复制
@Bean
public TopicExchange topicExchange(){
    return new TopicExchange(EXCHANGE,true,false,null);
}
代码语言:javascript
复制
@Bean
public Binding maintainBinding(){
    return BindingBuilder.bind(maintainQueue()).to(topicExchange()).with(BINDING_KEY);
}

然后发送消息到刚定义的交换机上,添加类MessageSender,专门用于发送消息:

代码语言:javascript
复制
public void send(String message){
    rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE,AmqpConfiguration.ROUTING_KEY,message);
}

其实和在rabbitmq中用法一样,参数就是交换机名称、路由键、消息。

那么接收消息呢?以前的做法是申明队列、交换机、然后通过bindingkey维护队列与交换机的关系,让发送消息时能够转发到对应的队列上,我们定义一个MessageReceiver,里面接收消息如下:

代码语言:javascript
复制
@RabbitListener(queues = AmqpConfiguration.QUEUE)
@RabbitHandler
public void receive(Channel channel, Message message){
    String messsageText = new String(message.getBody());
    try {
        if(!validate(messsageText)){
            log.info("接收消息:{}",messsageText);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }else{
            log.info("拒绝消息:{}",messsageText);
            //ack返回false,并重新回到队列,可以批量拒绝 multiple=true
        //    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            //拒绝消息,只能拒绝一条
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

我们在@RabbitListener(queues = AmqpConfiguration.QUEUE)中指明从哪个队列里拿消息就行,其实参数可以通过一些注解来自动注入,比如

  • @Payload Message msg
  • @Headers Map<String, Object> headers

当然我们可以这样定义队列、交换机等

代码语言:javascript
复制
z@RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.MULTIPART_HANDLE_KEY ) )

这样就完成了消息的发送与接收了。

再看看另一个知识,死信

首先我们看下消息变成死信的条件:

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

首先定义死信相关元素:

代码语言:javascript
复制
@Bean
public Queue dlxQueue(){
    return new Queue(QUEUE_DLX,false,false,false,null);
}
代码语言:javascript
复制
@Bean
public Binding dlxBindging(){
    return BindingBuilder.bind(dlxQueue()).to(directExchange()).with(ROUTING_DLX_KEY);
}

现在从两个方面来将消息转换死信:

代码语言:javascript
复制
@Bean
public Queue maintainQueue(){
    Map<String,Object> args = new HashMap<String,Object>();
    args.put("x-dead-letter-exchange", EXCHANGE_DLX);//死信交换机
    args.put("x-dead-letter-routing-key", ROUTING_DLX_KEY);//死信routingKey
    args.put("x-expires", QUEUE_TTL);//ms 队列过期时间
    args.put("x-message-ttl", MESSAGE_TTL);//消息过期时间
    return new Queue(QUEUE, true, false, false, args);
}

通过消息过期来完成,当然队列过期也一样,取最小值。

或者在接收方通过拒绝消息来实现:

代码语言:javascript
复制
if(!validate(messsageText)){
    log.info("接收消息:{}",messsageText);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
    log.info("拒绝消息:{}",messsageText);
    //ack返回false,并重新回到队列,可以批量拒绝 multiple=true
//    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
    //拒绝消息,只能拒绝一条
    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}

对特定的消息进行拒绝,channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false)第三参数要为false,不然就requeuing了。

最后完成一个延迟队列

延迟队列的实现也比较简单,主要通过死信完成,消息从死信队列中取消费即可。

同样先定义一些基础元素:

代码语言:javascript
复制
@Bean
    public Queue noneQueue(){
        Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-dead-letter-exchange", EXCHANGE_DLX_DELAY);//死信交换机
        args.put("x-dead-letter-routing-key", ROUTING_DLX_KEY_DELAY);//死信routingKey
//        args.put("x-expires", QUEUE_TTL);//ms 队列过期时间
        args.put("x-message-ttl", 10000);//消息过期时间
        return new Queue(QUEUE_NONE, true, false, false, args);
    }

这个是正常的队列,但是我们不会消费它,但是会将其与业务交换机绑定,这样消息发送过来,但没有消费者,超时就进入死信了。

代码语言:javascript
复制
@Bean
public Queue delayQueue(){
    Map<String,Object> args = new HashMap<String,Object>();
    return new Queue(QUEUE_DELAY, true, false, false, args);
}

定义延迟队列,消费者从这里消费。

代码语言:javascript
复制
@Bean
public DirectExchange dealyExchange(){
    return new DirectExchange(EXCHANGE_DLX_DELAY,true,false,null);
}

延迟队列关联的交换机。

代码语言:javascript
复制
@Bean
public Binding delayBinding(){
    return BindingBuilder.bind(delayQueue()).to(dealyExchange()).with(ROUTING_DLX_KEY_DELAY);
}

上面noneQueue中定义了死信交换机与路由键,这里完成其绑定。

我们正常发消息发送到noneQuene上就行了:

代码语言:javascript
复制
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE,AmqpConfiguration.ROUTING_KEY_NONE,message);

延迟队列的时间在noneQueue中定义了:

代码语言:javascript
复制
args.put("x-expires", QUEUE_TTL);//ms 队列过期时间
args.put("x-message-ttl", MESSAGE_TTL);//消息过期时间

主要流程:

  1. 定义业务队列、交换机、绑定队列到交换机
  2. 定义队列时,设置TTL、DLX
  3. 定义死信队列,绑定死信队列与交换机
  4. 发送消息
  5. 接收消息
  6. 消息是否超时,且设置死信,进入死信交换机
  7. 消费消息nack,且没有requeue,进入死信

代码:https://github.com/suspring/springboot-rabbitmq.git

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档