I need to subscribe to a Kafka topic at the latest offset, read a handful of the most recent records, print them, and finish. How can I do that in Spark? I imagine I could do something like this:
sqlContext
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
  .option("subscribe", "myTopic")
  .option("startingOffsets", "latest")
  .load()
  .filter($"someField" === "someValue")
  .limit(10)
  .show()
Posted on 2020-08-27 21:12:38
You need to know in advance which offsets, in which partitions, you want to consume from Kafka. If you have that information, you can do something like this:
// Subscribe to a topic, reading an explicit offset range per partition
import spark.implicits._  // needed for .as[(String, String)]

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
  .option("subscribe", "myTopic")
  .option("startingOffsets", """{"myTopic":{"0":20,"1":20}}""")
  .option("endingOffsets", """{"myTopic":{"0":25,"1":25}}""")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .filter(...)
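If you don't know the offsets up front, one option is to ask Kafka for them just before running the batch query. The sketch below is illustrative only: the helper name lastNOffsetsJson is made up, and it assumes the kafka-clients library is on the classpath (Scala 2.13 collection converters shown; on Scala 2.12 use scala.collection.JavaConverters instead). It builds a startingOffsets JSON string pointing n records back from the current end of each partition:

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: build a startingOffsets JSON that starts `n` records
// before the current end of every partition of `topic`.
def lastNOffsetsJson(bootstrap: String, topic: String, n: Long): String = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrap)
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(info => new TopicPartition(topic, info.partition))
    val begin = consumer.beginningOffsets(partitions.asJava).asScala
    val end = consumer.endOffsets(partitions.asJava).asScala
    val entries = end.map { case (tp, endOffset) =>
      // Clamp so we never start before the beginning of the log.
      val start = math.max(begin(tp).longValue, endOffset.longValue - n)
      s""""${tp.partition}":$start"""
    }
    s"""{"$topic":{${entries.mkString(",")}}}"""
  } finally consumer.close()
}

// Usage: read roughly the last 10 records per partition as a batch.
val starting = lastNOffsetsJson("192.168.1.1:9092", "myTopic", 10)
val recent = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092")
  .option("subscribe", "myTopic")
  .option("startingOffsets", starting)
  .option("endingOffsets", "latest")
  .load()

Note that the end offsets can advance between computing the JSON and running the query, so this reads roughly, not exactly, the last n records per partition.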
More details on startingOffsets and endingOffsets are available in the Kafka + Spark Integration Guide.
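For reference, the offset JSON also accepts the special values documented in the guide: -2 stands for earliest and -1 for latest, and for a batch query latest is only valid on the ending side. A small sketch:

// Read everything currently in partitions 0 and 1 of myTopic as one batch.
// -2 in startingOffsets means "earliest"; endingOffsets "latest" means the
// end of each partition at the time the query runs.
val all = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092")
  .option("subscribe", "myTopic")
  .option("startingOffsets", """{"myTopic":{"0":-2,"1":-2}}""")
  .option("endingOffsets", "latest")
  .load()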
https://stackoverflow.com/questions/63616440