沿用昨天的代码,先定义交换机名称和routing key名称
DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。DLX就是一个普通的交换机,和一般的交换机没有任何区别。 当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,rabbitmq会自动发送)。
什么是死信呢?什么样的消息会变成死信呢?
public interface UserCenterMq {
/**
* 用户系统exchange名
*/
String MQ_EXCHANGE_USER = "user.topic.exchange";
/**
* 发送红包routing key
*/
String ROUTING_KEY_POST_REDPACKET = "post.redpacket";
//String ROUTING_KEY_POST_REDPACKET = "post.#";
/**
* 死信队列:
*/
String deadRoutingKey = "dead_routing_key";
String deadExchangeName = "dead_exchange";
}
资源文件配置
spring:
rabbitmq:
host: 192.168.5.182
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirms: true
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
写RabbitMQ的配置文件
@Configuration
public class RabbitmqConfig {
/**
* 红包队列名
*/
public static final String RED_PACKET_QUEUE = "red.packet.queue";
/**
* 死信队列:
*/
public final static String deadQueueName = "dead_queue";
/**
* 声明队列,此队列用来接收用户注册的消息
*
* @return
*/
@Bean
public Queue redPacketQueue() {
Queue queue = new Queue(RED_PACKET_QUEUE);
return queue;
}
/**
* 死信队列:
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
@Bean
public Binding bindingDeadExchange() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(deadRoutingKey);
}
@Bean
public Binding bindingRedPacketDeadExchange() {
return BindingBuilder.bind(redPacketQueue()).to(deadExchange()).with(deadRoutingKey);
}
@Bean
public TopicExchange userTopicExchange() {
return new TopicExchange(UserCenterMq.MQ_EXCHANGE_USER);
}
/**
* 将红包队列和用户的exchange做个绑定
*
* @return
*/
@Bean
public Binding bindingRedPacket() {
Binding binding = BindingBuilder.bind(redPacketQueue()).to(userTopicExchange())
.with(UserCenterMq.ROUTING_KEY_POST_REDPACKET);
return binding;
}
}
重写消息发送生产者
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String exchange, String routingKey,String context) {
System.out.println("send content = " + context);
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
this.rabbitTemplate.convertAndSend(exchange, routingKey, context);
}
/**
* 确认后回调:
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.out.println("send ack fail, cause = " + cause);
} else {
System.out.println("send ack success");
}
}
/**
* 失败后return回调:
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
}
}
修改事务侦听代码(事务确认完成后发送消息到MQ)
@Component
public class UserTransactionEventListener {
@Autowired
private MessageSender messageSender;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void beforeCommit(PayloadApplicationEvent<User> event) {
System.out.println("before commit, id: " + event.getPayload().getId());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(PayloadApplicationEvent<User> event) {
System.out.println("after commit, id: " + event.getPayload().getId());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(PayloadApplicationEvent<User> event) {
System.out.println("after completion, id: " + event.getPayload().getId());
messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,UserCenterMq.ROUTING_KEY_POST_REDPACKET, JSONObject.toJSONString(event.getPayload()));
//messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,"post.redpacket", JSONObject.toJSONString(event.getPayload()));
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void afterRollback(PayloadApplicationEvent<User> event) {
System.out.println("after rollback, id: " + event.getPayload().getId());
}
}
在RabbitMQ的管理界面内可以看到
已经发送了一个消息到该队列,即这个user对象.
在红包模块中,我们来监听这个消息队列完成分布式事务.
model
@Data
public class RedPacket implements Serializable {
private long redPacketId;
private long userId;
private Double redPacketAmount;
}
dao(此处有一个red_packet表,3个字段,1个自增)
@Mapper
public interface RedPacketDao {
@Options(useGeneratedKeys = true, keyProperty = "red_packet_id")
@Insert("insert into red_packet (user_id,red_packet_amount) values (#{userId},#{redPacketAmount})")
void add(RedPacket redPacket);
}
service
public interface RedPacketService {
public void add(RedPacket redPacket);
}
@Transactional
@Service
public class RedPacketServiceImpl implements RedPacketService {
@Autowired
private RedPacketDao redPacketDao;
@Override
public void add(RedPacket redPacket) {
redPacketDao.add(redPacket);
}
}
RabbitMQ消费者(此处为一注册用户就发一个十块以内的随机红包)
@Component
@RabbitListener(queues = RabbitmqConfig.RED_PACKET_QUEUE)
public class PostRedPacketConsumer {
@Autowired
private RedPacketService redPacketService;
@RabbitHandler
public void postRedPacket(String userStr, Channel channel, Message message) throws IOException {
try {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
User user = JSONObject.parseObject(userStr,User.class);
RedPacket redPacket = new RedPacket();
redPacket.setUserId(user.getId());
redPacket.setRedPacketAmount((double)(Math.random() * 10));
redPacketService.add(redPacket);
} catch (IOException e) {
e.printStackTrace();
//丢弃这条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
System.out.println("receiver fail");
}
}
}
运行后,该队列被消费掉
红包表增加数据