前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Cloud Stream使用细节

Spring Cloud Stream使用细节

作者头像
江南一点雨
发布2018-04-02 15:56:23
1.4K0
发布2018-04-02 15:56:23
举报
文章被收录于专栏:玩转JavaEE玩转JavaEE

上篇文章我们看了Spring Cloud Stream的基本使用,小伙伴们对Spring Cloud Stream应该也有了一个基本的了解,但是上篇文章中的消息我们是从RabbitMQ的web管理页面发来的,如果我们想要从代码中发送消息呢?本文我们就来看看Spring Cloud Stream的一些使用细节。


自定义消息通道

上篇文章我们提到了Sink和Source两个接口,这两个接口中分别定义了输入通道和输出通道,而Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,来定义一个自己的消息通道。

还是在上文的基础上,首先我们定义一个接口叫做MySink,如下:

代码语言:javascript
复制
public interface MySink {
    String INPUT = "mychannel";

    @Input(INPUT)
    SubscribableChannel input();
}

这里我们定义了一个名为mychannel的消息输入通道,@Input注解的参数则表示了消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。然后,我们再定义一个名为MySource的接口,如下:

代码语言:javascript
复制
public interface MySource {
    @Output(MySink.INPUT)
    MessageChannel output();
}

@Output注解中描述了消息通道的名称,还是mychannel,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法。

最后我们定义一个消息接收类,如下:

代码语言:javascript
复制
@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
    private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);

    @StreamListener(MySink.INPUT)
    public void receive(Object playload) {
        logger.info("Received:" + playload);
    }
}

OK,我们在这里绑定消息通道,然后监听自定义的消息通道,最后来一个单元测试测试一下,如下:

代码语言:javascript
复制
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StreamHelloApplication.class)
@EnableBinding(MySource.class)
public class StreamHelloApplicationTests {

    @Autowired
    private MySource mySource;

    @Test
    public void contextLoads() {
        mySource.output().send(MessageBuilder.withPayload("hello 123").build());
    }
}

运行单元测试,我们可以看到如下日志,表示消息发送成功了:

如果想要发送对象也可以直接发送,不用进行对象转换,如下:

发送:

代码语言:javascript
复制
Book book = new Book(1l, "三国演义", "罗贯中");
mySource.output().send(MessageBuilder.withPayload(book).build());

接收:

代码语言:javascript
复制
@StreamListener(MySink.INPUT)
public void receive(Book playload) {
    logger.info("Received:" + playload);
}

如果我们想要在接收成功后给一个回执,也是OK的,如下:

代码语言:javascript
复制
@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)//定义回执发送的消息通道
public String receive(Book playload) {
    logger.info("Received:" + playload);
    return "receive msg :" + playload;
}

方法的返回值就是回执消息,回执消息在系统默认的output通道中,我们如果想要接收这个消息,当然就要监听这个通道,如下:

代码语言:javascript
复制
@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
    System.out.println("msg:"+msg);
}

当然要记得Source类也要在@EnableBinding注解中进行绑定。此时运行结果如下:

消费组

由于我们的服务可能会有多个实例同时在运行,如果不做任何设置,此时发送一条消息将会被所有的实例接收到,但是有的时候我们可能只希望消息被一个实例所接收,这个需求我们可以通过消息分组来解决。方式很简单,给项目配置消息组和主题,如下:

代码语言:javascript
复制
spring.cloud.stream.bindings.mychannel.group=g1
spring.cloud.stream.bindings.mychannel.destination=dest1

这里我们设置该工程都属于g1消费组,输入通道的主题名则为dest1。这里配置完成之后,我们在消息发送方做如下配置:

代码语言:javascript
复制
spring.cloud.stream.bindings.mychannel.destination=dest1

也配置消息主题名为dest1(如果发送和接收就在同一个应用中,则这里可以不配置)。OK,此时我们将我们的项目启动两个实例,注意两个实例的端口不一样,此时如果我们再发送消息,则只会被两个实例中的一个接收到,另外一个应用则接收不到,但是到底是两个实例中的哪一个接收,则是不确定的。

消息分区

有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,如果我们只是单纯的使用消费组则无法实现功能,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了,配置方式如下(这里的配置都是在消费组的配置基础上完成的):

在消费者上添加如下配置:

代码语言:javascript
复制
spring.cloud.stream.bindings.mychannel.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0

关于这个配置我说三点:

1.第一行表示开启消息分区 2.第二行表示当前消息者的总的实例个数 3.第三行表示当前实例的索引,从0开始,当我们启动多个实例时,需要在启动时在命令行配置索引

然后在消息生产者上添加如下配置:

代码语言:javascript
复制
spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.mychannel.producer.partitionCount=2

第一行配置设置了分区键的表达式规则,第二行则设置了消息分区数量。

OK,此时我们再次启动多个消费者实例,然后重复发送多条消息,这些消息都将被同一个消费者处理掉。

Spring Cloud Stream使用细节我们就先说到这里,有问题欢迎留言讨论。

参考资料:

1.《Spring Cloud微服务实战》

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-11-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 江南一点雨 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自定义消息通道
  • 消费组
  • 消息分区
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档