之前学习了rabbitmq,对其基本的用法有了一定的认识,但对其深层次的使用必须依赖于具体的业务,就像编程语言一样,提供的基础的使用规范,但是却产生了那么多高性能高可用的框架,都是依据最基础的功能,在与思想的磨合下而产生,同样,如何使用好mq技术,并能实现各种复杂的业务,那么就必须在某种思维的模式或业务领域的促使下才能完成。
本次使用shpringboot与rabbitmq整合来实现基础的功能,大致了解下基本的用法,但是对比较具体的细节方面不可能做到面面俱到。
首先通过idea构建一个maven的项目,这个没什么多说的,添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
和之前的一样,消息通信主要需要连接、信道、交换机、队列,通过之前对rabbitmq的学习,我们知道,向交换机和队列其实只用申明一次,多次的申明也只是复用,那么我们可以使用spring中的bean来定义,并交由spring容器管理,而连接这些都是spring内部管理了,最终以RabbitTemplate暴露出来给我们使用,具体的连接过程是不用开发者去关注了。
首先定义一个类AmqpConfiguration,我们将会定义一些交换机、队列:
@Bean
public Queue maintainQueue(){
return new Queue(QUEUE, true, false, false, null);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE,true,false,null);
}
@Bean
public Binding maintainBinding(){
return BindingBuilder.bind(maintainQueue()).to(topicExchange()).with(BINDING_KEY);
}
然后发送消息到刚定义的交换机上,添加类MessageSender,专门用于发送消息:
public void send(String message){
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE,AmqpConfiguration.ROUTING_KEY,message);
}
其实和在rabbitmq中用法一样,参数就是交换机名称、路由键、消息。
那么接收消息呢?以前的做法是申明队列、交换机、然后通过bindingkey维护队列与交换机的关系,让发送消息时能够转发到对应的队列上,我们定义一个MessageReceiver,里面接收消息如下:
@RabbitListener(queues = AmqpConfiguration.QUEUE)
@RabbitHandler
public void receive(Channel channel, Message message){
String messsageText = new String(message.getBody());
try {
if(!validate(messsageText)){
log.info("接收消息:{}",messsageText);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
log.info("拒绝消息:{}",messsageText);
//ack返回false,并重新回到队列,可以批量拒绝 multiple=true
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
//拒绝消息,只能拒绝一条
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (Exception e) {
e.printStackTrace();
}
}
我们在@RabbitListener(queues = AmqpConfiguration.QUEUE)中指明从哪个队列里拿消息就行,其实参数可以通过一些注解来自动注入,比如
当然我们可以这样定义队列、交换机等
z@RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.MULTIPART_HANDLE_KEY ) )
这样就完成了消息的发送与接收了。
再看看另一个知识,死信:
首先我们看下消息变成死信的条件:
首先定义死信相关元素:
@Bean
public Queue dlxQueue(){
return new Queue(QUEUE_DLX,false,false,false,null);
}
@Bean
public Binding dlxBindging(){
return BindingBuilder.bind(dlxQueue()).to(directExchange()).with(ROUTING_DLX_KEY);
}
现在从两个方面来将消息转换死信:
@Bean
public Queue maintainQueue(){
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-dead-letter-exchange", EXCHANGE_DLX);//死信交换机
args.put("x-dead-letter-routing-key", ROUTING_DLX_KEY);//死信routingKey
args.put("x-expires", QUEUE_TTL);//ms 队列过期时间
args.put("x-message-ttl", MESSAGE_TTL);//消息过期时间
return new Queue(QUEUE, true, false, false, args);
}
通过消息过期来完成,当然队列过期也一样,取最小值。
或者在接收方通过拒绝消息来实现:
if(!validate(messsageText)){
log.info("接收消息:{}",messsageText);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
log.info("拒绝消息:{}",messsageText);
//ack返回false,并重新回到队列,可以批量拒绝 multiple=true
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
//拒绝消息,只能拒绝一条
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
对特定的消息进行拒绝,channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false)第三参数要为false,不然就requeuing了。
最后完成一个延迟队列:
延迟队列的实现也比较简单,主要通过死信完成,消息从死信队列中取消费即可。
同样先定义一些基础元素:
@Bean
public Queue noneQueue(){
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-dead-letter-exchange", EXCHANGE_DLX_DELAY);//死信交换机
args.put("x-dead-letter-routing-key", ROUTING_DLX_KEY_DELAY);//死信routingKey
// args.put("x-expires", QUEUE_TTL);//ms 队列过期时间
args.put("x-message-ttl", 10000);//消息过期时间
return new Queue(QUEUE_NONE, true, false, false, args);
}
这个是正常的队列,但是我们不会消费它,但是会将其与业务交换机绑定,这样消息发送过来,但没有消费者,超时就进入死信了。
@Bean
public Queue delayQueue(){
Map<String,Object> args = new HashMap<String,Object>();
return new Queue(QUEUE_DELAY, true, false, false, args);
}
定义延迟队列,消费者从这里消费。
@Bean
public DirectExchange dealyExchange(){
return new DirectExchange(EXCHANGE_DLX_DELAY,true,false,null);
}
延迟队列关联的交换机。
@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(dealyExchange()).with(ROUTING_DLX_KEY_DELAY);
}
上面noneQueue中定义了死信交换机与路由键,这里完成其绑定。
我们正常发消息发送到noneQuene上就行了:
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE,AmqpConfiguration.ROUTING_KEY_NONE,message);
延迟队列的时间在noneQueue中定义了:
args.put("x-expires", QUEUE_TTL);//ms 队列过期时间
args.put("x-message-ttl", MESSAGE_TTL);//消息过期时间
主要流程: