我当前的设置是:
带有pyspark 2.2.1
我使用https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md作为如何读取数据的示例,但是:
接收器,因为它在python
获取流的每个元素并通过python函数传递它的正确方法是什么?
谢谢,
边缘
发布于 2018-10-12 08:33:10
在第一步中,您定义了一个从EventHub或IoT-Hub读取数据作为流的数据帧:
from pyspark.sql.functions import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
数据以二进制形式存储在body属性中。要获取正文的元素,必须定义结构:
from pyspark.sql.types import *
Schema = StructType([StructField("name", StringType(), True),
StructField("dt", LongType(), True),
StructField("main", StructType(
[StructField("temp", DoubleType()),
StructField("pressure", DoubleType())])),
StructField("coord", StructType(
[StructField("lon", DoubleType()),
StructField("lat", DoubleType())]))
])
并将该架构应用于强制转换为字符串的正文:
from pyspark.sql.functions import *
rawData = df. \
selectExpr("cast(Body as string) as json"). \
select(from_json("json", Schema).alias("data")). \
select("data.*")
在生成的数据帧上,您可以应用函数,例如,在列‘name’上调用自定义函数u_make_hash:
parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))
https://stackoverflow.com/questions/49365852
复制相似问题