首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spark 3.x与Python中Kafka的集成

Spark 3.x与Python中Kafka的集成
EN

Stack Overflow用户
提问于 2020-05-19 13:04:48
回答 1查看 4.2K关注 0票数 4

带有火花流的Kafka抛出了一个错误:

代码语言:javascript
运行
复制
from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka

我已经建立了一个卡夫卡经纪人和一个工作火花环境与一个主人和一个工人。

代码语言:javascript
运行
复制
import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils



if __name__=="__main__":
    sc = SparkContext(appName="SparkStreamAISfromKAFKA")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,1)
    kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
    lines = kvs.map(lambda x: x[1])
    lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
    ssc.start()
    ssc.awaitTermination()

我认为错误的地方是与卡夫卡人有关,特别是与版本有关的东西。有人能帮忙吗?

火花-版本:版本3.0.0-预览2

我执行时:

代码语言:javascript
运行
复制
/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars spark-streaming-kafka-0-10_2.11 spark_streamer.py spark://mysparkip:7077
EN

Stack Overflow用户

回答已采纳

发布于 2020-05-23 21:47:15

根据火花流+ Kafka集成指南

"Kafka 0.8支持在Spark2.3.0中被取消。“

此外,下面的屏幕快照显示,Kafka 0.10 (以及更高版本)不支持Python。

在您的情况下,您必须使用Spark2.4来运行您的代码。

PySpark支持结构化流

如果您计划使用Spark的最新版本(例如3.x),并且仍然希望在Python中将Spark与Kafka集成,则可以使用结构化流。您将找到关于如何在结构化流+ Kafka集成指南(Kafka broker版本0.10.0或更高)中使用Python的详细说明。

从卡夫卡读取数据

代码语言:javascript
运行
复制
# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

给卡夫卡写数据

代码语言:javascript
运行
复制
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()
票数 9
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61891762

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档