I have a simple Spark Structured Streaming job that reads from Kafka using the Kafka 0.10 API and writes to our S3 storage. From the logs I can see that for every triggered batch the streaming application is making progress and consuming data from the source, since endOffset is greater than startOffset and both keep increasing with each batch. Yet numInputRows is always zero, and no rows are written to S3.
Why are the offsets steadily increasing while the Spark batches consume no data?
19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
"runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
"name" : null,
"timestamp" : "2019-09-10T15:55:00.000Z",
"batchId" : 189,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 127,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 24,
"setOffsetRange" : 36,
"triggerExecution" : 1859,
"walCommit" : 1032
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[my_kafka_topic]]",
"startOffset" : {
"my_kafka_topic" : {
"23" : 1206926686,
"8" : 1158514946,
"17" : 1258387219,
"11" : 1263091642,
"2" : 1226741128,
"20" : 1229560889,
"5" : 1170304913,
"14" : 1207333901,
"4" : 1274242728,
"13" : 1336386658,
"22" : 1260210993,
"7" : 1288639296,
"16" : 1247462229,
"10" : 1093157103,
"1" : 1219904858,
"19" : 1116269615,
"9" : 1238935018,
"18" : 1069224544,
"12" : 1256018541,
"3" : 1251150202,
"21" : 1256774117,
"15" : 1170591375,
"6" : 1185108169,
"24" : 1202342095,
"0" : 1165356330
}
},
"endOffset" : {
"my_kafka_topic" : {
"23" : 1206928043,
"8" : 1158516721,
"17" : 1258389219,
"11" : 1263093490,
"2" : 1226743225,
"20" : 1229562962,
"5" : 1170307882,
"14" : 1207335736,
"4" : 1274245585,
"13" : 1336388570,
"22" : 1260213582,
"7" : 1288641384,
"16" : 1247464311,
"10" : 1093159186,
"1" : 1219906407,
"19" : 1116271435,
"9" : 1238936994,
"18" : 1069226913,
"12" : 1256020926,
"3" : 1251152579,
"21" : 1256776910,
"15" : 1170593216,
"6" : 1185110032,
"24" : 1202344538,
"0" : 1165358262
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
}
}

A simplified version of the Spark code looks like this:
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration.Duration

val df = sparkSession
  .readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "host:1009",
    "subscribe" -> "my_kafka_topic",
    "kafka.client.id" -> "my-client-id",
    "maxOffsetsPerTrigger" -> "1000",      // option values must be strings
    "fetch.message.max.bytes" -> "6048576"
  ))
  .load()

df
  .writeStream
  .partitionBy("date", "hour")
  .outputMode(OutputMode.Append())
  .format("parquet")
  .options(Map("checkpointLocation" -> "checkpoint", "path" -> "data"))
  .trigger(Trigger.ProcessingTime(Duration("5 minutes")))
  .start()
  .awaitTermination()

Edit: before each batch is executed, I also see these lines in the log:
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-5 to offset 1168959116.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-1 to offset 1218619371.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-8 to offset 1157205346.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0] Resetting offset for partition my-topic-21 to offset 1255403059.

Posted on 2019-09-12 09:18:36
Could you check whether the issue with the output directory and the checkpoint location described in the link below applies to your case?
https://kb.databricks.com/streaming/file-sink-streaming.html
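If it helps to verify this, here is a minimal sketch for checking whether the sink directory still carries a _spark_metadata commit log from a previous run (the file sink writes one inside its output path). The concrete S3 paths are assumptions based on the sink description in the progress log, and sparkSession is the session from the question's code:

import org.apache.hadoop.fs.Path

// Paths below are assumptions; substitute your actual output and checkpoint locations.
val outputPath     = new Path("s3://my-s3-bucket/data/kafka/my_kafka_topic")
val sinkMetadata   = new Path(outputPath, "_spark_metadata") // file sink's commit log
val checkpointPath = new Path("s3://my-s3-bucket/checkpoints/my_kafka_topic")

val fs = outputPath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)

// If _spark_metadata survives from an earlier run while the checkpoint was reset
// (or vice versa), the file sink can treat new batch IDs as already committed and
// silently write nothing.
println(s"_spark_metadata present: ${fs.exists(sinkMetadata)}")
println(s"checkpoint present:      ${fs.exists(checkpointPath)}")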
Posted on 2020-05-24 17:26:47
That was exactly the problem with the updated offsets: I had cleared the checkpoint location to restart the stream, but kept writing the streaming data to the old (uncleared) target location, and no input rows showed up. After cleaning (i.e. changing) both the checkpoint and the write location, it worked fine.
In this particular case, once I cleared the checkpoint location the offsets were updated correctly. But because I had not cleared the target location (it held 5-6 months of continuously streamed data, i.e. more than 1000 small files that would have needed deleting), the sink evidently still checked the Spark metadata in that directory, and when it found the old data there it did not ingest any new data.
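For comparison, a minimal sketch of the kind of restart that worked here: both the checkpoint location and the sink path are fresh, so no stale _spark_metadata is consulted. It reuses df, OutputMode and Trigger from the question's code; the "_v2" paths are hypothetical, used only for illustration.

df.writeStream
  .partitionBy("date", "hour")
  .outputMode(OutputMode.Append())
  .format("parquet")
  // fresh checkpoint AND fresh output directory (hypothetical names)
  .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic_v2")
  .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic_v2")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .start()
  .awaitTermination()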
https://stackoverflow.com/questions/57874681