我正在尝试测试我的程序中对数据帧执行转换的部分,我想测试这些数据帧的几种不同变体,这些变体排除了从文件中读取特定DF的选项
所以我的问题是:
显然,我之前在谷歌上搜索过,但找不到任何非常有用的东西。在我找到的更有用的链接中有:
如果示例/教程是用Scala编写的,那就太好了,但我将采用您所使用的任何语言
提前感谢
发布于 2016-03-17 21:07:29
这个link展示了如何以编程方式创建一个带模式的数据框架。您可以将数据保存在单独的特征中,并将其与您的测试混合。例如,
// This example assumes CSV data. But same approach should work for other formats as well.
trait TestData {
val data1 = List(
"this,is,valid,data",
"this,is,in-valid,data",
)
val data2 = ...
}
然后使用ScalaTest,我们可以做这样的事情。
class MyDFTest extends FlatSpec with Matchers {
"method" should "perform this" in new TestData {
// You can access your test data here. Use it to create the DataFrame.
// Your test here.
}
}
要创建DataFrame,您可以使用几个如下所示的工具方法。
def schema(types: Array[String], cols: Array[String]) = {
val datatypes = types.map {
case "String" => StringType
case "Long" => LongType
case "Double" => DoubleType
// Add more types here based on your data.
case _ => StringType
}
StructType(cols.indices.map(x => StructField(cols(x), datatypes(x))).toArray)
}
def df(data: List[String], types: Array[String], cols: Array[String]) = {
val rdd = sc.parallelize(data)
val parser = new CSVParser(',')
val split = rdd.map(line => parser.parseLine(line))
val rdd = split.map(arr => Row(arr(0), arr(1), arr(2), arr(3)))
sqlContext.createDataFrame(rdd, schema(types, cols))
}
我不知道有什么实用程序类用于检查DataFrame中的特定值。但我认为使用DataFrame API编写一个应用程序应该很简单。
发布于 2017-11-24 03:33:53
对于那些希望在Java语言中实现类似功能的人,可以通过使用以下项目在单元测试中初始化SparkContext来开始使用:https://github.com/holdenk/spark-testing-base
我个人不得不模仿一些AVRO文件的文件结构。因此,我使用Avro-tools (https://avro.apache.org/docs/1.8.2/gettingstartedjava.html#download_install)通过以下命令从我的二进制记录中提取模式:
java -jar $AVRO_HOME/avro tojson largeAvroFile.avro | head -3
然后,使用这个小帮助器方法,您可以将输出JSON转换为在单元测试中使用的DataFrame。
private DataFrame getDataFrameFromList() {
SQLContext sqlContext = new SQLContext(jsc());
ImmutableList<String> elements = ImmutableList.of(
{"header":{"appId":"myAppId1","clientIp":"10.22.63.3","createdDate":"2017-05-10T02:09:59.984Z"}}
{"header":{"appId":"myAppId1","clientIp":"11.22.63.3","createdDate":"2017-05-11T02:09:59.984Z"}}
{"header":{"appId":"myAppId1","clientIp":"12.22.63.3","createdDate":"2017-05-11T02:09:59.984Z"}}
);
JavaRDD<String> parallelize = jsc().parallelize(elements);
return sqlContext.read().json(parallelize);
}
发布于 2018-05-29 17:52:14
您可以使用Spark用于自己的单元测试的SharedSQLContext
和SharedSparkSession
。有关示例,请查看我的answer。
https://stackoverflow.com/questions/36060502
复制相似问题