前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Cloud Stream 重点与总结

Spring Cloud Stream 重点与总结

作者头像
用户1516716
发布2019-05-13 19:50:07
2.4K0
发布2019-05-13 19:50:07
举报
文章被收录于专栏:A周立SpringCloudA周立SpringCloud

点击上方 IT牧场 ,选择 置顶或者星标技术干货每日送达!

TIPS •本文基于Spring Cloud Stream 2.2.0.RC1,包含其新特性。•内容稍微有点乱,但这毕竟是个人学习笔记分享,不是从0到1的手把手系列博客,望知悉。

本文是当初学习Spring Cloud Stream的笔记,最初写于16年。

原本想开个Spring Cloud Stream系列文章连载,写Spring Cloud Stream算是个人夙愿了——首先这是个人非常喜欢的组件,它屏蔽了各种MQ的差异,统一了编程模型(可以类比成基于MQ通信圈的”Spring Data”);其次个人实体书《Spring Cloud 与 Docker 微服务架构实战》没有包含这部分内容也是一大遗憾;更重要的是,这货细节其实挺多,而且上手是稍微有一点曲线的。

然而,个人已同时在更新 Spring Cloud 系列以及 Spring Cloud Alibaba 系列了,再开一个系列感觉精力跟不上。于是,暂时先对照 Spring Cloud Stream 最新文档,将内容见到到最新版本,包括新特性。

更新完现有系列后,还是会考虑出一个 Spring Cloud Stream 从入门到精通系列教程。

概念

group

By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups.

组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费。

组内单次只有1个实例消费,并且会轮询负载均衡。

In general, it is preferable to always specify a consumer group when binding an application to a given destination.

通常,在将应用程序绑定到给定目标时,最好始终指定使用者组。

partition

One or more producer application instances send data to multiple consumer application instances and ensure that data identified by common characteristics are processed by the same consumer instance.

一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上。

destination binder

与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumerbindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder。

destination binding

Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建。

Applying the @EnableBinding annotation to one of the application’s configuration classes defines a destination binding.

使用@EnableBinding即可定义destination binding

注解

@Input(“inboundOrders”)

Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface.

代码语言:javascript
复制
public interface Barista {    @Input("inboundOrders")    SubscribableChannel orders();}

Input注解作用

•为每个binding生成channel实例•指定channel名称•在spring容器中生成一个名为inboundOrders,类型为SubscribableChannel的bean•在spring容器中生成一个类,实现Barista接口。

@InboundChannelAdapter

示例:

代码语言:javascript
复制
@Bean@InboundChannelAdapter(value = Source.OUTPUT,        poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))public MessageSource<String> test() {    return () -> new GenericMessage<>("Hello Spring Cloud Stream");}

作用:表示定义的方法能产生消息。用InboundChannelAdapter注解的方法上即使有参数也没用。

•fixedDelay:多少毫秒发送1次•maxMessagesPerPoll:一次发送几条消息。

ServiceActivator

代码语言:javascript
复制
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)    public String transform(String payload) {        return payload.toUpperCase();    }

表示方法能够处理消息或消息有效内容,监听input消息,用方法体的代码处理,然后输出到output中。

@Transformer

和ServiceActivator差不多,表示方法能够转换消息,消息头,或消息有效内容

@StreamListener(target = Sink.INPUT, condition = “headers[‘type’]==’bogey’”)

In order to be eligible to support conditional dispatching, a method must satisfy the follow conditions: •It must not return a value.•It must be an individual message handling method (reactive API methods are not supported).

condition的作用:符合条件,才走处理方法。

condition起作用的两个条件:

•注解的方法没有返回值•方法是一个独立方法,不支持Reactive API

代码示例:

代码语言:javascript
复制
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")public void handle(String body) {    System.out.println("Received: " + body);}
@Bean@InboundChannelAdapter(value = Source.OUTPUT,        poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))public MessageSource<String> test() {    return () -> {        Map<String, Object> map = new HashMap<>(1);        map.put("type", "dog");        return new GenericMessage<>("abcdef", map);    };}

PollableMessageSource

作用:允许消费者控制消费速率。

代码语言:javascript
复制
@SpringBootApplication@EnableBinding({ConsumerApplication.PolledProcessor.class})@EnableSchedulingpublic class ConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(ConsumerApplication.class, args);    }
    @Autowired    private PolledProcessor polledProcessor;
    @Scheduled(fixedDelay = 5_000)    public void poll() {        polledProcessor.input().poll(message -> {            byte[] bytes = (byte[]) message.getPayload();            String payload = new String(bytes);            System.out.println(payload);        });    }
    public interface PolledProcessor {        @Input        PollableMessageSource input();
        @Output        MessageChannel output();    }
    @Bean    @InboundChannelAdapter(value = "output",            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))    public MessageSource<String> test() {        return () -> {            Map<String, Object> map = new HashMap<>(1);            map.put("type", "dog");            return new GenericMessage<>("adfdfdsafdsfa", map);        };    }}

如果不想自己进行byte数组转换,可以添加配置:

代码语言:javascript
复制
spring:  cloud:    stream:      bindings:        output:          # 指定content-type          content-type: text/plain

相关文章:https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers[1]

错误处理

应用处理

方式1:处理指定channel

配置:

代码语言:javascript
复制
spring:  cloud:    stream:      bindings:        input:          destination: my-destination          group: my-group        output:          destination: my-destination

代码:

代码语言:javascript
复制
@Slf4j@SpringBootApplication@EnableBinding({Processor.class})@EnableSchedulingpublic class ConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(ConsumerApplication.class, args);    }
    @StreamListener(value = Processor.INPUT)    public void handle(String body) {        throw new RuntimeException("x");    }
    @ServiceActivator(inputChannel = "my-destination.my-group.errors")    public void handleError(ErrorMessage message) {        Throwable throwable = message.getPayload();        log.error("截获异常", throwable);
        Message<?> originalMessage = message.getOriginalMessage();        assert originalMessage != null;
        log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload()));    }
    @Bean    @InboundChannelAdapter(value = Processor.OUTPUT,            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))    public MessageSource<String> test() {        return () -> new GenericMessage<>("adfdfdsafdsfa");    }}

方式2:处理所有channel

代码语言:javascript
复制
@StreamListener(value = Processor.INPUT)public void handle(String body) {    throw new RuntimeException("x");}
@StreamListener("errorChannel")public void error(Message<?> message) {    ErrorMessage errorMessage = (ErrorMessage) message;    System.out.println("Handling ERROR: " + errorMessage);}

系统处理

系统处理方式,因消息中间件不同而异。如果应用没有配置错误处理器,那么error将会被传播给binder,binder将error回传给消息中间件。消息中间件可以丢弃消息、requeue(重新排队,从而重新处理)或将失败的消息发送给DLQ(死信队列)。

丢弃

默认情况下,错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产。

DLQ

配置:

代码语言:javascript
复制
spring:  cloud:    stream:      bindings:        input:          destination: my-destination          group: my-group        output:          destination: my-destination      rabbit:        bindings:          input:            consumer:              auto-bind-dlq: true

代码:

代码语言:javascript
复制
@StreamListener(value = Processor.INPUT)public void handle(String body) {    throw new RuntimeException("x");}
@Bean@InboundChannelAdapter(value = Processor.OUTPUT,        poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))public MessageSource<String> test() {    return () -> new GenericMessage<>("adfdfdsafdsfa");}

这样,消息消费失败后,就会放入死信队列。在控制台操作一下,即可将这些消息放回消息队列。客户端就可以重新处理。

如果想获取原始错误的异常堆栈,可添加如下配置:

代码语言:javascript
复制
spring:  cloud:    stream:      rabbit:        bindings:          input:            consumer:              republish-to-dlq: true

requeue

Rabbit/Kafka的binder依赖RetryTemplate实现重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不再重试。此时可通过requeue方式处理异常。

添加如下配置:

代码语言:javascript
复制
# 默认是3,设为1则禁用重试spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。

RetryTemplate

配置方式

RetryTemplate重试也是错误处理的一种手段。

代码语言:javascript
复制
spring:  cloud:    stream:      bindings:        <input channel名称>:          consumer:            # 最多尝试处理几次,默认3            maxAttempts: 3            # 重试时初始避退间隔,单位毫秒,默认1000            backOffInitialInterval: 1000            # 重试时最大避退间隔,单位毫秒,默认10000            backOffMaxInterval: 10000            # 避退乘数,默认2.0            backOffMultiplier: 2.0            # 当listen抛出retryableExceptions未列出的异常时,是否要重试            defaultRetryable: true            # 异常是否允许重试的map映射            retryableExceptions:              java.lang.RuntimeException: true              java.lang.IllegalStateException: false

测试代码:

代码语言:javascript
复制
@StreamListener(value = Processor.INPUT)public void handle(String body) {    throw new RuntimeException(body);}
private AtomicInteger count = new AtomicInteger(0);
@Bean@InboundChannelAdapter(value = Processor.OUTPUT,        poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))public MessageSource<String> test() {    return () -> new GenericMessage<>(count.getAndAdd(1) + "");}

编码方式

多数场景下,使用配置方式定制重试行为都是可以满足需求的,但配置方式可能无法满足一些复杂需求。此时可使用编码方式配置RetryTemplate:

代码语言:javascript
复制
@Configurationclass RetryConfiguration {    @StreamRetryTemplate    public RetryTemplate sinkConsumerRetryTemplate() {        RetryTemplate retryTemplate = new RetryTemplate();        retryTemplate.setRetryPolicy(retryPolicy());        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;    }
    private ExceptionClassifierRetryPolicy retryPolicy() {        BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(                Collections.singletonList(IllegalAccessException.class                ));        keepRetryingClassifier.setTraverseCauses(true);
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);        AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
        ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();        retryPolicy.setExceptionClassifier(                classifiable -> keepRetryingClassifier.classify(classifiable) ?                        alwaysRetryPolicy : simpleRetryPolicy);
        return retryPolicy;    }
    private FixedBackOffPolicy backOffPolicy() {        final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();        backOffPolicy.setBackOffPeriod(2);
        return backOffPolicy;    }}

然后添加配置:

代码语言:javascript
复制
spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate

注: Spring Cloud Stream 2.2才支持设置retry-template-name

动态绑定目标

这是Spring Integration原生的API,建议有时间了解下Spring Integration相关文档。

代码示例:

代码语言:javascript
复制
@EnableBinding@Controllerpublic class SourceWithDynamicDestination {    @Autowired    private BinderAwareChannelResolver resolver;    @Autowired    @Qualifier("sourceChannel")    private MessageChannel localChannel;
    @RequestMapping(path = "/", method = POST, consumes = "*/*")    @ResponseStatus(HttpStatus.ACCEPTED)    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {        localChannel.send(                MessageBuilder.createMessage(                        body,                        new MessageHeaders(                                Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType)                        )                )        );    }
    @Bean(name = "sourceChannel")    public MessageChannel localChannel() {        return new DirectChannel();    }
    @Bean    @ServiceActivator(inputChannel = "sourceChannel")    public ExpressionEvaluatingRouter router() {        ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));        router.setDefaultOutputChannelName("default-output");        router.setChannelResolver(resolver);        return router;    }
    //Following sink is used as test consumer. It logs the data received through the consumer.    @EnableBinding(Sink.class)    static class TestSink {
        private final Log logger = LogFactory.getLog(getClass());
        @StreamListener(Sink.INPUT1)        public void receive(String data) {            logger.info("Data received from customer-1..." + data);        }
        @StreamListener(Sink.INPUT2)        public void receiveX(String data) {            logger.info("Data received from customer-2..." + data);        }    }
    interface Sink {        String INPUT1 = "input1";        String INPUT2 = "input2";
        @Input(INPUT1)        SubscribableChannel input1();
        @Input(INPUT2)        SubscribableChannel input2();    }}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-04-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IT牧场 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概念
    • group
      • partition
        • destination binder
          • destination binding
          • 注解
            • @Input(“inboundOrders”)
              • @InboundChannelAdapter
                • ServiceActivator
                  • @Transformer
                    • @StreamListener(target = Sink.INPUT, condition = “headers[‘type’]==’bogey’”)
                    • PollableMessageSource
                    • 错误处理
                      • 应用处理
                        • 方式1:处理指定channel
                          • 方式2:处理所有channel
                            • 系统处理
                              • 丢弃
                                • DLQ
                                  • requeue
                                    • RetryTemplate
                                      • 配置方式
                                        • 编码方式
                                        • 动态绑定目标
                                        相关产品与服务
                                        消息队列 TDMQ
                                        消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                                        领券
                                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档