专栏首页程序猿DDSpring Cloud Stream如何处理消息重复消费?

Spring Cloud Stream如何处理消息重复消费?

最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及作用。

那么什么是消费组呢?为什么要用消费组?它解决什么问题呢?摘录一段之前博文的内容,来解答这些疑问:

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

下面,通过一个例子来看看如何使用消费组。

问题重现

构建消息消费端

第一步:创建绑定接口,绑定example-topic输入通道(默认情况下,会绑定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。

interface ExampleBinder {
    String NAME = "example-topic";
    @Input(NAME)
    SubscribableChannel input();
}

第二步:对上述输入通道创建监听与处理逻辑。

@EnableBinding(ExampleBinder.class)
public class ExampleReceiver {
    private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class);
    @StreamListener(ExampleBinder.NAME)
    public void receive(String payload) {
        logger.info("Received: " + payload);
    }
}

第三步;创建应用主类和配置文件

@SpringBootApplication
public class ExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }
}
spring.application.name=stream-consumer-group
server.port=0

这里设置server.port=0,以方便在本地启动多实例来重现问题。

完成上述操作之后,启动两个该应用的实例,以备后续调用。

构建消息生产端

比较简单,需要注意的是,使用@Output创建一个同名的输出绑定,这样发出的消息才能被上述启动的实例接收到。具体实现如下:

@RunWith(SpringRunner.class)
@EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class})
public class ExampleApplicationTests {
    @Autowired
    private ExampleBinder exampleBinder;
    @Test
    public void exampleBinderTester() {
        exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build());
    }
    public interface ExampleBinder {
        String NAME = "example-topic";
        @Output(NAME)
        MessageChannel output();
    }
}

启动上述测试用例之后,可以发现之前启动的两个实例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com。消息重复消费的问题成功重现!

使用消费组解决问题

如何解决上述消息重复消费的问题呢?我们只需要在配置文件中增加如下配置即可:

spring.cloud.stream.bindings.example-topic.group=aaa 当我们指定了某个绑定所指向的消费组之后,往当前主题发送的消息在每个订阅消费组中,只会有一个订阅者接收和消费,从而实现了对消息的负载均衡。只所以之前会出现重复消费的问题,是由于默认情况下,任何订阅都会产生一个匿名消费组,所以每个订阅实例都会有自己的消费组,从而当有消息发送的时候,就形成了广播的模式。

另外,需要注意上述配置中example-topic是在代码中@Output和@Input中传入的名字。

-END-

本文分享自微信公众号 - 程序猿DD(didispace)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-11-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spring Cloud 应用如何注册到多个注册中心

    我们知道,使用 Spring Cloud 开发微服务时,服务注册的使用方式非常简单,只需要引入服务注册的依赖即可。

    程序猿DD
  • 程序员你为什么这么累【续】:编码习惯之配置规范

    请先仔细阅读:分享我工作中制定配置文件的习惯 工作中少不了要制定各种各样的配置文件,这里和大家分享一下工作中我是如何制定配置文件的,这是个人习惯,结合强大的sp...

    程序猿DD
  • Spring Cloud Stream如何消费自己生产的消息?

    在上一篇《Spring Cloud Stream如何处理消息重复消费?》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继...

    程序猿DD
  • Spring Cloud 应用如何注册到多个注册中心

    我们知道,使用 Spring Cloud 开发微服务时,服务注册的使用方式非常简单,只需要引入服务注册的依赖即可。

    程序猿DD
  • SpringCloud2.0入门3-新的eureka依赖

    Springboot2.0推出有一段时间了,是要学习1.5+还是从2.0开始?犹豫的原因是资料不全,目前现有的资料大部分是1.0的。但作为学习者,肯定要学习最新...

    Ryan-Miao
  • 领域驱动设计-划分界限上下文

    用户1910585
  • java.base.jmod

    /Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/jmods$ jmod list java....

    一个会写诗的程序员
  • Spring Cloud(九)《服务网关Zuul 动态路由与权限过滤器》

    在实际的业务开发中不只是将路由配置放到文件中,而是需要进行动态管理并且可以在变化时不用重启系统就可以更新。与此同时还需要在接口访问的时候,可以增加一些权限验证以...

    小傅哥
  • JavaEE中,实现登录时进行校验验证码的功能

    时间静止不是简史
  • 上下文系列小讲堂(一)

    “度量值”和“计算列”的区别,令很多初学新人纠结不已。毕竟大部份人是从EXCEL里绕过来的,遇到问题,习惯拉起公式添加列,操作近乎条件反射,毕竟添加的计算列实实...

    公众号PowerBI大师

扫码关注云+社区

领取腾讯云代金券