首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Spark -返回多列的Java UDF

Spark -返回多列的Java UDF
EN

Stack Overflow用户
提问于 2016-09-28 07:34:22
回答 1查看 4K关注 0票数 2

我使用的是sparkSql 1.6.2 (Java API),并且我必须处理下面的DataFrame,它在2列中有一个值列表:

代码语言:javascript
复制
ID  AttributeName AttributeValue
 0  [an1,an2,an3] [av1,av2,av3]
 1  [bn1,bn2]     [bv1,bv2]

所需的表为:

代码语言:javascript
复制
ID  AttributeName AttributeValue
 0  an1           av1
 0  an2           av2
 0  an3           av3
 1  bn1           bv1
 1  bn2           bv2

我想我必须使用分解函数和自定义UDF函数的组合。

我找到了以下资源:

我可以成功地运行一个示例,读取这两列并返回一列中前两个字符串的连接

代码语言:javascript
复制
 UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
        public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            return col1.apply(0) + col2.apply(0);
        }
    };

 context.udf().register("combineUDF", combineUDF, DataTypes.StringType);

问题是编写返回两列的UDF的签名(在Java中)。据我所知,我必须定义一个新的StructType,如下所示,并将其设置为返回类型,但到目前为止,我还没有设法让最终的代码工作

代码语言:javascript
复制
StructType retSchema = new StructType(new StructField[]{
            new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
            new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
        }
    );

context.udf().register("combineUDF",combineUDF,retSchema);

任何帮助都将不胜感激。

更新:我正在尝试首先实现压缩包(AttributeName,AttributeValue),然后我只需要在sparkSql中应用标准的分解函数:

代码语言:javascript
复制
ID  AttName_AttValue
 0  [[an1,av1],[an1,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]

我构建了以下UDF:

代码语言:javascript
复制
UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
        public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            List<List<String>> zipped = new LinkedList<>();

            for (int i = 0, listSize = col1.size(); i < listSize; i++) {
                List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
                zipped.add(subRow);
            }

            return zipped;
        }

    };

但当我运行代码时

代码语言:javascript
复制
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);

我收到以下错误消息:

scala.MatchError:[an1,av1],[an1,av2],[an3,av3]

看起来组合已经正确执行了,但是返回类型不是Scala中预期的类型。

有什么帮助吗?

EN

回答 1

Stack Overflow用户

发布于 2016-10-03 00:02:11

最后,我设法得到了我想要的结果,但可能不是以最有效的方式。

基本上有两个步骤:

  • Zip of the two list
  • Explode of the list in rows

对于第一步,我定义了以下UDF函数

代码语言:javascript
复制
UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        ArrayList zipped = new ArrayList();

        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            String subRow = col1.apply(i) + ";" + col2.apply(i);
            zipped.add(subRow);
        }

        return scala.collection.JavaConversions.asScalaBuffer(zipped);
    }

};

缺少到SparkSession的函数注册:

代码语言:javascript
复制
sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType);

然后我用下面的代码调用它:

代码语言:javascript
复制
DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));

在这个阶段,df2看起来是这样的:

代码语言:javascript
复制
ID  AttName_AttValue
 0  [[an1,av1],[an1,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]

然后,我调用以下lambda函数将列表分解为行:

代码语言:javascript
复制
 DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));

在这个阶段,df3看起来是这样的:

代码语言:javascript
复制
ID  AttName_AttValue
 0  [an1,av1]
 0  [an1,av2]
 0  [an3,av3]
 1  [bn1,bv1]
 1  [bn2,bv2]

最后,为了将属性名和值拆分为两个不同的列,我将DataFrame转换为JavaRDD,以便使用映射函数:

代码语言:javascript
复制
JavaRDD df3RDD = df3.toJavaRDD().map(
            (Function<Row, Row>) myRow -> {
                String[] info = String.valueOf(myRow.get(1)).split(",");
                return RowFactory.create(myRow.get(0), info[0], info[1]);
        }).cache();

如果有人有更好的解决方案,请随时发表评论。我希望它能帮上忙。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39735864

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档