re Spark文档2.3:
registerJavaFunction(name,javaClassName,returnType=None)源
将Java用户定义函数注册为SQL函数。
除了名称和函数本身之外,还可以选择指定返回类型。当未指定返回类型时,我们将通过反射来推断它。
参数:
name -用户定义函数的名称
javaClassName - java类的全限定名
returnType -注册的Java函数的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是DDL格式的类型字符串。
我的问题是:
我想有一个大量的UDF库,为火花2.3+,都是用Java写的,都可以从PySpark/Python访问。
阅读上面我链接的文档,你会发现在一个类和Java函数(可以从PySpark中的Spark-调用)之间存在SQL一对一的映射。因此,如果我有10个Java UDF函数,那么我需要创建10个公共Java类,每个类有1个UDF,以使它们可从PySpark/SQL调用。
这是正确的吗?
我是否可以创建1个public Java类,并将许多不同的UDF放入1个类中,并使所有的UDF都可以从Spark2.3中的PySpark调用?
这篇文章没有提供任何示例代码来帮助回答我的问题。看起来一切都在Scala中。我要全部用爪哇写的。我是否需要扩展一个类或实现一个接口才能用Java语言来做这件事呢?如果有任何链接指向要从PySpark-SQL调用的示例Java代码,我们将不胜感激。
Spark: How to map Python with Scala or Java User Defined Functions?
发布于 2018-08-12 00:37:02
因此,如果我有10个Java函数,那么我需要创建10个公共Java类,每个类有一个UDF,以使它们可以从
/SQL中调用。
这是正确的吗?
是的,这是正确的。但是,您可以:
UserDefinedFunction
并将其作为接口,如Spark: How to map Python with Scala or Java User Defined Functions?UDFRegistration.register
中所示注册名为udfs
,然后只需通过Py4j为每个注册函数调用org.apache.spark.sql.functions.callUDF
。发布于 2018-08-12 11:21:02
下面非常简单的Java/ Python /Pyspark代码示例可能会对某些人有所帮助,我在Spark 2.3.1和Java 1.8上得到了它,用于从Python调用的Java UDF。
请注意,在我看来,这种方法对来说非常麻烦,因为您需要为每个Java UDF创建一个单独的类。因此,对于50个离散的Java UDF= 50个独立的公共Java类!理想情况下,如果单个公共Java类可以包含多个单独的Java UDF,则所有这些都打包在单个JAR文件中。唉,我还是不知道该怎么做。
欢迎提出改进建议!谢谢
// Java 8 code
package com.yourdomain.sparkUDF;
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF0;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public final class JavaUDFExample
implements UDF0<String> {
@Override
public String call() throws Exception {
return java.util.UUID.randomUUID().toString();
}
}
// end of Java code
// make a jar file from above including all referenced jar Spark libraries
# PySPark Python code below
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("Java UDF Example").getOrCreate()
df = spark.read.json(r"c:\temp\temperatures.json")
df.createOrReplaceTempView("citytemps")
spark.udf.registerJavaFunction("getGuid", "com.yourdomain.sparkUDF.JavaUDFExample", StringType())
spark.sql("SELECT getguid() as guid, * FROM citytemps").show()
# end of PySpark-SQL Python code
DOS shell script to run on local Spark:
spark-submit --jars c:\dir\sparkjavaudf.jar python-udf-example.py
https://stackoverflow.com/questions/51797395
复制相似问题