我正在尝试将数据从csv文件加载到DataFrame。我必须使用spark.read.csv()
函数,因为rdd sc.fileText()
不能处理文件中的特定数据( csv数据中有不可见的逗号字符,rdd sc.fileText()
会将它们拆分)。csv文件在开始时有几行要跳过:
Report <- to skip
Generated by YYYY-MM-DD <- to skip
Sessions <- to skip
<- to skip
Session of all hosts <- to skip
col1,col2,col3,col4,col5 <- it is my header
tst1,tst2,tst3,tst4,tst5 <- my data start here
tst6,tst7,tst8,tst9,tst10
...
我想要一个这样的数据帧:
tst1,tst2,tst3,tst4,tst5 <- data
tst6,tst7,tst8,tst9,tst10
我尝试使用map和filter函数,但它不能正常工作:
.rdd.map(lambda line: str(line).split(','))\
.filter(lambda line: len(line)>3).collect()
发布于 2019-06-11 03:00:03
我找不到任何可以让我跳过指定行数的东西,但是我可以通过指定模式,然后读取数据并过滤掉任何我不想带来的东西来实现相同的结果。
import org.apache.spark.sql.types.{StructType, StringType}
val fileSchema = new StructType()
.add("column1", StringType)
.add("column2", StringType)
.add("column3", StringType)
.add("column4", StringType)
.add("column5", StringType)
val data = spark.read.schema(fileSchema).csv("s3a://aws-s3-test-bucket/jeremy/foo.txt")
data.show(false)
哪一项会产生
+-----------------------+-------+-------+-------+-------+
|column1 |column2|column3|column4|column5|
+-----------------------+-------+-------+-------+-------+
|Report |null |null |null |null |
|Generated by YYYY-MM-DD|null |null |null |null |
|Sessions |null |null |null |null |
|Session of all hosts |null |null |null |null |
|col1 |col2 |col3 |col4 |col5 |
|tst1 |tst2 |tst3 |tst4 |tst5 |
|tst6 |tst7 |tst8 |tst9 |tst10 |
+-----------------------+-------+-------+-------+-------+
然后,您可以过滤掉包含您知道不应该在其中的数据的行,例如,您想要跳过的行和列名。
我建议在创建模式时使用实际的列名,而不是我使用column1
-> real_name_of_column1
来获取所需数据帧的占位符。我在示例中使用占位符是为了强调这样一个事实,即创建模式时使用的列名不一定需要由数据通知。
+-----------------------+-------+-------+-------+-------+
|col1 |col2 |col3 |col4 |col5 |
+-----------------------+-------+-------+-------+-------+
|tst1 |tst2 |tst3 |tst4 |tst5 |
|tst6 |tst7 |tst8 |tst9 |tst10 |
...
+-----------------------+-------+-------+-------+-------+
https://stackoverflow.com/questions/56530675
复制相似问题