Spring Cloud构建微服务架构:消息驱动的微服务(消费组)【Dalston版】

通过之前的《消息驱动的微服务(入门)》一文,相信很多朋友已经对Spring Cloud Stream有了一个初步的认识。但是,对于《消息驱动的微服务(核心概念)》一文中提到的一些核心概念可能还有些迷糊,下面我们将详细的来学习一下这些概念。本文我们就来学习和使用一下“消费组”这一概念。

使用消费组实现消息消费的负载均衡

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。

默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置 spring.cloud.stream.bindings.input.group属性即可,比如我们可以这样实现:

  • 先创建一个消费者应用 SinkReceiver,实现了 greetings主题上的输入通道绑定,它的实现如下:
@EnableBinding(value = {Sink.class})
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(User user) {
        logger.info("Received: " + user);
    }
}
  • 为了将 SinkReceiver的输入通道目标设置为 greetings主题,以及将该服务的实例设置为同一个消费组,做如下设置:
spring.cloud.stream.bindings.input.group=Service-A
spring.cloud.stream.bindings.input.destination=greetings

通过 spring.cloud.stream.bindings.input.group属性指定了该应用实例都属于 Service-A消费组,而 spring.cloud.stream.bindings.input.destination属性则指定了输入通道对应的主题名。

  • 完成了消息消费者之后,我们再来实现一个消息生产者应用 SinkSender,具体如下:
@EnableBinding(value = {Source.class})
public class SinkSender {

    private static Logger logger = LoggerFactory.getLogger(SinkSender.class);

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<String> timerMessageSource() {
        return () -> new GenericMessage<>("{\"name\":\"didi\", \"age\":30}");
    }

}
  • 为消息生产者 SinkSender做一些设置,让它的输出通道绑定目标也指向 greetings主题,具体如下:
spring.cloud.stream.bindings.output.destination=greetings

到这里,对于消费分组的示例就已经完成了。分别运行上面实现的生产者与消费者,其中消费者我们启动多个实例。通过控制台,我们可以发现每个生产者发出的消息,会被启动的消费者以轮询的方式进行接收和输出。

博客原文:http://blog.didispace.com/spring-cloud-starter-dalston-7-3/

但对依赖的Spring Boot和Spring Cloud版本做了升级。

原文发布于微信公众号 - 程序猿DD(didispace)

原文发表时间:2018-01-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小白鼠

Jenkins远程部署

服务器之间如果已经配置了通过ssh无密码访问,有关于这一步的配置就结束了。如果没有配置,可以通过密码的方式进行访问,需要配置 密码 和 端口。点击高级:

1.1K3
来自专栏java、Spring、技术分享

Eureka Server

从图中可以看出Eureka服务器提供服务注册与服务查找功能。多台服务器可以形成Eureka服务器集群,以提供高可用的服务。 Eureka 服务器并没有提供后台...

1236
来自专栏Java开发

SpringBoot集成MyBatisPlus

部署:application.properties更改指定部署模式还是开发模式 dev / prod 分别对应application-dev.properti...

4622
来自专栏我的博客

php在windows平台执行shell解压文件

今天看到这个php可以执行shell解压文件,顺便整理出来看看吧 先帖代码吧 <form action="" method="post" enctype="mu...

3164
来自专栏Java开发

Linux下Tomcat指定JDK和设置内存大小

863
来自专栏Java学习123

myeclipse(eclipse)乱码处理

2865
来自专栏一枝花算不算浪漫

testng及JMeter使用之初体验

3677
来自专栏测试开发架构之路

Spring Boot 入门实践

1835
来自专栏君赏技术博客

【未解决】怎么代码加载Localizeable.strings文件到一个字典里面?

我想做一个功能是检查请求的参数值是否被国际化,如果被郭国际化就抱错。因为我们的请求的参数值只能是英文。

712
来自专栏我的小碗汤

这个404你能解决吗?

去看webapps里工程目录下,index.html文件是有的,见鬼了,是哪儿出了问题?

1513

扫码关注云+社区

领取腾讯云代金券