df = spark.read.csv('input.csv', header=True, inferSchema=True)假设'input.csv‘文件包含以下数据:
id, name, age
1, John, 20
2, Mike, 33
3, Phil, 19, 180, 78
4, Sean, 40我想过滤掉比标题包含更多列的行,并将其保存到不同的输出中,有点像这样(说明性地):
df2 = df.filter(condition1) #condition1 = rows which have more columns than header
df = df.filter(condition2) #condition2 = rows which have same amount or less columns than header
df.show()
df2.show()因此,我将得到如下输出:
+---+------+----+
| id| name| age|
+---+------+----+
| 1| John| 20|
| 2| Mike| 33|
| 4| Sean| 40|
+---+------+----+
+---+------+----+
| id| name| age|
+---+------+----+
| 3| Phil| 19|
+---+------+----+到目前为止我什么也没找到。目前,它只是缩小行以适合标题,而没有办法获得它。我能做什么?谢谢
编辑:模式不一定需要是"id","name“,"age”。它应该从字面上接受任何内容,因此过滤不能依赖于特定的列。此外,解决方案不能排他于特定类型的读取,数据将根据用户选择的内容接收,唯一可以修改的是参数和选项。
发布于 2021-01-27 23:24:36
您可以将其作为文本文件读取,并查找每行中的列数。获取具有3个以上列的行的id的df。然后使用id对数据帧(通过read.csv获得)执行semi或anti连接。
text = spark.read.text('input.csv')
textdf = text.selectExpr(
"value",
"size(split(value, ',')) len",
"split(value, ',')[0] id"
).filter('len > 3')
textdf.show()
+----------------+---+---+
| value|len| id|
+----------------+---+---+
|3,Phil,19,180,78| 5| 3|
+----------------+---+---+
df = spark.read.csv('input.csv', header=True)
df1 = df.join(textdf, 'id', 'anti')
df2 = df.join(textdf, 'id', 'semi')
df1.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 1|John| 20|
| 2|Mike| 33|
| 4|Sean| 40|
+---+----+---+
df2.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 3|Phil| 19|
+---+----+---+https://stackoverflow.com/questions/65921993
复制相似问题