前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >记一次Flink写入Kafka坑点

记一次Flink写入Kafka坑点

作者头像
Flink实战剖析
发布2022-04-18 11:53:23
1K0
发布2022-04-18 11:53:23
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

最近做了一个将结果数据写入到Kafka的需求,sink部分代码如下:

代码语言:javascript
复制
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

        sinkTopic, new StringKeyedSerializationSchema,producerConfig, sinkSemantic)

      ds.addSink(kafkaProducer).setParallelism(sinkParallelism)
 

其中StringKeyedSerializationSchema是自定义的实现KeyedSerializationSchema的序列化器,用于序列化写入kafka的key/value, 任务也上线了,在flink web页面看到任务各项指标一切正常,也测试消费写入kafka的数据,得到的结果也如预期一样,想着万事大吉了,so easy~ 过了一会kafka中间件的同事找过来说:你这个写入topic的数据怎么只有这几个分区,其他分区都没有数据写入~

什么情况?任务看着一切都ok啊,怎么就有分区没有数据写入呢?马上google一下数据写入kafka的分区策略:

  1. 如果指定写入分区,就将数据写入分区
  2. 如果没有指定分区,指定了key, 那么就会按照key hash对分区取模方式发送
  3. 如果既没指定分区又没指定key,那么就会以轮序的方式发送

而实际情况是有几个分区一条数据都没有写入,并且在StringKeyedSerializationSchema也指定了每条写入数据的key, 那么就一定是第一种情况了,在FlinkKafkaProducer011中指定了数据写入的分区,马上翻看源码,在FlinkKafkaProducer011的invoke方法里面有这么一个逻辑:

代码语言:javascript
复制
if (flinkKafkaPartitioner != null) {

            record = new ProducerRecord<>(

                targetTopic,

                flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),

                timestamp,

                serializedKey,

                serializedValue);

        } else {

            record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);

        }
 

很明显就是执行了if逻辑,也是就flinkKafkaPartitioner不为空,在构建ProducerRecord时调用了flinkKafkaPartitioner.partition的方法,指定写入的partition,而flinkKafkaPartitioner是在FlinkKafkaProducer011初始化的时候给的默认值FlinkFixedPartitioner,在看下其partition方式:

代码语言:javascript
复制
partitions[parallelInstanceId % partitions.length]

parallelInstanceId表示当前task的index,partitions表示kafka的topic的分区,该逻辑求得的分区就是根据当前task index 对partition取余得到的,而我设置的sinkParallelism是4,topic的分区数是6,到这里就比较明朗,取余永远不会得到4、5,所以就导致分区4、5一直没有数据写入。如果设置的parallism设置比kafka的分区数还要大,就会导致得到的partition值大于topic实际partition。 那么解决方式有一下几种:

  1. parallism设置成为与kafka topic 分区数一致大小
  2. 将flinkKafkaPartitioner指定为空,并且制定写入kafka的key
  3. 将flinkKafkaPartitioner与写入的key都置为空
  4. 自定义一个FlinkKafkaPartitioner,重写partition方法

最终选择第三种较为简单的方案,修改代码:

代码语言:javascript
复制
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

        sinkTopic, new StringKeyedSerializationSchema,producerConfig,Optional.ofNullable(null), sinkSemantic,5)
 

同时将StringKeyedSerializationSchema的serializeKey返回值设置为null. 再次运行任务,查看kafka 数据写入情况,所有分区都有数据写入。最终破案。

a little note : 如果需要将数据写入多个topic, 重写KeyedSerializationSchema中getTargetTopic方法根据数据判断写入的topic,默认写入的topic是 FlinkKafkaProducer011 传入的topic。

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档