首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何从udf和广播变量访问广播变量是在调用此udf的另一个类中定义的。

如何从udf和广播变量访问广播变量是在调用此udf的另一个类中定义的。
EN

Stack Overflow用户
提问于 2020-05-31 19:54:17
回答 1查看 1.3K关注 0票数 0

如何从UDF和广播变量访问广播变量是在另一个类中定义的,其中这个UDF被称为

代码语言:javascript
复制
    /* this udf is in different file */
    package com.abc
     public class JavaClass{
        public static UserDefinedFunction getvalue = udf((String param) -> {

        return "String value";
        }, DataTypes.StringType);

    }    
**/* below code is in different file */** 
    package com.xyz;
    import com.abc.JavaClass;
     public class AnotherClassToCallUDF{
        pubic static void main(String args[]) {
    Dataset<Row> abc = .......;

    abc.withColoumn("new-col",JavaClass.getvalue.apply("passing some value"));
    }

    }       

    **/* in the above code , how to pass broadcast variable while calling udf...since udf accepts only col typ`enter code here`e and lit type ..it does not accept anything else..then how to access broadcast variable which is defined in main class and accessing in another class..       
*/**
EN

回答 1

Stack Overflow用户

发布于 2020-06-02 12:23:20

试试这个-

1.创建UDF

代码语言:javascript
复制
class MyUDF implements UDF1<Long, String> {
        private Map<Long, String> broadCastMap;
        public MyUDF(Broadcast<Map<Long, String>> broadCastMap) {
           this.broadCastMap = broadCastMap.value();
        }
        public String call(Long id) {
            return id +" -> " + broadCastMap.getOrDefault(id, "No mapping");
        }
    }

2.使用udf传递广播并使用它。

代码语言:javascript
复制
Dataset<Row> inputDf = spark.range(1, 5).withColumn("col1", lit("a"));
        inputDf.show(false);
        inputDf.printSchema();
        /**
         * +---+----+
         * |id |col1|
         * +---+----+
         * |1  |a   |
         * |2  |a   |
         * |3  |a   |
         * |4  |a   |
         * +---+----+
         *
         * root
         *  |-- id: long (nullable = false)
         *  |-- col1: string (nullable = false)
         */

        // Create broadcast
        Map<Long, String> map = new HashMap<>();
        map.put(1L, "b");
        map.put(2L, "c");
        Broadcast<Map<Long, String>> broadCastMap = new JavaSparkContext(spark.sparkContext()).broadcast(map);

        UserDefinedFunction myUdf = udf(new MyUDF(broadCastMap), DataTypes.StringType);

        spark.sqlContext().udf().register("myUdf", myUdf);

        inputDf.withColumn("new_col", callUDF("myUdf",
                JavaConverters.asScalaBufferConverter(Collections.singletonList(col("id"))).asScala()))
                .show();
        /**
         * +---+----+---------------+
         * | id|col1|        new_col|
         * +---+----+---------------+
         * |  1|   a|         1 -> b|
         * |  2|   a|         2 -> c|
         * |  3|   a|3 -> No mapping|
         * |  4|   a|4 -> No mapping|
         * +---+----+---------------+
         */

3.不向FunctionRegistry注册UDF

代码语言:javascript
复制
 inputDf.withColumn("new_col", myUdf.apply(col("id")))
                .show();
        /**
         * +---+----+---------------+
         * | id|col1|        new_col|
         * +---+----+---------------+
         * |  1|   a|         1 -> b|
         * |  2|   a|         2 -> c|
         * |  3|   a|3 -> No mapping|
         * |  4|   a|4 -> No mapping|
         * +---+----+---------------+
         */
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62121715

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档