前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KafkaTemplate和SpringCloudStream混用导致stream发送消息出现序列化失败问题

KafkaTemplate和SpringCloudStream混用导致stream发送消息出现序列化失败问题

作者头像
chenchenchen
发布2021-09-06 10:16:05
2.5K0
发布2021-09-06 10:16:05
举报
文章被收录于专栏:chenchenchen

背景

1、具体报错

代码语言:javascript
复制
org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@673f700a]; 

nested exception is org.apache.kafka.common.errors.SerializationException: 
Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

2、本地配置

kafka配置如下:

代码语言:javascript
复制
#kafka-producer配置,官网=>https://kafka.apache.org/documentation/#producerconfigs

  kafka:
    #集群地址
    bootstrap-servers: kafka:9092
    consumer:
      group-id: async-task-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: kafka:9092
      # Spring Cloud Stream
      # key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      # value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      # 消息的键的序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息的值的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #发送确认机制:acks=all或-1:leader会等待所有ISR中的follower同步完成的ack才commit(保证ISR副本都有数据leader才commit,吞吐率降低),acks=0:partition leader不会等待任何ISR中副本的commit(可能会有数据丢失,吞吐高),acks=1 kafka会把这条消息写到本地日志文件中
      acks: all
      retries: 0
      #累计约1M条就发发送,必须小于缓冲区大小,否则报错无法分配内存(减少IO次数,过大则延时高,瞬间IO大)
      batch-size: 1024000
      #指定创建信息nio-buffer缓冲区大小约1M
      buffer-memory: 1024000
      properties:
        metadata:
          broker:
            list: kafka:9092
        #发送失败重试次数
        message:
          send:
            max:
              retries: 3
        
#默认0ms立即发送,不修改则上两条规则相当于无效(这个属性时个map列表,producer的其它配置也配置在这里,详细↑官网,这些配置会注入给KafkaProperties这个配置bean中,供#spring自动配置kafkaTemplate这个对象时使用)
        linger:
          ms: 1000
        ssl:
          client:
            auth: required

SpringCloudStream配置如下:

代码语言:javascript
复制
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka:9092
      bindings:
        test-input:
          destination: test-topic
          contentType: application/json
          group: test-group
          consumer:
            configuration:
              max:
                poll:
                  records: 10
        test-output:
          destination: test-topic
          contentType: application/json

3、问题原因

由于项目中kafka配置中key和value 的序列化方式为

代码语言:javascript
复制
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

服务启动时,会给cloud-stream 装载绑定中间件的配置,而spring cloud stream默认使用的序列化方式为ByteArraySerializer,这就导致stream 在发送数据时使用l了服务装载StringSerializer序列化方式,从而导致了java.lang.ClassCastException: [B > cannot be cast to java.lang.String的问题出现。

4、解决方案

4.1、在yaml 文件中自定义binder环境的属性。当配置完成后它,创建binder的上下文不再是应用程序上下文的子节点。这允许binder组件和应用组件的完全分离。stream 就会使用自己默认的环境。

代码语言:javascript
复制
stream:
  bindings:
    input:
      destination: input
      binder: kafka
    output:
      destination: input
      binder: kafka
  binders:
    kafka:
      type: kafka
      environment:
        spring.kafka:
          bootstrap-servers: ${spring.kafka.bootstrap-servers}

4.2、在Spring Boot配置文件中新增配置如下

代码语言:javascript
复制
spring.cloud.stream.bindings.output.producer.use-native-encoding=true

4.3、终极解决办法:只使用其中一种方式,不要混用

5、优缺点对比

A:各有各的优缺点,也可混合着玩。混合着玩要特别注意springboot 自动装配kafka生产者消费者的消息即value的序列化反系列化默认为string,而springcloud-stream默认为byteArray,需要统一序列化反系列化方式否则乱码或类型转化报错。

B:springboot 自动装配的kafkaTemplate异步发送处理回调消息比较方便

C:springcloud-stream将topic与sink接收器的输入通道与source资源的输出通道bind。通过输出输入通道来发送接收消息,默认会去spring容器中找名output,input的对象进行消息来发送接收,需要手动打开自动配置开关@EnableBingding(XXX)来往spring 的beanFactory实例化

D:springcloud-stream屏蔽了底层MQ的具体实现,可以较方便的切换消息组件如rabbitMq等,也可以较方便的在发送时携带header,消费者可以根据header的不同路由到不同的消费方法,这种操作在有些时候是比较方便的。kafkaListener则需要需要手动解析消息体进行业务路由。

E:springcloud-stream也有其缺点,那就是使用有点麻烦,如果一个系统需要往两个或以上topic发消息,或接收两个或以上topic的消息。需要自定义MySink、MySource,也可用一个processor处理器继承这些接口,开启注解只需要指定这个处理器即可。

参考:

1、kafka和Spring Cloud Stream 混用导致stream 发送消息出现序列化失败问题: java.lang.ClassCastException::https://blog.csdn.net/gzh_91/article/details/102562321

2、Spring Cloud Stream Kafka 异常:https://www.dazhuanlan.com/2019/11/03/5dbe3b38b5962/

3、kafka-springcloud stream与kafkaTemplate的消息系列化与反序列:https://blog.csdn.net/qq_39506978/article/details/89483827

4、spring-cloud-stream-binder-kafka属性配置https://segmentfault.com/a/1190000011277937

5、SpringCloud学习之SpringCloudStream&集成kafkahttps://www.cnblogs.com/niechen/p/8687206.html

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/09/01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
    • 1、具体报错
      • 2、本地配置
        • 3、问题原因
          • 4、解决方案
            • 5、优缺点对比
            相关产品与服务
            文件存储
            文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档