RabbitAdmin类可以很好的操作RabbitMQ,在spring中直接进行注入即可
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartUp(true);
return rabbitAdmin;
}
autoStartUp必须设置为true
,否则Spring容器不会加载RabbitAdmin类实例:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
package com.pyy.spring;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.pyy.spring.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.43.113:5672");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
@Test
public void testAdmin() {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>()));
rabbitAdmin.declareBinding(BindingBuilder.bind(
new Queue("test.topic.queue1", false)) // 直接创建队列
.to(new TopicExchange("test.topic", false, false))// 直接创建交换机建立关系
.with("user.#"));// 直接指定路由键
rabbitAdmin.declareBinding(BindingBuilder.bind(
new Queue("test.topic.queue1", false)) // 直接创建队列
.to(new FanoutExchange("test.topic", false, false)));// 直接创建交换机建立关系
}
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false,null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingkey);
@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange", true, false);
}
@Bean
public Queue queue() {
return new Queue("queue", true);
}
@Bean
public Binding binding() {
return new BindingBuilder.bind(queue()).to(exchange()).with("spring.*");
}
可靠性消息投递方法
、回调监听消息接口ConfirmCallback
、返回值确认接口ReturnCallback
等等。同样我们需要进行注入到Spring容器中,然后直接使用。配置注入:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//rabbitTemplate.setConfirmCallback(null);
// rabbitTemplate.setReturnCallback(null);
return rabbitTemplate;
}
编写测试类:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
// 1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述:。。。");
messageProperties.getHeaders().put("type", "自定义消息类型");
Message message = new Message("hello Rabbitmq".getBytes(), messageProperties);
// 2 发送消息
rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("------添加额外设置--------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的描述");
return message;
}
});
}
@Test
public void testSendMessage2() {
// 1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("消息1234".getBytes(), messageProperties);
// 2 发送消息
rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message);
rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", "hello object message send");
rabbitTemplate.convertAndSend("test.topic.exchange", "user.abc", "12234");
rabbitTemplate.send("test.topic.exchange", "user.#", message);
}
简单消息监听容器
注意:
SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态修改其消费者数量大小、接收消息的模式等。
很多基于RabbitMQ的自定制的后端管控台在进行动态设置的时候,也是根据这一特性实现的。所有可以看出SpringAMQP非常强大。
@Bean
public Queue queue001() {
Queue queue = new Queue("queue001", true);
return queue;
}
@Bean
public Queue queue002() {
Queue queue = new Queue("queue002", true);
return queue;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("---消费者---" + msg);
}
});
return container;
}
思考问题:
SimpleMessageListenerContainer为什么可以动态感知配置变更?
MessageListenerAdapter 即消息监听适配器
MessageDelegate:
package com.pyy.spring;
public class MessageDelegate {
/**
* 方法名称固定、参数类型固定
* @param messageBody
*/
public void hadnleMessage(byte[] messageBody) {
System.out.println("默认方法,消息内容:" + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.out.println("字节数组方法,消息内容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.out.println("字符串方法,消息内容:" + new String(messageBody));
}
}
TextConverter:
package com.pyy.spring;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
RabbitConfig:
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// container.setMessageListener(new ChannelAwareMessageListener() {
// @Override
// public void onMessage(Message message, Channel channel) throws Exception {
// String msg = new String(message.getBody());
// System.out.println("---消费者---" + msg);
// }
// });
// 适配器方式,默认方法名称:handleMessage
// 可以自定义方法名称:
// 也可以添加一个转换器:从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");// 设置默认监听方法名称
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
defaultListenerMethod
默认监听方法名称:用于设置监听方法名称Delegate
委托对象:实际真是的委托对象,用于处理消息MessageConverter
,一般来讲都需要实现这个接口
重写下面两个方法:
toMessage
: java对象转换为Message
formMessage
: Message对象转换为java对象Jackson2JsonMessageConverter
:可以进行java对象的转换功能DefaultJackson2JavaTypeMapper
映射器:可以进行java对象的映射关系