前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >给注册用户发红包,RabbitMQ实现(分布式事务2)

给注册用户发红包,RabbitMQ实现(分布式事务2)

作者头像
算法之名
发布2019-08-21 17:14:15
7090
发布2019-08-21 17:14:15
举报
文章被收录于专栏:算法之名算法之名

沿用昨天的代码,先定义交换机名称和routing key名称

死信队列

DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。DLX就是一个普通的交换机,和一般的交换机没有任何区别。 当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,rabbitmq会自动发送)。

什么是死信呢?什么样的消息会变成死信呢?

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
代码语言:javascript
复制
public interface UserCenterMq {

   /**
    * 用户系统exchange名
    */
   String MQ_EXCHANGE_USER = "user.topic.exchange";

   /**
    * 发送红包routing key
    */
   String ROUTING_KEY_POST_REDPACKET = "post.redpacket";
   //String ROUTING_KEY_POST_REDPACKET = "post.#";
   /**
    * 死信队列:
    */
   String deadRoutingKey = "dead_routing_key";
   String deadExchangeName = "dead_exchange";
}

资源文件配置

代码语言:javascript
复制
spring:
  rabbitmq:
      host: 192.168.5.182
      port: 5672
      username: admin
      password: admin
      virtual-host: /
      publisher-confirms: true
      publisher-returns: true
      listener:
        simple:
          acknowledge-mode: manual

写RabbitMQ的配置文件

代码语言:javascript
复制
@Configuration
public class RabbitmqConfig {

   /**
    * 红包队列名
    */
   public static final String RED_PACKET_QUEUE = "red.packet.queue";
   /**
    * 死信队列:
    */
   public final static String deadQueueName = "dead_queue";
   /**
    * 声明队列,此队列用来接收用户注册的消息
    * 
    * @return
    */
   @Bean
   public Queue redPacketQueue() {
      Queue queue = new Queue(RED_PACKET_QUEUE);
      return queue;
   }
   /**
    * 死信队列:
    */

   @Bean
   public Queue deadQueue() {
      Queue queue = new Queue(deadQueueName, true);
      return queue;
   }

   @Bean
   public DirectExchange deadExchange() {
      return new DirectExchange(deadExchangeName);
   }

   @Bean
   public Binding bindingDeadExchange() {
      return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(deadRoutingKey);
   }

   @Bean
   public Binding bindingRedPacketDeadExchange() {
      return BindingBuilder.bind(redPacketQueue()).to(deadExchange()).with(deadRoutingKey);
   }

   @Bean
   public TopicExchange userTopicExchange() {
      return new TopicExchange(UserCenterMq.MQ_EXCHANGE_USER);
   }

   /**
    * 将红包队列和用户的exchange做个绑定
    * 
    * @return
    */
   @Bean
   public Binding bindingRedPacket() {
      Binding binding = BindingBuilder.bind(redPacketQueue()).to(userTopicExchange())
            .with(UserCenterMq.ROUTING_KEY_POST_REDPACKET);
      return binding;
   }
}

重写消息发送生产者

代码语言:javascript
复制
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange, String routingKey,String context) {
        System.out.println("send content = " + context);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.convertAndSend(exchange, routingKey, context);
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.out.println("send ack fail, cause = " + cause);
        } else {
            System.out.println("send ack success");
        }
    }

    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }
}

修改事务侦听代码(事务确认完成后发送消息到MQ)

代码语言:javascript
复制
@Component
public class UserTransactionEventListener {
    @Autowired
    private MessageSender messageSender;

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void beforeCommit(PayloadApplicationEvent<User> event) {
        System.out.println("before commit, id: " + event.getPayload().getId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void afterCommit(PayloadApplicationEvent<User> event) {
        System.out.println("after commit, id: " + event.getPayload().getId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    public void afterCompletion(PayloadApplicationEvent<User> event) {
        System.out.println("after completion, id: " + event.getPayload().getId());
        messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,UserCenterMq.ROUTING_KEY_POST_REDPACKET, JSONObject.toJSONString(event.getPayload()));
        //messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,"post.redpacket", JSONObject.toJSONString(event.getPayload()));
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void afterRollback(PayloadApplicationEvent<User> event) {
        System.out.println("after rollback, id: " + event.getPayload().getId());
    }
}

在RabbitMQ的管理界面内可以看到

已经发送了一个消息到该队列,即这个user对象.

在红包模块中,我们来监听这个消息队列完成分布式事务.

model

代码语言:javascript
复制
@Data
public class RedPacket implements Serializable {
    private long redPacketId;
    private long userId;
    private Double redPacketAmount;
}

dao(此处有一个red_packet表,3个字段,1个自增)

代码语言:javascript
复制
@Mapper
public interface RedPacketDao {
    @Options(useGeneratedKeys = true, keyProperty = "red_packet_id")
    @Insert("insert into red_packet (user_id,red_packet_amount) values (#{userId},#{redPacketAmount})")
    void add(RedPacket redPacket);
}

service

代码语言:javascript
复制
public interface RedPacketService {
    public void add(RedPacket redPacket);
}
代码语言:javascript
复制
@Transactional
@Service
public class RedPacketServiceImpl implements RedPacketService {
    @Autowired
    private RedPacketDao redPacketDao;
    @Override
    public void add(RedPacket redPacket) {
        redPacketDao.add(redPacket);
    }
}

RabbitMQ消费者(此处为一注册用户就发一个十块以内的随机红包)

代码语言:javascript
复制
@Component
@RabbitListener(queues = RabbitmqConfig.RED_PACKET_QUEUE)
public class PostRedPacketConsumer {
    @Autowired
    private RedPacketService redPacketService;

    @RabbitHandler
    public void postRedPacket(String userStr, Channel channel, Message message) throws IOException {
        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            User user = JSONObject.parseObject(userStr,User.class);
            RedPacket redPacket = new RedPacket();
            redPacket.setUserId(user.getId());
            redPacket.setRedPacketAmount((double)(Math.random() * 10));
            redPacketService.add(redPacket);
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            System.out.println("receiver fail");
        }
    }
}

运行后,该队列被消费掉

红包表增加数据

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 死信队列
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档