首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何使用pyspark和自定义python函数处理均匀的to流

如何使用pyspark和自定义python函数处理均匀的to流
EN

Stack Overflow用户
提问于 2018-03-19 22:52:52
回答 1查看 2.4K关注 0票数 3

我当前的设置是:

带有pyspark 2.2.1

  • streaming服务的Azure IOTHub/EventHub

  • some自定义python函数的
  • Spark 2.3.0基于pandas、matplotlib等

我使用https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md作为如何读取数据的示例,但是:

  • 不能使用writeStream.start()"

接收器,因为它在python

  • 中没有实现。当我尝试调用.rdd、.map或.flatMap时,我得到了一个异常:“必须使用foreach执行对流来源的查询

获取流的每个元素并通过python函数传递它的正确方法是什么?

谢谢,

边缘

EN

回答 1

Stack Overflow用户

发布于 2018-10-12 08:33:10

在第一步中,您定义了一个从EventHub或IoT-Hub读取数据作为流的数据帧:

代码语言:javascript
复制
from pyspark.sql.functions import *

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

数据以二进制形式存储在body属性中。要获取正文的元素,必须定义结构:

代码语言:javascript
复制
from pyspark.sql.types import *

Schema = StructType([StructField("name", StringType(), True),
                      StructField("dt", LongType(), True),
                      StructField("main", StructType( 
                          [StructField("temp", DoubleType()), 
                           StructField("pressure", DoubleType())])),
                      StructField("coord", StructType( 
                          [StructField("lon", DoubleType()), 
                           StructField("lat", DoubleType())]))
                    ])

并将该架构应用于强制转换为字符串的正文:

代码语言:javascript
复制
from pyspark.sql.functions import *

rawData = df. \
  selectExpr("cast(Body as string) as json"). \
  select(from_json("json", Schema).alias("data")). \
  select("data.*")

在生成的数据帧上,您可以应用函数,例如,在列‘name’上调用自定义函数u_make_hash:

代码语言:javascript
复制
 parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))  
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49365852

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档