springboot集成rabbitmq(实战)

RabbitMQ简介 RabbitMQ使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现(AMQP的主要特征是面向消息、队列、路由、可靠性、安全)。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现很出色。

相关概念

消息队列通常有三个概念:发送消息(生产者)、队列、接收消息(消费者)。RabbitMQ在这个基本概念之上,多做了一层抽象,在发送消息和队列之间,加入了交换机。这样发送消息和队列就没有直接关系,而是通过交换机来做转发,交换机会根据分发策略把消息转给队列。

图一(MQ基本模型):

P为发送消息(生产者)、Q为消息队列、C为接收消息(消费者)

图二(RabbitMQ模型):

P为发送消息(生产者)、X为交换机、Q为消息队列、C为接收消息(消费者)

                                              图一                             图二

RabbitMQ比较重要的几个概念:

虚拟主机:RabbitMQ支持权限控制,但是最小控制粒度为虚拟主机。一个虚拟主机可以包含多个交换机、队列、绑定。

交换机:RabbitMQ分发器,根据不同的策略将消息分发到相关的队列。

队列:缓存消息的容器。

绑定:设置交换机与队列的关系。

是时候表演真正的技术了

为了方便演示,我们分别创建两个springboot项目:

spring-boot-rabbitmq-producer(生产者)

spring-boot-rabbitmq-consumer(消费者)

注意:实际项目中,一个系统可能即为生产者、又为消费者。

1.添加基础配置

生产者、消费者基础配置相同。

1.1)集成rabbitmq,添加maven依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

 1.2)添加rabbitmq服务配置(application.properties)

#rabbitmq相关配置
spring.rabbitmq.host=192.168.15.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

2.交换机——DirectExchange

DirectExchange是RabbitMQ的默认交换机,直接使用routingKey匹配队列。

2.1)添加一个配置类(消费者)

配置一个routingKey为notify.payment的消息队列

@Configuration
public class DirectConfig {
	@Bean
	public Queue paymentNotifyQueue() {
		return new Queue("notify.payment");
	}
}

2.2)添加一个消息监听类(消费者)

监听routingKey为notify.payment的队列消息

@Component
@RabbitListener(queues = "notify.payment")
public class PaymentNotifyReceive {
	@RabbitHandler
	public void receive(String msg) {
		LogUtil.info("notify.payment receive message: "+msg);
	}
}

2.3)添加一个消息发送类(生产者)

将消息发送至默认的交换机且routingKey为notify.payment

@Component
public class PaymentNotifySender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void sender(String msg){
		LogUtil.info("notify.payment send message: "+msg);
		rabbitTemplate.convertAndSend("notify.payment", msg);
	}
}

2.4)添加一个测试类(生产者)

@RunWith(SpringRunner.class)
@SpringBootTest
public class PaymentNotifySenderTests {
	@Autowired
	private PaymentNotifySender sender;
	
	@Test
	public void test_sender() {
		sender.sender("支付订单号:"+System.currentTimeMillis());
	}
}

2.5)执行test_sender()方法

生产者日志:

2018-05-14 16:28:53.264  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 10624 (started by lianjinsoft...
2018-05-14 16:28:53.265  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:28:53.305  INFO 10624 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@...
2018-05-14 16:28:54.133  INFO 10624 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:28:55.104  INFO 10624 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:28:55.114  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.246 seconds (JVM running for 3.199)
2018-05-14 16:28:55.343  INFO 10624 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:1526286535342
2018-05-14 16:28:55.383  INFO 10624 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:28:55.444  INFO 10624 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
2018-05-14 16:28:55.483  INFO 10624 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:28:55.485  INFO 10624 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:28:55.490  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:1526286535342

分析日志:

从生产者日志第7行可以看出,消息已经成功发送。

从消费者日志可以看出,消息已经成功接收。

3.交换机——TopicExchange

TopicExchange是按规则转发消息,是交换机中最灵活的一个。也是最常用的一个。

3.1)添加一个配置类(消费者)

配置一个routingKey为api.core的消息队列并绑定在coreExchange交换机上(交换机的匹配规则为api.core.*)

配置一个routingKey为api.payment的消息队列并绑定在paymentExchange交换机上(交换机的匹配规则为api.payment.#)

@Configuration
public class TopicConfig {
	@Bean
	public Queue coreQueue() {
		return new Queue("api.core");
	}
	
	@Bean
	public Queue paymentQueue() {
		return new Queue("api.payment");
	}
	
	@Bean
	public TopicExchange coreExchange() {
		return new TopicExchange("coreExchange");
	}
	
	@Bean
	public TopicExchange paymentExchange() {
		return new TopicExchange("paymentExchange");
	}
	
	@Bean
	public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) {
		return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*");
	}
	
	@Bean
	public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) {
		return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#");
	}
}

3.2)添加两个消息监听类(消费者)

监听routingKey为api.core的队列消息

@Component
public class ApiCoreReceive {
	@RabbitHandler
	@RabbitListener(queues = "api.core")
	public void user(String msg) {
		LogUtil.info("api.core receive message: "+msg);
	}
}

监听routingKey为api.payment的队列消息

@Component
public class ApiPaymentReceive {
	@RabbitHandler
	@RabbitListener(queues = "api.payment")
	public void order(String msg) {
		LogUtil.info("api.payment.order receive message: "+msg);
	}
}

3.3)添加两个消息发送类(生产者)

添加一个user()方法,发送消息至coreExchange交换机且routingKey为api.core.user

添加一个userQuery()方法,发送消息至coreExchange交换机且routingKey为api.core.user.query

@Component
public class ApiCoreSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void user(String msg){
		LogUtil.info("api.core.user send message: "+msg);
		rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg);
	}
	
	public void userQuery(String msg){
		LogUtil.info("api.core.user.query send message: "+msg);
		rabbitTemplate.convertAndSend("coreExchange", "api.core.user.query", msg);
	}
}

添加一个order()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order

添加一个orderQuery()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order.query

添加一个orderDetailQuery()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order.detail.query

@Component
public class ApiPaymentSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void order(String msg){
		LogUtil.info("api.payment.order send message: "+msg);
		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg);
	}
	
	public void orderQuery(String msg){
		LogUtil.info("api.payment.order.query send message: "+msg);
		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg);
	}
	
	public void orderDetailQuery(String msg){
		LogUtil.info("api.payment.order.detail.query send message: "+msg);
		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg);
	}
}

3.4)添加两个测试类(生产者)

测试ApiCoreSender类中的相关方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCoreSenderTests {
	@Autowired
	private ApiCoreSender sender;
	
	@Test
	public void test_user() {
		sender.user("用户管理!");
	}
	
	@Test
	public void test_userQuery() {
		sender.userQuery("查询用户信息!");
	}
}

测试ApiPaymentSender类中的相关方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiPaymentSenderTests {
	@Autowired
	private ApiPaymentSender sender;
	
	@Test
	public void test_order() {
		sender.order("订单管理!");
	}
	
	@Test
	public void test_orderQuery() {
		sender.orderQuery("查询订单信息!");
	}
	
	@Test
	public void test_orderDetailQuery() {
		sender.orderDetailQuery("查询订单详情信息!");
	}
}

3.5)验证

3.5.1)执行ApiCoreSenderTests测试类

生产者日志:

2018-05-14 16:30:05.804  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : Starting ApiCoreSenderTests on LAPTOP-1DF7S904 with PID 7340 (started by lianjinsoft in ...
2018-05-14 16:30:05.805  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : No active profile set, falling back to default profiles: default
2018-05-14 16:30:05.851  INFO 7340 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:30:06.553  INFO 7340 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:30:07.375  INFO 7340 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:30:07.385  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : Started ApiCoreSenderTests in 1.922 seconds (JVM running for 2.846)
2018-05-14 16:30:07.431  INFO 7340 --- [           main] com.lianjinsoft.util.LogUtil             : api.core.user send message: 用户管理!
2018-05-14 16:30:07.463  INFO 7340 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:30:07.578  INFO 7340 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:30:07.647  INFO 7340 --- [           main] com.lianjinsoft.util.LogUtil             : api.core.user.query send message: 查询用户信息!
2018-05-14 16:30:07.716  INFO 7340 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:30:07.728  INFO 7340 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:30:07.609  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.core receive message: 用户管理!

分析日志:

从生产者日志第7、10行可以看出,api.core.user和api.core.user.query消息均已发送成功。

从消费者日志可以看出,只有api.core.user发送的消息被收到了。

问题:

为什么api.core.user.query发送的消息没有被api.core队列监听消费?

答:因为在TopicConfig配置类中,我们对api.core队列绑定的交换机规则是api.core.*,而通配符“*”只能向后多匹配一层路径。

3.5.2)执行ApiPaymentSenderTests测试类

生产者日志:

2018-05-14 16:31:12.823  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : Starting ApiPaymentSenderTests on LAPTOP-1DF7S904 with PID 6460 (started by lianjinsoft in ...
2018-05-14 16:31:12.823  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : No active profile set, falling back to default profiles: default
2018-05-14 16:31:12.857  INFO 6460 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:31:13.718  INFO 6460 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:31:14.530  INFO 6460 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:31:14.539  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : Started ApiPaymentSenderTests in 2.05 seconds (JVM running for 2.945)
2018-05-14 16:31:14.592  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order.query send message: 查询订单信息!
2018-05-14 16:31:14.638  INFO 6460 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:31:14.762  INFO 6460 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#47404bea:0/SimpleConnection@6b54655f ...
2018-05-14 16:31:14.819  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order.detail.query send message: 查询订单详情信息!
2018-05-14 16:31:14.825  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order send message: 订单管理!
2018-05-14 16:31:14.836  INFO 6460 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:31:14.840  INFO 6460 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:31:14.809  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 查询订单信息!
2018-05-14 16:31:14.821  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 查询订单详情信息!
2018-05-14 16:31:14.829  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 订单管理!

分析日志:

从生产者日志第7、10、11行可以看出,api.payment.order.query、api.payment.order.detail.query、api.payment.order消息均发送成功。

从消费者日志可以看出,api.payment.order队列监听到了所有消息并均处理成功了。

知识点:

TopicExchange交换机支持使用通配符*、#

*号只能向后多匹配一层路径。

#号可以向后匹配多层路径。

4.交换机——HeadersExchange

HeadersExchange交换机是根据请求消息中设置的header attribute参数类型来匹配的(和routingKey没有关系)。

4.1)添加一个配置类(消费者)

配置一个routingKey为credit.bank的消息队列并绑定在creditBankExchange交换机上

配置一个routingKey为credit.finance的消息队列并绑定在creditFinanceExchange交换机上

@Configuration
public class HeadersConfig {
	@Bean
	public Queue creditBankQueue() {
		return new Queue("credit.bank");
	}
	
	@Bean
	public Queue creditFinanceQueue() {
		return new Queue("credit.finance");
	}
	
	@Bean
	public HeadersExchange creditBankExchange() {
		 return new HeadersExchange("creditBankExchange");
	}
	
	@Bean
	public HeadersExchange creditFinanceExchange() {
		 return new HeadersExchange("creditFinanceExchange");
	}
	
	@Bean
	public Binding bindingCreditAExchange(Queue creditBankQueue, HeadersExchange creditBankExchange) {
		Map<String,Object> headerValues = new HashMap<>();
		headerValues.put("type", "cash");
		headerValues.put("aging", "fast");
		return BindingBuilder.bind(creditBankQueue).to(creditBankExchange).whereAll(headerValues).match();
	}
	
	@Bean
	public Binding bindingCreditBExchange(Queue creditFinanceQueue, HeadersExchange creditFinanceExchange) {
		Map<String,Object> headerValues = new HashMap<>();
		headerValues.put("type", "cash");
		headerValues.put("aging", "fast");
		return BindingBuilder.bind(creditFinanceQueue).to(creditFinanceExchange).whereAny(headerValues).match();
	}
}

4.2)添加一个消息监听类(消费者)

添加creditBank()方法,监听routingKey为credit.bank的队列消息

添加creditFinance()方法,监听routingKey为credit.finance的队列消息

@Component
public class ApiCreditReceive {
	@RabbitHandler
	@RabbitListener(queues = "credit.bank")
	public void creditBank(String msg) {
		LogUtil.info("credit.bank receive message: "+msg);
	}
	
	@RabbitHandler
	@RabbitListener(queues = "credit.finance")
	public void creditFinance(String msg) {
		LogUtil.info("credit.finance receive message: "+msg);
	}
}

4.3)添加一个消息发送类(生产者)

添加一个creditBank()方法,发送消息至creditBankExchange交换机且routingKey为credit.bank

添加一个creditFinance()方法,发送消息至creditFinanceExchange交换机且routingKey为credit.finance

@Component
public class ApiCreditSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void creditBank(Map<String, Object> head, String msg){
		LogUtil.info("credit.bank send message: "+msg);
		rabbitTemplate.convertAndSend("creditBankExchange", "credit.bank", getMessage(head, msg));
	}
	
	public void creditFinance(Map<String, Object> head, String msg){
		LogUtil.info("credit.finance send message: "+msg);
		rabbitTemplate.convertAndSend("creditFinanceExchange", "credit.finance", getMessage(head, msg));
	}
}

4.4)添加一个测试类(生产者)

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCreditSenderTests {
	@Autowired
	private ApiCreditSender sender;
	
	@Test
	public void test_creditBank_type() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		sender.creditBank(head, "银行授信(部分匹配)");
	}
	
	@Test
	public void test_creditBank_all() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		head.put("aging", "fast");
		sender.creditBank(head, "银行授信(全部匹配)");
	}
	
	@Test
	public void test_creditFinance_type() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		sender.creditFinance(head, "金融公司授信(部分匹配)");
	}
	
	@Test
	public void test_creditFinance_all() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		head.put("aging", "fast");
		sender.creditFinance(head, "金融公司授信(全部匹配)");
	}
}

4.5)执行ApiCreditSenderTests测试类

生产者日志:

2018-05-14 16:32:18.954  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : Starting ApiCreditSenderTests on LAPTOP-1DF7S904 with PID 5204 (started by lianjinsoft in...
2018-05-14 16:32:18.964  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : No active profile set, falling back to default profiles: default
2018-05-14 16:32:19.007  INFO 5204 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:32:19.609  INFO 5204 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:32:20.437  INFO 5204 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:32:20.446  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : Started ApiCreditSenderTests in 1.839 seconds (JVM running for 2.759)
2018-05-14 16:32:20.566  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.bank send message: 银行授信(部分匹配)
2018-05-14 16:32:20.574  INFO 5204 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:32:20.666  INFO 5204 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:32:21.064  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.finance send message: 金融公司授信(全部匹配)
2018-05-14 16:32:21.070  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.bank send message: 银行授信(全部匹配)
2018-05-14 16:32:21.077  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.finance send message: 金融公司授信(部分匹配)
2018-05-14 16:32:21.109  INFO 5204 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:32:21.114  INFO 5204 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:32:21.093  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.finance receive message: 金融公司授信(全部匹配)
2018-05-14 16:32:21.094  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.bank receive message: 银行授信(全部匹配)
2018-05-14 16:32:21.097  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.finance receive message: 金融公司授信(部分匹配)

分析日志:

通过生产者日志第7、10、11、12可以看出,测试的4个方法均已成功发送消息。

通过消费者日志可以看出,credit.bank监听的队列有一条消息没有接收到。

问题:

为什么ApiCreditSenderTests.test_creditBank_type()发送的消息,没有被处理?

答:因为在HeadersConfig配置类中,creditBankExchange交换机的匹配规则是完全匹配,即header attribute参数必须完成一致。

5.交换机——FanoutExchange

FanoutExchange交换机是转发消息到所有绑定队列(广播模式,和routingKey没有关系)。

5.1)添加一个配置类(消费者)

配置一个routingKey为api.report.payment的消息队列并绑定在reportExchange交换机上

配置一个routingKey为api.report.refund的消息队列并绑定在reportExchange交换机上

@Configuration
public class FanoutConfig {
	@Bean
	public Queue reportPaymentQueue() {
		return new Queue("api.report.payment");
	}
	
	@Bean
	public Queue reportRefundQueue() {
		return new Queue("api.report.refund");
	}
	
	@Bean
	public FanoutExchange reportExchange() {
		 return new FanoutExchange("reportExchange");
	}
	
	@Bean
	public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) {
		return BindingBuilder.bind(reportPaymentQueue).to(reportExchange);
	}
	
	@Bean
	public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) {
		return BindingBuilder.bind(reportRefundQueue).to(reportExchange);
	}
}

5.2)添加一个消息监听类(消费者)

添加payment()方法,监听routingKey为api.report.payment的队列消息

添加refund()方法,监听routingKey为api.report.refund的队列消息

@Component
public class ApiReportReceive {
	@RabbitHandler
	@RabbitListener(queues = "api.report.payment")
	public void payment(String msg) {
		LogUtil.info("api.report.payment receive message: "+msg);
	}
	
	@RabbitHandler
	@RabbitListener(queues = "api.report.refund")
	public void refund(String msg) {
		LogUtil.info("api.report.refund receive message: "+msg);
	}
}

5.3)添加一个消息发送类(生产者)

添加一个generateReports()方法,发送消息至reportExchange交换机

@Component
public class ApiReportSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void generateReports(String msg){
		LogUtil.info("api.generate.reports send message: "+msg);
		rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg);
	}
}

5.4)添加一个测试类(生产者)

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiReportSenderTests {
	@Autowired
	private ApiReportSender sender;
	
	@Test
	public void test_generateReports() {
		sender.generateReports("开始生成报表!");
	}
}

5.5)执行ApiReportSenderTests测试类

生产者日志:

2018-05-14 16:33:41.453  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : Starting ApiReportSenderTests on LAPTOP-1DF7S904 with PID 14356 (started by lianjinsoft in ...
2018-05-14 16:33:41.454  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : No active profile set, falling back to default profiles: default
2018-05-14 16:33:41.490  INFO 14356 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:33:42.094  INFO 14356 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
2018-05-14 16:33:42.960  INFO 14356 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:33:42.972  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : Started ApiReportSenderTests in 1.939 seconds (JVM running for 2.843)
2018-05-14 16:33:43.037  INFO 14356 --- [           main] com.lianjinsoft.util.LogUtil             : api.generate.reports send message: 开始生成报表!
2018-05-14 16:33:43.054  INFO 14356 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:33:43.174  INFO 14356 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:33:43.237  INFO 14356 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
2018-05-14 16:33:43.240  INFO 14356 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:33:43.205  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.report.payment receive message: 开始生成报表!
2018-05-14 16:33:43.207  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.report.refund receive message: 开始生成报表!

分析日志:

通过生产者日志第7行可以看出,消息已发送成功。

通过消费者日志可以看出,api.report.payment和api.report.refund队列均收到了同一个消息。

6.多对一

在实际项目中,我们的系统通常会做集群、分布式或灾备部署。那么就会出现一对多、多对一或多对多的场景。

那么咱们本地如何模拟多对一呢?

6.1)为了测试方便,我们复用PaymentNotifyReceive案例,在PaymentNotifySenderTests测试类中,增加测试方法

test_sender_many2one_1:请求参数为偶数

test_sender_many2one_2:请求参数为奇数

@Test
public void test_sender_many2one_1() throws Exception {
	for (int i = 0; i < 20; i+=2) {
		sender.sender("支付订单号:"+i);
		Thread.sleep(1000);
	}
}

@Test
public void test_sender_many2one_2() throws Exception {
	for (int i = 1; i < 20; i+=2) {
		sender.sender("支付订单号:"+i);
		Thread.sleep(1000);
	}
}

6.2)执行test_sender_many2one_1()、test_sender_many2one_2()方法

生产者1日志:

2018-05-14 16:34:49.249  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 5064 (started by lianjinsoft in...
2018-05-14 16:34:49.250  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:34:49.297  INFO 5064 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:34:49.989  INFO 5064 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:34:51.267  INFO 5064 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:34:51.293  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.449 seconds (JVM running for 3.366)
2018-05-14 16:34:51.357  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:0
2018-05-14 16:34:51.370  INFO 5064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:34:51.817  INFO 5064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f ...
2018-05-14 16:34:52.866  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:2
2018-05-14 16:34:53.870  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:4
2018-05-14 16:34:54.870  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:6
2018-05-14 16:34:55.871  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:8
2018-05-14 16:34:56.872  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:10
2018-05-14 16:34:57.872  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:12
2018-05-14 16:34:58.873  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:14
2018-05-14 16:34:59.875  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:16
2018-05-14 16:35:00.876  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:18
2018-05-14 16:35:01.882  INFO 5064 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:35:01.883  INFO 5064 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

生产者2日志:

2018-05-14 16:34:52.689  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 13988 (started by lianjinsoft in...
2018-05-14 16:34:52.690  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:34:52.738  INFO 13988 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:34:53.444  INFO 13988 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:34:54.567  INFO 13988 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:34:54.575  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.237 seconds (JVM running for 4.18)
2018-05-14 16:34:54.788  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:1
2018-05-14 16:34:54.796  INFO 13988 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:34:54.870  INFO 13988 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#305f7627:0/SimpleConnection@665e9289...
2018-05-14 16:34:55.903  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:3
2018-05-14 16:34:56.903  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:5
2018-05-14 16:34:57.904  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:7
2018-05-14 16:34:58.904  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:9
2018-05-14 16:34:59.905  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:11
2018-05-14 16:35:00.906  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:13
2018-05-14 16:35:01.907  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:15
2018-05-14 16:35:02.907  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:17
2018-05-14 16:35:03.910  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:19
2018-05-14 16:35:04.928  INFO 13988 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:35:04.932  INFO 13988 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:34:51.853  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:0
2018-05-14 16:34:52.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:2
2018-05-14 16:34:53.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:4
2018-05-14 16:34:54.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:6
2018-05-14 16:34:54.902  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:1
2018-05-14 16:34:55.872  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:8
2018-05-14 16:34:55.904  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:3
2018-05-14 16:34:56.873  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:10
2018-05-14 16:34:56.905  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:5
2018-05-14 16:34:57.873  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:12
2018-05-14 16:34:57.905  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:7
2018-05-14 16:34:58.878  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:14
2018-05-14 16:34:58.906  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:9
2018-05-14 16:34:59.877  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:16
2018-05-14 16:34:59.906  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:11
2018-05-14 16:35:00.877  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:18
2018-05-14 16:35:00.909  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:13
2018-05-14 16:35:01.909  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:15
2018-05-14 16:35:02.911  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:17
2018-05-14 16:35:03.914  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:19

分析日志:

从生产者1、生产者2日志中可以看出,所有消息均已经发送成功。

从消费者日志可以看出,所有消息均被成功接收处理。

7.一对多

如何模拟一对多?

7.1)为了测试方便,我们继续复用PaymentNotifyReceive案例,在PaymentNotifySenderTests测试类中,增加测试方法

test_sender_one2many:循环调用20次。

@Test
public void test_sender_one2many() {
	for (int i = 0; i < 20; i++) {
		sender.sender("支付订单号:"+i);
	}
}

7.2)测试

为了达到一对多的效果,我们需要多启动一个(或多个)消费者。然后执行test_sender_one2many()测试方法。

生产者日志:

2018-05-14 16:36:27.703  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 7508 (started by lianjinsoft in...
2018-05-14 16:36:27.704  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:36:27.729  INFO 7508 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:36:28.391  INFO 7508 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
2018-05-14 16:36:29.285  INFO 7508 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:36:29.303  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.197 seconds (JVM running for 3.097)
2018-05-14 16:36:29.504  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:0
2018-05-14 16:36:29.516  INFO 7508 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:36:29.635  INFO 7508 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#4f071df8:0/SimpleConnection@42a9e5d1...
2018-05-14 16:36:29.672  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:1
2018-05-14 16:36:29.679  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:2
2018-05-14 16:36:29.705  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:3
2018-05-14 16:36:29.707  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:4
2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:5
2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:6
2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:7
2018-05-14 16:36:29.713  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:8
2018-05-14 16:36:29.719  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:9
2018-05-14 16:36:29.728  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:10
2018-05-14 16:36:29.729  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:11
2018-05-14 16:36:29.733  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:12
2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:13
2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:14
2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:15
2018-05-14 16:36:29.741  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:16
2018-05-14 16:36:29.742  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:17
2018-05-14 16:36:29.742  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:18
2018-05-14 16:36:29.744  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:19
2018-05-14 16:36:29.762  INFO 7508 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:36:29.767  INFO 7508 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者1日志:

2018-05-14 16:36:29.675  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:0
2018-05-14 16:36:29.690  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:2
2018-05-14 16:36:29.712  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:4
2018-05-14 16:36:29.714  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:6
2018-05-14 16:36:29.728  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:9
2018-05-14 16:36:29.733  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:11
2018-05-14 16:36:29.737  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:13
2018-05-14 16:36:29.740  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:15
2018-05-14 16:36:29.743  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:16
2018-05-14 16:36:29.745  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:18

消费者2日志:

2018-05-14 16:36:29.705  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:1
2018-05-14 16:36:29.709  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:3
2018-05-14 16:36:29.712  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:5
2018-05-14 16:36:29.718  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:7
2018-05-14 16:36:29.722  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:8
2018-05-14 16:36:29.731  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:10
2018-05-14 16:36:29.736  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:12
2018-05-14 16:36:29.738  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:14
2018-05-14 16:36:29.744  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:17
2018-05-14 16:36:29.747  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:19

分析日志:

从生产者日志可以看出,所有的消息均已经发送成功。

从消费者1、消费者2日志可以看出,消息被两个消费者均衡消费了。

8.发送对象

实际项目中,请求信息可能包含多个字段。为了保证生产者与消费者两端的字段一致性,通常会传递一个对象。

8.1)为了测试方便,我们在DirectConfig中增加一个消息队列

@Bean
public Queue refundNotifyQueue() {
	return new Queue("notify.refund");
}

8.2)添加一个消息监听类(消费者)

监听routingKey为notify.refund的队列消息

@Component
@RabbitListener(queues = "notify.refund")
public class RefundNotifyReceive {
	@RabbitHandler
	public void receive(Order order) {
		LogUtil.info("notify.refund receive message: "+order);
	}
}

8.3)添加一个消息发送类(生产者)

@Component
public class RefundNotifySender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void sender(Order order){
		LogUtil.info("notify.refund send message: "+order);
		rabbitTemplate.convertAndSend("notify.refund", order);
	}
}

8.4)添加一个测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class RefundNotifySenderTests {
	@Autowired
	private RefundNotifySender sender;
	
	@Test
	public void test_sender() {
		Order order = new Order();
		order.setId(100001);
		order.setOrderId(String.valueOf(System.currentTimeMillis()));
		order.setAmount(new BigDecimal("1999.99"));
		order.setCreateTime(new Date());
		sender.sender(order);
	}
}

8.5)执行RefundNotifySenderTests测试类

生产者日志:

2018-05-14 16:37:47.038  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : Starting RefundNotifySenderTests on LAPTOP-1DF7S904 with PID 13672 (started by lianjinsoft in...
2018-05-14 16:37:47.041  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : No active profile set, falling back to default profiles: default
2018-05-14 16:37:47.070  INFO 13672 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
2018-05-14 16:37:47.715  INFO 13672 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:37:48.779  INFO 13672 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:37:48.802  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : Started RefundNotifySenderTests in 2.082 seconds (JVM running for 2.967)
2018-05-14 16:37:49.085  INFO 13672 --- [           main] com.lianjinsoft.util.LogUtil             : notify.refund send message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...
2018-05-14 16:37:49.104  INFO 13672 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:37:49.170  INFO 13672 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#6b5894c8:0/SimpleConnection@38f57b3d [delegate=...
2018-05-14 16:37:49.265  INFO 13672 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:37:49.266  INFO 13672 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:37:49.242  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.refund receive message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...

分析日志:

从生产者日志可以看出,order对象已经发送成功。

从消费者日志可以看出,order对象已经接受成功并可以直接使用。

注意:

传递的对象必须支持序列化(实现了Serializable接口)

9.RPC

RabbitMQ支持RPC远程调用,同步返回结果。

9.1)为了测试方便,我们在DirectConfig中增加一个消息队列

@Bean
public Queue queryOrderQueue() {
	return new Queue("query.order");
}

9.2)添加一个消息监听类(消费者)

监听routingKey为query.order的队列消息

@Component
@RabbitListener(queues = "query.order")
public class QueryOrderReceive {
	@RabbitHandler
	public Order receive(String orderId) {
		LogUtil.info("notify.refund receive message: "+orderId);
		
		Order order = new Order();
		order.setId(100001);
		order.setOrderId(orderId);
		order.setAmount(new BigDecimal("2999.99"));
		order.setCreateTime(new Date());
		return order;
	}
}

9.3)添加一个消息发送类(生产者)

@Component
public class QueryOrderSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void sender(String orderId){
		LogUtil.info("query.order send message: "+orderId);
		Order order = (Order) rabbitTemplate.convertSendAndReceive("query.order", orderId);
		LogUtil.info("query.order return message: "+order);
	}
}

9.4)添加一个测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class QueryOrderSenderTests {
	@Autowired
	private QueryOrderSender sender;
	
	@Test
	public void test_sender() {
		sender.sender("900000001");
	}
}

9.5)执行QueryOrderSenderTests测试类

生产者日志:

2018-05-14 16:38:14.163  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : Starting QueryOrderSenderTests on LAPTOP-1DF7S904 with PID 2024 (started by lianjinsoft in...
2018-05-14 16:38:14.164  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : No active profile set, falling back to default profiles: default
2018-05-14 16:38:14.197  INFO 2024 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:38:14.848  INFO 2024 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:38:15.705  INFO 2024 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:38:15.715  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : Started QueryOrderSenderTests in 1.927 seconds (JVM running for 3.079)
2018-05-14 16:38:15.793  INFO 2024 --- [           main] com.lianjinsoft.util.LogUtil             : query.order send message: 900000001
2018-05-14 16:38:15.812  INFO 2024 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:38:15.988  INFO 2024 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
2018-05-14 16:38:16.057  INFO 2024 --- [           main] com.lianjinsoft.util.LogUtil             : query.order return message: Order [id=100001, orderId=900000001, amount=2999.99, createTime=...
2018-05-14 16:38:16.079  INFO 2024 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:38:16.097  INFO 2024 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

消费者日志:

2018-05-14 16:38:16.028  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.refund receive message: 900000001

分析日志:

从生产者日志第7行可以看出,消息已经发送成功。从第10行日志可以看出,已经收取到返回的消息,并成功转化为Order对象。

从消费者日志可以看出,已经成功接收到消息并处理完成。

虽然RabbitMQ支持RPC接口调用,但不推荐使用。

原因:

1)RPC默认为单线程阻塞模型,效率极低。

2)需要手动实现多线程消费。

安装RabbitMQ请参考:CentOS在线安装RabbitMQ3.7

本帖源代码:https://gitee.com/skychenjiajun/spring-boot

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏ml

flume安装及配置介绍(二)

注: 环境: skylin-linux Flume的下载方式:   wget http://www.apache.org/dyn/closer.lua/flu...

33111
来自专栏一个会写诗的程序员的博客

《Spring Boot极简教程》第7章 Spring Boot集成模板引擎

其实,没有任何一个模板引擎(jsp,velocity,thymeleaf,freemarker,etc)可以完全实现MVC绝对的分层,只有“自由度”上的界定罢了...

783
来自专栏生信技能树

(15)基因组各种版本对应关系-生信菜鸟团博客2周年精选文章集

这是我的成名作: 首先是NCBI对应UCSC,对应ENSEMBL数据库: GRCh36 (hg18): ENSEMBL release_52. GRCh37 (...

4128
来自专栏Java3y

Mybatis【与Spring整合】

Mybatis第五篇【Mybatis与Spring整合】 既然我们已经学了Mybatis的基本开发了,接下来就是Mybatis与Spring的整合了! 以下使用...

2394
来自专栏名山丶深处

springboot集成redis(mybatis、分布式session)

3654
来自专栏三杯水

Codis3.2集群HA高可用方案

Sentinel需要使用原生的Redis-server,版本要等于或高于Codis3.2里面的3.2.8版本, 这里是在Redis3.2.9的下配置测试的,另外...

652
来自专栏JAVA后端开发

spring boot2集成activiti6的问题记录

经查,是因为我用mybatis plus,要求用mybatis3.4.6,而activiti用的是mybatis3.4.2,两边有冲突,直接排除activiti...

4373
来自专栏后台及大数据开发

CentOS下 elasticsearch集群安装

4.修改/root/elasticsearch-node3/config/elasticsearch.yml 为如下内容(注意红色部分为三个节点不一致的地方)

823
来自专栏张善友的专栏

Windows Server AppFabric Beta 2 已经发布

Windows Server AppFabric Beta 2是一个包含完全功能的AppFabric版本(This build represents our “...

1755
来自专栏Netkiller

Spring boot with Git version

本文节选自《Netkiller Java 手札》 5.23. Spring boot with Git version Spring boot 每次升级打包发给...

2938

扫码关注云+社区