我使用的是sparkSql 1.6.2 (Java API),并且我必须处理下面的DataFrame,它在2列中有一个值列表:
ID AttributeName AttributeValue
0 [an1,an2,an3] [av1,av2,av3]
1 [bn1,bn2] [bv1,bv2]
所需的表为:
ID AttributeName AttributeValue
0 an1 av1
0 an2 av2
0 an3 av3
1 bn1 bv1
1 bn2 bv2
我想我必须使用分解函数和自定义UDF函数的组合。
我找到了以下资源:
我可以成功地运行一个示例,读取这两列并返回一列中前两个字符串的连接
UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
return col1.apply(0) + col2.apply(0);
}
};
context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
问题是编写返回两列的UDF的签名(在Java中)。据我所知,我必须定义一个新的StructType,如下所示,并将其设置为返回类型,但到目前为止,我还没有设法让最终的代码工作
StructType retSchema = new StructType(new StructField[]{
new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
}
);
context.udf().register("combineUDF",combineUDF,retSchema);
任何帮助都将不胜感激。
更新:我正在尝试首先实现压缩包(AttributeName,AttributeValue),然后我只需要在sparkSql中应用标准的分解函数:
ID AttName_AttValue
0 [[an1,av1],[an1,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
我构建了以下UDF:
UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
List<List<String>> zipped = new LinkedList<>();
for (int i = 0, listSize = col1.size(); i < listSize; i++) {
List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
zipped.add(subRow);
}
return zipped;
}
};
但当我运行代码时
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);
我收到以下错误消息:
scala.MatchError:[an1,av1],[an1,av2],[an3,av3]
看起来组合已经正确执行了,但是返回类型不是Scala中预期的类型。
有什么帮助吗?
发布于 2016-10-03 00:02:11
最后,我设法得到了我想要的结果,但可能不是以最有效的方式。
基本上有两个步骤:
对于第一步,我定义了以下UDF函数
UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
ArrayList zipped = new ArrayList();
for (int i = 0, listSize = col1.size(); i < listSize; i++) {
String subRow = col1.apply(i) + ";" + col2.apply(i);
zipped.add(subRow);
}
return scala.collection.JavaConversions.asScalaBuffer(zipped);
}
};
缺少到SparkSession的函数注册:
sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType);
然后我用下面的代码调用它:
DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));
在这个阶段,df2看起来是这样的:
ID AttName_AttValue
0 [[an1,av1],[an1,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
然后,我调用以下lambda函数将列表分解为行:
DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));
在这个阶段,df3看起来是这样的:
ID AttName_AttValue
0 [an1,av1]
0 [an1,av2]
0 [an3,av3]
1 [bn1,bv1]
1 [bn2,bv2]
最后,为了将属性名和值拆分为两个不同的列,我将DataFrame转换为JavaRDD,以便使用映射函数:
JavaRDD df3RDD = df3.toJavaRDD().map(
(Function<Row, Row>) myRow -> {
String[] info = String.valueOf(myRow.get(1)).split(",");
return RowFactory.create(myRow.get(0), info[0], info[1]);
}).cache();
如果有人有更好的解决方案,请随时发表评论。我希望它能帮上忙。
https://stackoverflow.com/questions/39735864
复制相似问题