火花版本: 2.3.0
我有一个PySpark数据帧,它有一个数组列,我想通过应用一些字符串匹配条件来过滤数组元素。如果我有这样的数据
Array Col
['apple', 'banana', 'orange']
['strawberry', 'raspberry']
['apple', 'pineapple', 'grapes']我想要过滤每个数组中包含'apple‘字符串的元素,或者,从'app’等开始。我如何在PySpark中实现这一点?
有人能告诉我如何在火星雨中实现它吗?
发布于 2021-10-01 14:34:10
您可以将过滤器与存在结合使用,后者位于高阶函数之下,它将检查数组中的任何元素是否包含word。
另一种方法是UDF -
数据准备
sparkDF = sql.createDataFrame([(['apple', 'banana', 'orange'],),
(['strawberry', 'raspberry'],),
(['apple', 'pineapple', 'grapes'],)
]
,['arr_column']
)
sparkDF.show(truncate=False)
+--------------------------+
|arr_column |
+--------------------------+
|[apple, banana, orange] |
|[strawberry, raspberry] |
|[apple, pineapple, grapes]|
+--------------------------+过滤器&存在>= Spark2.4
starts_with_app = lambda s: s.startswith("app")
sparkDF_filtered = sparkDF.filter(F.exists(F.col("arr_column"), starts_with_app))
sparkDF_filtered.show(truncate=False)
+--------------------------+
|arr_column |
+--------------------------+
|[apple, banana, orange] |
|[apple, pineapple, grapes]|
+--------------------------+UDF -更低版本
def filter_string(inp):
res = []
for s in inp:
if s.startswith("app"):
res += [s]
if res:
return res
else:
return None
filter_string_udf = F.udf(lambda x: filter_string(x),ArrayType(StringType()))
sparkDF_filtered = sparkDF.withColumn('arr_filtered',filter_string_udf(F.col('arr_column')))\
.filter(F.col('arr_filtered').isNotNull())
sparkDF_filtered.show(truncate=False)
+--------------------------+------------+
|arr_column |arr_filtered|
+--------------------------+------------+
|[apple, banana, orange] |[apple] |
|[apple, pineapple, grapes]|[apple] |
+--------------------------+------------+发布于 2021-10-01 14:26:12
您可以使用spark 2.4+中的高阶函数:
df.withColumn("Filtered_Col",F.expr(f"filter(Array_Col,x -> x rlike '^(?i)app' )")).show()+--------------------------+------------+
|Array_Col |Filtered_Col|
+--------------------------+------------+
|[apple, banana, orange] |[apple] |
|[strawberry, raspberry] |[] |
|[apple, pineapple, grapes]|[apple] |
+--------------------------+------------+对于较低的版本,最好使用udf:
import re
def myf(v):
l=[]
for i in v:
if bool(re.match('^(?i)app',i)):
l.append(i)
return l
myudf = F.udf(myf,T.ArrayType(T.StringType()))
df.withColumn("Filtered_Col",myudf("Array_Col")).show()https://stackoverflow.com/questions/69406983
复制相似问题