A Spark UDF only accepts data columns as input. Before a UDF can perform work such as Redis lookups or white/blacklist filtering, it must load external resources (configuration parameters, word lists, etc.) and initialize the corresponding instances. If those instances are all serializable, initializing them on the Driver and broadcasting them works fine. But objects such as Redis connections and tries cannot be serialized, so they cannot be shipped from the Driver to the Executors. The overall idea is therefore: initialize the serializable resources on the Driver, then use them on each Executor to build the non-serializable objects, completing the construction in a distributed fashion.
Combined with the singleton pattern, the construction then happens only once per Executor. The key is initializing the non-serializable members (e.g. static fields) on the Executor side. Three approaches are presented below.
This article uses building a trie (Aho-Corasick automaton) as the running example; for Redis connections, see article [1].
This section introduces the AtKwdBo and WordTrieEntity classes. AtKwdBo receives the word packages used to build the tries; WordTrieEntity builds the tries and performs string matching.
Article [3] summarizes the serialization issues involved.
keywords holds the keywords and stopwords holds the negative words. If one of a user's queries hits any stopword, that query is filtered out; if a query hits any keyword, the current word package is matched. A user is selected only if their queries, taken together, match every word package.
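The matching semantics above (OR within a package, veto by stopwords, AND across packages) can be isolated into a minimal sketch that uses plain substring matching in place of the Aho-Corasick tries; all names here are illustrative:

```java
import java.util.*;

// Simplified sketch of the word-package matching semantics, using substring
// matching instead of tries (names are illustrative, not the article's API).
public class MatchLogicSketch {

    // One word package: OR over keywords, with any stopword hit vetoing the query.
    static boolean hitsPackage(List<String> queries, Set<String> keywords, Set<String> stopwords) {
        for (String q : queries) {
            if (stopwords.stream().anyMatch(q::contains)) {
                continue;                                   // this query is filtered out
            }
            if (keywords.stream().anyMatch(q::contains)) {
                return true;                                // OR within a package
            }
        }
        return false;
    }

    // AND across word packages: the user must hit every package.
    static boolean selectUser(List<String> queries,
                              List<Map.Entry<Set<String>, Set<String>>> packages) {
        return packages.stream().allMatch(p -> hitsPackage(queries, p.getKey(), p.getValue()));
    }

    public static void main(String[] args) {
        List<String> queries = Arrays.asList("小米手机", "自己给汽车安装百度");
        Set<String> kw = new HashSet<>(Arrays.asList("小米手机", "雷军"));
        Set<String> stop = new HashSet<>(Collections.singletonList("华为手机"));
        System.out.println(hitsPackage(queries, kw, stop)); // true
    }
}
```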
AtKwdBo.java
```java
package com.sogo.getimei.entity;

import java.io.Serializable;
import java.util.*;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class AtKwdBo implements Serializable {

    private Set<String> keywords;
    private Set<String> stopwords;

    /**
     * just for test
     */
    public static List<AtKwdBo> generateKeyWord() {
        // keyword
        List<AtKwdBo> atKwdBos = new ArrayList<>();
        AtKwdBo atKwdBo = new AtKwdBo();
        Set<String> keywords = new HashSet<>();
        keywords.add("小米手机");
        keywords.add("雷军");
        keywords.add("小米10周年");
        atKwdBo.setKeywords(keywords);
        Set<String> stopwords = new HashSet<>();
        stopwords.add("华为手机");
        atKwdBo.setStopwords(stopwords);
        atKwdBos.add(atKwdBo);
        return atKwdBos;
    }
}
```
The Maven dependency for the trie (AC automaton):
```xml
<!-- AC automaton -->
<!-- https://mvnrepository.com/artifact/org.ahocorasick/ahocorasick -->
<dependency>
    <groupId>org.ahocorasick</groupId>
    <artifactId>ahocorasick</artifactId>
    <version>0.4.0</version>
</dependency>
```
Building a trie:
```java
private static Trie buildTrie(Set<String> stringSet) {
    return Trie.builder().addKeywords(stringSet).build();
}
```
On top of this, a "keyword trie" and a "stopword trie" are built:
Note: this class mainly implements the AND/OR/NOT logic between and within word packages; the details can be skipped.
WordTrieEntity.java
```java
package com.sogo.getimei.entity;

import lombok.Getter;
import lombok.Setter;
import org.ahocorasick.trie.Trie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * @author liuzhixuan
 * @date 2020/8/26
 */
@Setter
@Getter
public class WordTrieEntity implements Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieEntity.class);

    // not serialized
    private transient Trie keywordsTrie;
    // not serialized
    private transient Trie stopwordsTrie;

    public WordTrieEntity(Trie keywordsTrie, Trie stopwordsTrie) {
        this.keywordsTrie = keywordsTrie;
        this.stopwordsTrie = stopwordsTrie;
    }

    public static List<WordTrieEntity> generateKeywordTrieList(List<AtKwdBo> atKwdBos) {
        List<WordTrieEntity> keywordsTrieList = new ArrayList<>();
        for (AtKwdBo atKwdBo : atKwdBos) {
            Trie keywordsTrie = buildTrie(atKwdBo.getKeywords());
            Trie stopwordsTrie = buildTrie(atKwdBo.getStopwords());
            keywordsTrieList.add(new WordTrieEntity(keywordsTrie, stopwordsTrie));
        }
        System.out.println("I am initialized in WordTrieEntity");
        return keywordsTrieList;
    }

    private static Trie buildTrie(Set<String> stringSet) {
        return Trie.builder().addKeywords(stringSet).build();
    }

    public static Boolean contains(Seq<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        for (WordTrieEntity wordTrie : wordTrieList) {
            // AND between word packages
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    private static Boolean contains(WordTrieEntity wordTrie, Seq<String> stringSeq) {
        // OR within a word package: one hit is enough
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.apply(i)))) {
                return true;
            }
        }
        // no query matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, List<String> stringSeq) {
        // OR within a word package: one hit is enough
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.get(i)))) {
                return true;
            }
        }
        // no query matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, String query) {
        // stopwords veto the query
        if (null != wordTrie.getStopwordsTrie() && wordTrie.getStopwordsTrie().containsMatch(query)) {
            return false;
        }
        // match keywords
        if (null == wordTrie.getKeywordsTrie()) {
            LOGGER.error("keyword is null");
        }
        return null != wordTrie.getKeywordsTrie() && wordTrie.getKeywordsTrie().containsMatch(query);
    }
}
```
A Spark UDF is instantiated when it is registered; afterwards only its call method is invoked (automatically). Since the trie contains objects that cannot be serialized, the trie list is declared static. But a static member initialized on the Driver is not shipped to the Executors, so using it there throws a NullPointerException (a typical symptom: the job works in local mode but fails in yarn mode). We therefore initialize it inside the call method, which runs on the Executors. To avoid building the tries more than once, we mimic a singleton:
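The lazy, double-checked initialization used in the UDF below can be isolated into a minimal stand-alone sketch (class and field names are illustrative):

```java
// Minimal sketch of lazy double-checked initialization of a static member,
// the pattern used inside the UDF's call() method. Names are illustrative.
public class LazyHolder {
    private static volatile Object heavyResource;   // stands in for the trie list
    static int initCount = 0;                       // for demonstration only

    static Object getResource() {
        if (heavyResource == null) {                // first check, without the lock
            synchronized (LazyHolder.class) {
                if (heavyResource == null) {        // second check, under the lock
                    heavyResource = buildResource();
                }
            }
        }
        return heavyResource;
    }

    private static Object buildResource() {
        initCount++;                                // stands in for building the tries
        return new Object();
    }

    public static void main(String[] args) {
        Object a = getResource();
        Object b = getResource();
        System.out.println(a == b);                 // true: built exactly once
        System.out.println(initCount);              // 1
    }
}
```

One detail worth noting: for double-checked locking to be safe under the Java memory model, the shared field should be declared volatile, as in this sketch; the article's UDF code omits the modifier.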
FilterQueryByAcAutoUdf.java
The wordTrieList member is a List; each element corresponds to one word package, which contains keywords and stopwords.
```java
package com.sogo.getimei.udf;

import com.sogo.getimei.entity.AtKwdBo;
import com.sogo.getimei.entity.WordTrieEntity;
import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.api.java.UDF1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;

import java.io.Serializable;
import java.util.List;

/**
 * @author liuzhixuan
 * @date 2020/8/26
 */
@Getter
@Setter
public class FilterQueryByAcAutoUdf implements UDF1<Seq<String>, Boolean>, Serializable {

    private final Logger LOGGER = LoggerFactory.getLogger(FilterQueryByAcAutoUdf.class);

    // static because: 1. the tries contain non-serializable objects;
    //                 2. singleton-like, initialized only once per Executor
    private static List<WordTrieEntity> wordTrieList;

    // this approach does not work if AtKwdBo itself contains non-serializable objects
    private List<AtKwdBo> atKwdBos;

    public FilterQueryByAcAutoUdf(List<AtKwdBo> atKwdBos) {
        this.atKwdBos = atKwdBos;
        // // initializing here runs on the Driver and is not shipped to the
        // // Executors, which causes a NullPointerException at execution time
        // wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
    }

    // singleton-like lazy initialization with double-checked locking,
    // so each Executor builds the tries only once
    @Override
    public Boolean call(Seq<String> stringSeq) throws Exception {
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            synchronized (FilterQueryByAcAutoUdf.class) {
                if (null == wordTrieList || wordTrieList.isEmpty()) {
                    // if AtKwdBo contains non-serializable objects, build them here instead
                    wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
                    LOGGER.error("[DEBUG] Test how many times it will be initialized; wordTrieList is null or empty");
                }
            }
        }
        return WordTrieEntity.contains(stringSeq, wordTrieList);
    }
}
```
```java
spark.udf().register("filterQueryWordsUdf",
        new FilterQueryByAcAutoUdf(AtKwdBo.generateKeyWord()), DataTypes.BooleanType);
Dataset<Row> acDs = waplxDs.filter("filterQueryWordsUdf(fwords)")
        .selectExpr("imei", "explode(fwords) as fwords");
```
The schema of waplxDs is:
```
root
 |-- imei: string (nullable = true)
 |-- fwords: array (nullable = true)
 |    |-- element: string (containsNull = false)
```
Users who searched for "小米手机" are matched:
```
+----------------------------------------+------------------------------+
|imei                                    |fwords                        |
+----------------------------------------+------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx5座套                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx3导航                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|太靠左右边那么大                |
|26E014B8B77C0A442EC31E59505A1CED4D446779|电动车禁了                     |
|26E014B8B77C0A442EC31E59505A1CED4D446779|说实在的电动车禁了可以减少车祸    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|小米手机                       |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装高德               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装carlife           |
+----------------------------------------+------------------------------+
```
The log lines appear only once:
```
I am initialized in WordTrieEntity
ERROR com.sogo.getimei.udf.FilterQueryByAcAutoUdf - [DEBUG] Test how many times it will be initialized; wordTrieList is null or empty
```
The output matches expectations in both local and cluster mode. The Executor logs confirm that wordTrieList is initialized only once per Executor.
The FilterQueryByAcAutoUdf0 class contains only static members and static methods and is never instantiated on the Driver, so the static initializer wordTrieList = WordTrieEntity.generateKeywordTrieList(AtKwdBo.generateKeyWord()); does not run there. It runs only when FilterQueryByAcAutoUDF.call is invoked [2], which guarantees that the trie is built on each Executor and no NullPointerException occurs. This approach suits a fixed word package: once the job is running, the trie built from the package never changes.
Article [2] explains that one trigger for static-member initialization is the first read of a static field of the class.
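This initialization timing can be demonstrated with a small stand-alone example (names are illustrative): a class's static field initializer does not run when the program starts, only on the first active use of the class, such as reading one of its static fields.

```java
// Demonstrates that a static field initializer runs on first active use of
// the class, not at program start. Names are illustrative.
public class StaticInitDemo {

    static StringBuilder initLog = new StringBuilder();

    static class Holder {
        static boolean initialized = markInit();

        static boolean markInit() {
            initLog.append("Holder initialized;");
            return true;
        }
    }

    public static void main(String[] args) {
        initLog.append("before first use;");
        boolean b = Holder.initialized;   // first read of a static field triggers <clinit>
        initLog.append("after first use;");
        System.out.println(initLog);
        // prints: before first use;Holder initialized;after first use;
    }
}
```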
FilterQueryByAcAutoUdf0.java
```java
package com.sogo.getimei.udf;

import com.sogo.getimei.entity.AtKwdBo;
import com.sogo.getimei.entity.WordTrieEntity;
import org.apache.spark.sql.api.java.UDF1;
import scala.Serializable;
import scala.collection.Seq;

import java.util.List;

/**
 * @author liuzhixuan
 * @date 2020/8/27
 */
public class FilterQueryByAcAutoUdf0 implements Serializable {

    // lazy in effect: initialized when the class is first used, i.e. on the Executor
    private static List<WordTrieEntity> wordTrieList =
            WordTrieEntity.generateKeywordTrieList(AtKwdBo.generateKeyWord());

    private FilterQueryByAcAutoUdf0() {
        System.out.println("I am initialized in FilterQueryByAcAutoUdf0");
    }

    public static UDF1<Seq<String>, Boolean> FilterQueryByAcAutoUDF = new UDF1<Seq<String>, Boolean>() {
        @Override
        public Boolean call(Seq<String> stringSeq) throws Exception {
            // the first access to wordTrieList here triggers its initialization, exactly once
            return WordTrieEntity.contains(stringSeq, wordTrieList);
        }
    };
}
```
```java
spark.udf().register("filterQueryWordsUdf",
        FilterQueryByAcAutoUdf0.FilterQueryByAcAutoUDF, DataTypes.BooleanType);
Dataset<Row> acDs = waplxDs.filter("filterQueryWordsUdf(fwords)")
        .selectExpr("imei", "explode(fwords) as fwords");
```
waplxDs is the same as above; the output matches expectations:
```
+----------------------------------------+------------------------------+
|imei                                    |fwords                        |
+----------------------------------------+------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx5座套                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx3导航                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|太靠左右边那么大                |
|26E014B8B77C0A442EC31E59505A1CED4D446779|电动车禁了                     |
|26E014B8B77C0A442EC31E59505A1CED4D446779|说实在的电动车禁了可以减少车祸    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|小米手机                       |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装高德               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装carlife           |
+----------------------------------------+------------------------------+
```
If writing a Spark UDF feels cumbersome, the Dataset mapPartitions operator is an alternative. Even with mapPartitions, we still cannot initialize non-serializable members on the Driver. Combining broadcast with a singleton keeps both the number of copies and the number of initializations to a minimum.
WordTrieInitEntity.java
```java
package com.sogo.getimei.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;

/**
 * @author liuzhixuan
 * @date 2020/8/27
 */
public class WordTrieInitEntity implements Serializable {

    private final Logger logger = LoggerFactory.getLogger(WordTrieEntity.class);

    // keyword and stopword tries; static fields are not serialized
    private static List<WordTrieEntity> wordTrieList;

    // serializable resource used to build wordTrieList
    private List<AtKwdBo> atKwdBos;

    public WordTrieInitEntity(List<AtKwdBo> atKwdBos) {
        // initialize the (serializable) resource data on the Driver
        this.atKwdBos = atKwdBos;
    }

    /**
     * Build the tries on the Executor side.
     * @return the trie list
     */
    public List<WordTrieEntity> getWordTrieList() {
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            synchronized (WordTrieInitEntity.class) {
                if (null == wordTrieList || wordTrieList.isEmpty()) {
                    wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
                }
            }
        }
        return wordTrieList;
    }
}
```
```java
// instantiate the wrapper, initializing its non-static members on the Driver
WordTrieInitEntity wordTrieInitEntity = new WordTrieInitEntity(AtKwdBo.generateKeyWord());
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
// broadcast; what is actually shipped is the word-package data used to build the tries
Broadcast<WordTrieInitEntity> wordTriesBroadcast = javaSparkContext.broadcast(wordTrieInitEntity);
// use mapPartitions instead of a UDF
Dataset<Row> acDs = waplxDs.mapPartitions(new MapPartitionsFunction<Row, String>() {
    @Override
    public Iterator<String> call(Iterator<Row> iterator) throws Exception {
        // build the tries here, on the Executor
        List<WordTrieEntity> wordTries = wordTriesBroadcast.value().getWordTrieList();
        logger.error("[DEBUG] wordTries:[{}]", wordTries);
        List<String> res = new ArrayList<>();
        if (null == wordTries || wordTries.isEmpty()) {
            logger.error("word tries is null or empty");
            return res.iterator();
        }
        // the whole Executor shares one set of tries
        while (iterator.hasNext()) {
            Row row = iterator.next();
            Seq<String> fwords = row.getAs("fwords");
            if (WordTrieEntity.contains(fwords, wordTries)) {
                res.add(row.getAs("imei"));
            } else {
                res.add(null);
            }
        }
        return res.iterator();
    }
}, Encoders.STRING())
        .filter(col("value").isNotNull())
        .selectExpr("value as imei");
```
waplxDs is the same as above; the output contains "26E014B8B77C0A442EC31E59505A1CED4D446779" as expected:
```
+----------------------------------------+
|imei                                    |
+----------------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|
|3902BD5C873086B7D22CECFF73916E644D6A5533|
|3A8FC47D656B554EE8772285A6793FEF4F445134|
|76AFE3AC3337952787FAF1C1C1F188014D6A4531|
|0CDDB52E6CDD2BC085DAD34B1F59EAFD4F445578|
|0DD25A855BD894B2EC547D8842AF594C4E6A6B34|
|B3141189E2CC830F5E7D6FF2395B3C664D445534|
|31CB2E300E8DBF85A52523AD0EF59BD94E446779|
|E8D927387934B0829B74AB22EBC4202E4E6A4533|
|F32DBA15DEE9BCD619B2FB4A1D8665344F444D35|
+----------------------------------------+
```
Instantiating an anonymous mapPartitions function inside the main logic hurts readability, so the word-package matching can be moved into a dedicated MapPartitionsFunction implementation:
WordTrieMapPartitionImpl.java
```java
package com.sogo.getimei.entity;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.mutable.Seq;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * @author liuzhixuan
 * @date 2020/8/29
 */
public class WordTrieMapPartitionImpl implements MapPartitionsFunction<Row, User2QueriesEntity> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieMapPartitionImpl.class);

    // keyword and stopword tries
    private static List<WordTrieEntity> wordTrieList;

    // serializable resource used to build wordTrieList
    private List<AtKwdBo> atKwdBos;

    public WordTrieMapPartitionImpl(List<AtKwdBo> atKwdBos) {
        this.atKwdBos = atKwdBos;
    }

    @Override
    public Iterator<User2QueriesEntity> call(Iterator<Row> iterator) throws Exception {
        // build the tries on the Executor; the singleton guarantees a single initialization
        List<WordTrieEntity> wordTries = getWordTrieList(atKwdBos);
        List<User2QueriesEntity> res = new ArrayList<>();
        while (iterator.hasNext()) {
            Row inputRow = iterator.next();
            Seq<String> fwords = inputRow.getAs("fwords");
            if (Boolean.TRUE.equals(WordTrieEntity.contains(fwords, wordTries))) {
                // JavaConverters in Spark 2.3.1:
                // List<String> fwordlist = JavaConverters.seqAsJavaListConverter(fwords).asJava();
                // JavaConverters in Spark 3.0.0:
                // List<String> fwordlist = JavaConverters.seqAsJavaList(fwords);
                List<String> fwordlist = new ArrayList<>();
                for (int i = 0; i < fwords.length(); i++) {
                    fwordlist.add(fwords.apply(i));
                }
                res.add(new User2QueriesEntity(inputRow.getAs("imei"), fwordlist));
            } else {
                // the return type is User2QueriesEntity, so emit an empty object rather than null
                res.add(new User2QueriesEntity());
            }
        }
        return res.iterator();
    }

    /**
     * Build the tries on the Executor side.
     * @return the trie list
     */
    private static List<WordTrieEntity> getWordTrieList(List<AtKwdBo> atKwdBos) {
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            synchronized (WordTrieMapPartitionImpl.class) {
                if (null == wordTrieList || wordTrieList.isEmpty()) {
                    wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
                }
            }
        }
        return wordTrieList;
    }
}
```
User2QueriesEntity.java
```java
package com.sogo.getimei.entity;

import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

import java.io.Serializable;
import java.util.List;

@Setter
@Getter
public class User2QueriesEntity implements Serializable {

    private String imei;
    private List<String> fwords;

    public User2QueriesEntity() {}

    public User2QueriesEntity(String imei, List<String> fwords) {
        this.imei = imei;
        this.fwords = fwords;
    }

    public static Encoder<User2QueriesEntity> getEncoder() {
        return Encoders.bean(User2QueriesEntity.class);
    }
}
```
```java
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<List<AtKwdBo>> atKwdBoListBc = javaSparkContext.broadcast(AtKwdBo.generateKeyWord());
Dataset<Row> acDs = waplxDs
        .mapPartitions(new WordTrieMapPartitionImpl(atKwdBoListBc.value()), User2QueriesEntity.getEncoder())
        .filter("imei is not null")
        .selectExpr("imei", "explode(fwords) as fwords")
        .cache();
acDs.show(20, 0);
acDs.printSchema();
```
waplxDs is the same as above; the output matches expectations, and the trie is built only once per Executor:
```
+----------------------------------------+------------------------------+
|imei                                    |fwords                        |
+----------------------------------------+------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx5座套                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx3导航                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|太靠左右边那么大                |
|26E014B8B77C0A442EC31E59505A1CED4D446779|电动车禁了                     |
|26E014B8B77C0A442EC31E59505A1CED4D446779|说实在的电动车禁了可以减少车祸    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|小米手机                       |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装高德               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度               |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装carlife           |
+----------------------------------------+------------------------------+
only showing top 10 rows

root
 |-- imei: string (nullable = true)
 |-- fwords: string (nullable = true)
```
To use non-serializable objects on the Executors when working with a Spark Dataset, they must be initialized on the Executors, because objects (or members) marked static or transient that are initialized on the Driver are not shipped to the Executors. In other words, initialize them inside the operators or methods that actually run on the Executors. To guarantee they are initialized only once per Executor, use a singleton, a broadcast variable, or lazy loading of a static member.
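The three approaches share one pattern, which can be condensed into a serializable wrapper that carries serializable configuration and lazily builds the non-serializable resource per JVM. This is only a sketch: the names are illustrative and a StringBuilder stands in for a real trie or Redis client.

```java
import java.io.Serializable;

// Generic per-JVM lazy holder: the wrapper (with its serializable config) can be
// broadcast or captured by a closure; the static resource is rebuilt on each
// Executor on first use, since static fields are never serialized.
// Names are illustrative; StringBuilder stands in for a non-serializable object.
public class NonSerializableHolder implements Serializable {

    private static volatile StringBuilder resource;  // never shipped: static, rebuilt per JVM
    static int buildCount = 0;                       // for demonstration only

    private final String config;                     // serializable data needed to build it

    public NonSerializableHolder(String config) {
        this.config = config;                        // runs on the Driver
    }

    public StringBuilder get() {                     // runs on the Executor
        if (resource == null) {
            synchronized (NonSerializableHolder.class) {
                if (resource == null) {
                    buildCount++;
                    resource = new StringBuilder(config);  // "build" the heavy resource
                }
            }
        }
        return resource;
    }

    public static void main(String[] args) {
        NonSerializableHolder h = new NonSerializableHolder("conf");
        System.out.println(h.get() == h.get());      // true: built once per JVM
        System.out.println(buildCount);              // 1
    }
}
```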
[1] Several ways to use a Redis connection pool in Spark: http://mufool.com/2017/07/04/spark-redis/
[2] Java class loading explained: https://blog.csdn.net/mawei7510/article/details/83412304
[3] Several ways to create a Dataset: https://www.cnblogs.com/lyy-blog/p/9814662.html
Original work, published to the Tencent Cloud+ Community with the author's authorization; reproduction without permission is prohibited.