前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring cloud stream【消息分区】

Spring cloud stream【消息分区】

作者头像
用户4919348
发布2019-07-01 10:40:26
1.1K0
发布2019-07-01 10:40:26
举报
文章被收录于专栏:波波烤鸭波波烤鸭

  在上篇文章中我们给大家介绍了Stream的消息分组,可以实现消息的重复消费的问题,但在某些场景下分组还不能满足我们的需求,比如,同时有多条同一个用户的数据,发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑消息分区了。   当生产者将消息数据发送给多个消费者实例时,保证同一消息数据始终是由同一个消费者实例接收和处理。

Stream 消息分区

创建项目

  将我们上篇文章中的分组的三个项目,拷贝一份修改名称及服务名称

在这里插入图片描述
在这里插入图片描述

没有分区的情况下演示

发送多条消息查看效果

代码语言:javascript
复制
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
	
	@Autowired
	private ISendeService sendService;

	@Test
	public void testStream(){
		Product p = new Product(999, "stream test ...999");
		// 将需要发送的消息封装为Message对象
		Message message = MessageBuilder
								.withPayload(p)
								.build();
		for (int i = 0; i < 10; i++) {
			// 发送多条消息到队列中
			sendService.send().send(message );
		}
		
	}
}

10条消息被随机的分散到了两个消费者中:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

我们可以看到A中6条消息,B中4条消息,而且这是随机的,下次执行的结果肯定不一样。

分区

1.发送者中配置

代码语言:javascript
复制
spring.application.name=stream-partition-sender
server.port=9060
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct

#通过该参数指定了分区键的表达式规则
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
#指定了消息分区的数量。 
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2

2.消费者中配置

服务A

代码语言:javascript
复制
spring.application.name=stream-partition-receiverA
server.port=9070
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct999

#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2 
#设置当前实例的索引号,从 0 开始
spring.cloud.stream.instanceIndex=0

服务B

代码语言:javascript
复制
spring.application.name=stream-partition-receiverB
server.port=9071
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct999

#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2 
#设置当前实例的索引号,从 1 开始
spring.cloud.stream.instanceIndex=1

启动服务测试

在这里插入图片描述
在这里插入图片描述

10个消息都被消费者A给消费了,说明到达了我们需要的效果。 案例源码:https://github.com/q279583842q/springcloud-e-book

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年06月29日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Stream 消息分区
    • 创建项目
      • 没有分区的情况下演示
        • 分区
          • 1.发送者中配置
          • 2.消费者中配置
      相关产品与服务
      微服务引擎 TSE
      微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档