由java开发UDF1需指定返回值的DataType,spark-2.3.1暂不支持Array、Map这些复杂结构。因此,需要自定义DataType,满足定制化需求。以下以自定义Map结构的DataType为例进行说明。
UDF mapFilterUdf 返回Map结构
BoolFilterUdf.java
package com.sogo.getimei.udf;
import org.apache.spark.sql.api.java.UDF1;
import java.util.HashMap;
import java.util.Map;
/**
* @Created by IntelliJ IDEA.
* @author: MikeLiu
* @Date: 2020/8/3
* @Time: 22:40
* @des:
*/
public class BoolFilterUdf {
    /**
     * Holds fixed lookup data, e.g. a blacklist (currently the way to pass
     * dictionaries and other non-DataFrame data into a UDF).
     * Initialized eagerly: the UDFs below are registered via the static field
     * without any constructor necessarily running first, so leaving this null
     * (as the original did) caused a NullPointerException on first call.
     */
    public static Map<String, String> filterMap = new HashMap<>();

    /**
     * UDF with a Boolean return type: true when the input key is present in filterMap.
     */
    public static UDF1<String, Boolean> boolFilterUdf = new UDF1<String, Boolean>() {
        @Override
        public Boolean call(String s) throws Exception {
            return filterMap.containsKey(s);
        }
    };

    /**
     * UDF with a Map return type: the whole dictionary when the key is present, else null.
     */
    public static UDF1<String, Map<String, String>> mapFilterUdf = new UDF1<String, Map<String, String>>() {
        @Override
        public Map<String, String> call(String s) throws Exception {
            return filterMap.containsKey(s) ? BoolFilterUdf.filterMap : null;
        }
    };

    /**
     * Constructor: resets the dictionary to an empty map.
     */
    public BoolFilterUdf() {
        BoolFilterUdf.filterMap = new HashMap<>();
    }

    /**
     * Constructor: installs the supplied dictionary.
     *
     * @param filterMap lookup data used by the UDFs; must not be null
     */
    public BoolFilterUdf(Map<String, String> filterMap) {
        BoolFilterUdf.filterMap = filterMap;
    }
}

基础数据结构类型:BooleanType, IntegerType, ShortType,LongType, FloatType, DoubleType, ByteType, StringType, DateType, NullType, TimestampType
import com.sogo.getimei.udf.BoolFilterUdf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
// Initialize the SparkSession (Hive support enabled)
SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
// Register a temporary UDF; a basic return type comes straight from DataTypes
spark.udf().register("boolFilterUdf", BoolFilterUdf.boolFilterUdf, DataTypes.BooleanType);以返回Map<String, String>结构为例说明。
文章1指出可以通过 fromJson 方法来构建复杂的结构,但不能用于 Java;文章2给出了 Scala 代码的 json 格式,返回的数据结构更复杂。基于此,本文从简单到组合,给出可执行的 Java 实现。
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType).json();

mapTypeJson 的内容为:

{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true}

DataType mapDataType = DataType.fromJson(mapTypeJson);
spark.udf().register("mapFilterUdf", BoolFilterUdf.mapFilterUdf, mapDataType);
// df 的 schema 结构: userid string
df.selectExpr("mapFilterUdf(userid) as id_map")
.filter("id_map is not null")
.show(10, 0);

目标 struct 的形式:

struct<name:string,scores:map<string,float>,friends:array<string>>

上面已完成了 Map 类型 DataType 的构造,同样可以构造出 Array 类型的 DataType,如下:

DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());

然后借助 StructField 和 StructType 完成 struct 类型 DataType 的构建:
List<StructField> structFieldList = new ArrayList<>();
// Field "name": plain String DataType
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
// Field "scores": Map DataType (map<string,float>)
structFieldList.add(DataTypes.createStructField("scores", mapStringFloatDataType, true));
// Field "friends": Array DataType (array<string>)
structFieldList.add(DataTypes.createStructField("friends", arrayStringDataType, true));
String studyTypeJsonStr = DataTypes.createStructType(structFieldList).json();
// Finally assemble the struct DataType from its JSON representation
DataType studyDataType = DataType.fromJson(studyTypeJsonStr);

得到的 json string 的结果与文章2描述的一致,之后可以直接通过 json 格式配置 DataType:
{
"type":"struct",
"fields":[
{
"name":"name",
"type":"string",
"nullable":true,
"metadata":{
}
},
{
"name":"scores",
"type":{
"type":"map",
"keyType":"string",
"valueType":"float",
"valueContainsNull":true
},
"nullable":true,
"metadata":{
}
},
{
"name":"friends",
"type":{
"type":"array",
"elementType":"string",
"containsNull":true
},
"nullable":true,
"metadata":{
}
}
]
}

哎,没想到报错了。错误中的 value 就是 StudyEntity 类的 toString() 方法返回的结果。文章3也遇到了这个问题,可惜没有解答,怎么办呢?

The value ({"friends":["liu11","liu12","liu13"],"name":"liu1","scores":{"Chn":99.0,"Math":98.0,"Eng":97.0}}) of the type (com.sogo.getimei.entity.StudyEntity) cannot be converted to struct<name:string,scores:map<string,float>,friends:array<string>>

失望之余,测试了下 Map<String, List<String>> 结构,发现可以:
DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, arrayStringDataType).json();
DataType mapStringListDataType = DataType.fromJson(mapTypeJson);

本着继续探索的精神,发现使用 Row 类型替换 Entity 能解决问题。如下:
StudyEntity.java (含UDF)
package com.sogo.getimei.entity;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import java.io.Serializable;
import java.util.*;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/4
* @Time: 14:09
* @des:
*/
@Setter
@Getter
public class StudyEntity implements Serializable {
    private String name;
    private Map<String, Float> scores;
    private List<String> friends;

    /**
     * UDF: returns the parsed record as a Row so Spark can map it onto the
     * registered struct DataType (returning the entity itself fails, see below).
     * Returns null for unparseable input; callers should filter nulls.
     */
    public static UDF1<String, Row> structFilterUdf = new UDF1<String, Row>() {
        @Override
        public Row call(String s) throws Exception {
            StudyEntity parsed = StudyEntity.parse(s);
            // parse() returns null for blank/short input; propagate null
            // instead of the NullPointerException the original threw here
            if (parsed == null) {
                return null;
            }
            return RowFactory.create(parsed.getName(), parsed.getScores(), parsed.getFriends());
        }
    };

    /**
     * This variant fails at runtime: the toString() content of the entity
     * cannot be converted to the registered struct DataType.
     */
    public static UDF1<String, StudyEntity> parseStudyEntityUdf = new UDF1<String, StudyEntity>() {
        @Override
        public StudyEntity call(String s) throws Exception {
            return StudyEntity.parse(s);
        }
    };

    /**
     * Parses "name \t course:score,... \t friend,friend,..." into a StudyEntity.
     *
     * @param s raw input line, e.g. "liu \t Chinese:98,Math:99 \t zhangsan,lisi"
     * @return parsed entity, or null when the input is blank or has fewer than
     *         3 tab-separated fields
     */
    public static StudyEntity parse(String s) {
        if (StringUtils.isEmpty(s)) {
            return null;
        }
        String[] fields = s.split("\t", -1);
        if (fields.length < 3) {
            return null;
        }
        StudyEntity studyEntity = new StudyEntity();
        // name
        studyEntity.setName(fields[0]);
        // scores
        Map<String, Float> scores = new HashMap<>();
        for (String score : fields[1].split(",", -1)) {
            String[] courseAndScore = score.split(":", -1);
            // skip malformed entries instead of throwing (original risked
            // ArrayIndexOutOfBoundsException on a missing ':')
            if (courseAndScore.length < 2) {
                continue;
            }
            try {
                scores.put(courseAndScore[0], Float.valueOf(courseAndScore[1]));
            } catch (NumberFormatException ignored) {
                // non-numeric score: drop this entry, keep the rest of the record
            }
        }
        studyEntity.setScores(scores);
        // friends
        List<String> friends = new ArrayList<>();
        Collections.addAll(friends, fields[2].split(",", -1));
        studyEntity.setFriends(friends);
        return studyEntity;
    }

    /**
     * JSON form of the entity (this is the text Spark shows in the
     * "cannot be converted to struct" error).
     */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

逻辑处理代码
DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, DataTypes.FloatType).json();
DataType mapStringFloatDataType = DataType.fromJson(mapTypeJson);
List<StructField> structFieldList = new ArrayList<>();
// Field "name": plain String DataType
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
// Field "scores": Map DataType (map<string,float>)
structFieldList.add(DataTypes.createStructField("scores", mapStringFloatDataType, true));
// Field "friends": Array DataType (array<string>)
structFieldList.add(DataTypes.createStructField("friends", arrayStringDataType, true));
String studyTypeJsonStr = DataTypes.createStructType(structFieldList).json();
// Finally assemble the struct DataType from its JSON representation
DataType studyDataType = DataType.fromJson(studyTypeJsonStr);
// Register the UDF together with the struct DataType it returns
spark.udf().register("structFilterUdf", StudyEntity.structFilterUdf, studyDataType);
// Read raw lines, run the UDF, then flatten the struct into columns
Dataset<Row> studyDs = spark.read().text("./data/input/study_test_data.txt")
.withColumn("split_col", split(col("value"), "\t"))
.selectExpr("structFilterUdf(value) as study")
.selectExpr("study.name as name", "study.scores as scores", "study.friends as friends");
studyDs.show(20, 0);
studyDs.printSchema();

测试数据 study_test_data.txt:以 '\t' 分割,第1列=name,第2列=scores,第3列=friends
liu1 Chn:99,Math:98,Eng:97 liu11,liu12,liu13
liu2 Chn:89,Math:88,Eng:87 liu21,liu22,liu23
liu3 Chn:79,Math:78,Eng:77 liu31,liu32,liu33
liu4 Chn:69,Math:68,Eng:67 liu41,liu42,liu43

输出结果:
+----+----------------------------------------+---------------------+
|name|scores |friends |
+----+----------------------------------------+---------------------+
|liu1|[Chn -> 99.0, Math -> 98.0, Eng -> 97.0]|[liu11, liu12, liu13]|
|liu2|[Chn -> 89.0, Math -> 88.0, Eng -> 87.0]|[liu21, liu22, liu23]|
|liu3|[Chn -> 79.0, Math -> 78.0, Eng -> 77.0]|[liu31, liu32, liu33]|
|liu4|[Chn -> 69.0, Math -> 68.0, Eng -> 67.0]|[liu41, liu42, liu43]|
+----+----------------------------------------+---------------------+
root
|-- name: string (nullable = true)
|-- scores: map (nullable = true)
| |-- key: string
| |-- value: float (valueContainsNull = true)
|-- friends: array (nullable = true)
| |-- element: string (containsNull = true)

继续深究 struct 中嵌套 struct 的问题,也即文章5中遇到的问题。实践发现,若直接返回 Entity(或者 struct 等非基础数据类型)时都会报错。因此,可以通过将它们转换成 Row 类型解决。以下以解决文章5中返回 PersonEntity 的问题为例说明。
public class PersonEntity {
private String name;
private Integer age;
private List<AddressEntity> address;
}
public class AddressEntity {
private String street;
private String city;
}

AddressEntity.java
package com.sogo.getimei.entity;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/6
* @Time: 16:08
* @des:
*/
@Setter
@Getter
public class AddressEntity implements Serializable {
    private String street;
    private String city;

    /** No-arg constructor. */
    public AddressEntity() {}

    /**
     * @param street street part of the address
     * @param city   city part of the address
     */
    public AddressEntity(String street, String city) {
        this.street = street;
        this.city = city;
    }

    /**
     * Builds the Spark DataType describing this entity:
     * struct<street:string, city:string>.
     * A StructType already IS a DataType, so the original
     * json()/DataType.fromJson round-trip was unnecessary and is dropped.
     */
    public static DataType dataType() {
        List<StructField> structFieldList = new ArrayList<>(2);
        structFieldList.add(DataTypes.createStructField("street", DataTypes.StringType, true));
        structFieldList.add(DataTypes.createStructField("city", DataTypes.StringType, true));
        return DataTypes.createStructType(structFieldList);
    }
}

PersonEntity.java
在 personParseUdf 中,先将List<AddressEntity> 转换成了 List<Row>,再将PersonEntity转换成Row(包含List<Row>)。
package com.sogo.getimei.entity;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/6
* @Time: 16:05
* @des:
*/
@Setter
@Getter
public class PersonEntity implements Serializable {
    private String name;
    private Integer age;
    private List<AddressEntity> address;

    /**
     * Builds the Spark DataType describing this entity:
     * struct<name:string, age:int, address:array<struct<street:string,city:string>>>.
     * StructType/ArrayType already ARE DataTypes, so no json()/fromJson
     * round-trip is needed.
     */
    public static DataType dataType() {
        List<StructField> structFieldList = new ArrayList<>(3);
        // name
        structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        // age
        structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        // address: array of the nested struct from AddressEntity
        structFieldList.add(DataTypes.createStructField("address",
                DataTypes.createArrayType(AddressEntity.dataType()), true));
        return DataTypes.createStructType(structFieldList);
    }

    /**
     * UDF: parses a line into a Row matching dataType(). Nested entities must
     * be converted to Rows as well - returning the entity itself makes Spark
     * fail with "cannot be converted to struct...".
     * Returns null for unparseable input; callers should filter nulls.
     */
    public static UDF1<String, Row> personParseUdf = new UDF1<String, Row>() {
        @Override
        public Row call(String s) throws Exception {
            PersonEntity person = PersonEntity.parse(s);
            // parse() returns null for malformed lines; propagate null
            // instead of the NullPointerException the original threw here
            if (person == null) {
                return null;
            }
            List<Row> addressRows = new ArrayList<>(person.getAddress().size());
            for (AddressEntity addressEntity : person.getAddress()) {
                addressRows.add(RowFactory.create(addressEntity.getStreet(), addressEntity.getCity()));
            }
            return RowFactory.create(person.getName(), person.getAge(), addressRows);
        }
    };

    /**
     * Parses "name \t age \t street:city,street:city,..." into a PersonEntity.
     *
     * @param str raw input line
     * @return parsed entity, or null when the line is blank, has fewer than 3
     *         tab-separated fields, or the age is not numeric
     */
    public static PersonEntity parse(String str) {
        if (StringUtils.isEmpty(str)) {
            return null;
        }
        String[] fields = str.split("\t", -1);
        // guard against short lines (original risked ArrayIndexOutOfBoundsException)
        if (fields.length < 3) {
            return null;
        }
        PersonEntity personEntity = new PersonEntity();
        personEntity.setName(fields[0]);
        try {
            personEntity.setAge(Integer.valueOf(fields[1]));
        } catch (NumberFormatException e) {
            // non-numeric age: treat the whole record as invalid
            return null;
        }
        List<AddressEntity> address = new ArrayList<>();
        for (String s : fields[2].split(",", -1)) {
            String[] add = s.split(":", -1);
            // skip entries without a ':' separator instead of throwing
            if (add.length < 2) {
                continue;
            }
            address.add(new AddressEntity(add[0], add[1]));
        }
        personEntity.setAddress(address);
        return personEntity;
    }
}
// Register the UDF
// Register the UDF with the struct DataType built by PersonEntity.dataType()
spark.udf().register("personParseUdf", PersonEntity.personParseUdf, PersonEntity.dataType());
// Read raw lines, parse each into a struct Row, then flatten into columns
Dataset<Row> studyDs = spark.read().text("./data/input/person_test_data.txt")
.selectExpr("personParseUdf(value) as person")
.selectExpr("person.name as name", "person.age as age", "person.address as address");
studyDs.show(20, 0);
studyDs.printSchema();

测试数据 person_test_data.txt:
liu1 90 Chn:99,Math:98,Eng:97
liu2 80 Chn:89,Math:88,Eng:87
liu3 70 Chn:79,Math:78,Eng:77
liu4 60 Chn:69,Math:68,Eng:67

+----+---+----------------------------------+
|name|age|address |
+----+---+----------------------------------+
|liu1|90 |[[Chn, 99], [Math, 98], [Eng, 97]]|
|liu2|80 |[[Chn, 89], [Math, 88], [Eng, 87]]|
|liu3|70 |[[Chn, 79], [Math, 78], [Eng, 77]]|
|liu4|60 |[[Chn, 69], [Math, 68], [Eng, 67]]|
+----+---+----------------------------------+
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- street: string (nullable = true)
| | |-- city: string (nullable = true)

总结:Spark UDF1 返回基础数据结构时,直接使用 DataTypes 中已定义的;返回 Map、Array 结构时,先使用 createArrayType、createMapType 创建对应的 json string,再使用 DataType.fromJson(...) 创建 DataType;返回 struct 或者 struct 的嵌套结构时,需要借助 RowFactory.create(...) 将 struct 转换成 Row。
1 如何使用Spark UDF返回复杂类型 https://mlog.club/article/1574696
2 使用 json定义spark sql schema 代码例子 http://spark.coolplayer.net/?p=3674
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。