// Subscribe to 1 topic
val df = spark
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic, with headers
val df = spark
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
.as[(String, String, Map)]
// Subscribe to multiple topics
val df = spark
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
Real-Time End-to-End Integration with Apache Kafka in Apache Spark’s Structured Streaming
如有侵权,请联系 cloudcommunity@tencent.com 删除。
如有侵权,请联系 cloudcommunity@tencent.com 删除。