org.springframework.messaging.MessageHandlingException:
error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@673f700a];
nested exception is org.apache.kafka.common.errors.SerializationException:
Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
kafka配置如下:
#kafka-producer配置,官网=>https://kafka.apache.org/documentation/#producerconfigs
kafka:
#集群地址
bootstrap-servers: kafka:9092
consumer:
group-id: async-task-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: kafka:9092
# Spring Cloud Stream
# key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# 消息的键的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息的值的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#发送确认机制:acks=all或-1:leader会等待所有ISR中的follower同步完成的ack才commit(保证ISR副本都有数据leader才commit,吞吐率降低),acks=0:partition leader不会等待任何ISR中副本的commit(可能会有数据丢失,吞吐高),acks=1 kafka会把这条消息写到本地日志文件中
acks: all
retries: 0
#累计约1M条就发发送,必须小于缓冲区大小,否则报错无法分配内存(减少IO次数,过大则延时高,瞬间IO大)
batch-size: 1024000
#指定创建信息nio-buffer缓冲区大小约1M
buffer-memory: 1024000
properties:
metadata:
broker:
list: kafka:9092
#发送失败重试次数
message:
send:
max:
retries: 3
#默认0ms立即发送,不修改则上两条规则相当于无效(这个属性时个map列表,producer的其它配置也配置在这里,详细↑官网,这些配置会注入给KafkaProperties这个配置bean中,供#spring自动配置kafkaTemplate这个对象时使用)
linger:
ms: 1000
ssl:
client:
auth: required
SpringCloudStream配置如下:
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka:9092
bindings:
test-input:
destination: test-topic
contentType: application/json
group: test-group
consumer:
configuration:
max:
poll:
records: 10
test-output:
destination: test-topic
contentType: application/json
由于项目中kafka配置中key和value 的序列化方式为
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
服务启动时,会给cloud-stream 装载绑定中间件的配置,而spring cloud stream默认使用的序列化方式为ByteArraySerializer,这就导致stream 在发送数据时使用l了服务装载StringSerializer序列化方式,从而导致了java.lang.ClassCastException: [B > cannot be cast to java.lang.String的问题出现。
4.1、在yaml 文件中自定义binder环境的属性。当配置完成后它,创建binder的上下文不再是应用程序上下文的子节点。这允许binder组件和应用组件的完全分离。stream 就会使用自己默认的环境。
stream:
bindings:
input:
destination: input
binder: kafka
output:
destination: input
binder: kafka
binders:
kafka:
type: kafka
environment:
spring.kafka:
bootstrap-servers: ${spring.kafka.bootstrap-servers}
4.2、在Spring Boot配置文件中新增配置如下
spring.cloud.stream.bindings.output.producer.use-native-encoding=true
4.3、终极解决办法:只使用其中一种方式,不要混用
A:各有各的优缺点,也可混合着玩。混合着玩要特别注意springboot 自动装配kafka生产者消费者的消息即value的序列化反系列化默认为string,而springcloud-stream默认为byteArray,需要统一序列化反系列化方式否则乱码或类型转化报错。
B:springboot 自动装配的kafkaTemplate异步发送处理回调消息比较方便
C:springcloud-stream将topic与sink接收器的输入通道与source资源的输出通道bind。通过输出输入通道来发送接收消息,默认会去spring容器中找名output,input的对象进行消息来发送接收,需要手动打开自动配置开关@EnableBingding(XXX)来往spring 的beanFactory实例化
D:springcloud-stream屏蔽了底层MQ的具体实现,可以较方便的切换消息组件如rabbitMq等,也可以较方便的在发送时携带header,消费者可以根据header的不同路由到不同的消费方法,这种操作在有些时候是比较方便的。kafkaListener则需要需要手动解析消息体进行业务路由。
E:springcloud-stream也有其缺点,那就是使用有点麻烦,如果一个系统需要往两个或以上topic发消息,或接收两个或以上topic的消息。需要自定义MySink、MySource,也可用一个processor处理器继承这些接口,开启注解只需要指定这个处理器即可。
参考:
1、kafka和Spring Cloud Stream 混用导致stream 发送消息出现序列化失败问题: java.lang.ClassCastException::https://blog.csdn.net/gzh_91/article/details/102562321
2、Spring Cloud Stream Kafka 异常:https://www.dazhuanlan.com/2019/11/03/5dbe3b38b5962/
3、kafka-springcloud stream与kafkaTemplate的消息系列化与反序列:https://blog.csdn.net/qq_39506978/article/details/89483827
4、spring-cloud-stream-binder-kafka属性配置:https://segmentfault.com/a/1190000011277937
5、SpringCloud学习之SpringCloudStream&集成kafka:https://www.cnblogs.com/niechen/p/8687206.html