The dynamic loading of external resources described in Article 1 actually only takes effect after the Spark job is restarted. Inspired by Article 2, we can add a constant column to the data that carries the address of the external resource and pass it as a UDF argument (a UDF cannot take anything that is not a data column as input, so this is a detour around that restriction). Combined with the method of Article 1, this lets one and the same UDF dynamically load different resources. This article demonstrates the approach by extending GenericUDF to read strings stored in a Redis cluster, build dictionary tries from them, and perform keyword-package matching.
Since a GenericUDF cannot be registered via spark.udf().register(...) [3], we adopt the method of Article 4: create the UDF function in SparkSQL or Hive, then call it from SQL.
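To make the contrast concrete, here is a minimal sketch of the two registration paths (the simple UDF and its name are invented for this illustration, and CREATE TEMPORARY FUNCTION typically requires Hive support to be enabled on the SparkSession): lambda-style UDFs implementing UDF1/UDF2/... go through spark.udf().register(), while a GenericUDF is registered with a SQL statement.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class RegisterDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("RegisterDemo")
                .enableHiveSupport() // needed for CREATE TEMPORARY FUNCTION
                .getOrCreate();

        // Plain UDFs implement UDF1/UDF2/... and can be registered directly.
        spark.udf().register("contains_x",
                (UDF1<String, Boolean>) s -> s != null && s.contains("x"),
                DataTypes.BooleanType);

        // A GenericUDF cannot go through spark.udf().register();
        // it is registered via SQL instead, as done later in this article.
        spark.sql("CREATE TEMPORARY FUNCTION keyword_udf AS "
                + "'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf'");
    }
}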
The difference between UDF and GenericUDF is explained in Article 5:

There are two ways to develop a custom UDF: extend org.apache.hadoop.hive.ql.exec.UDF, or extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF. For simple data types (String, Integer, and so on) UDF is sufficient; for complex data types (Array, Map, Struct, and so on) use GenericUDF. In addition, GenericUDF can perform initialization before the function starts and cleanup after it finishes.
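As a hedged illustration of that difference, the following skeletons (class names are hypothetical) show the two styles side by side: the simple UDF resolves its argument and return types by reflection on evaluate(), while the GenericUDF declares them explicitly through ObjectInspectors in initialize(), which is also the hook for one-time setup.

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Simple style: argument/return types are resolved by reflection on evaluate().
class UpperUdf extends UDF {
    public String evaluate(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

// Generic style: types are declared via ObjectInspectors in initialize(),
// which is also the place for one-time setup work.
class LengthUdf extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        Object o = args[0].get();
        return o == null ? null : o.toString().length();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "length_udf(" + String.join(", ", children) + ")";
    }
}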
A demo of GenericUDF can be found in Article 6, and Article 7 gives a detailed introduction to the ObjectInspector used by GenericUDF.
KeyWordSetEntity.java
The name field serves two purposes: 1. in external storage, it uniquely identifies the resource (like a primary key in MySQL, or a key in Redis); 2. it provides the value of the constant column passed to the UDF later.
The keyWordSet field is the external resource itself: the list structure allows several keyword packages, and each KeyWordPackage contains both "keywords" and "stopwords".
package com.sogo.sparkudf.entity;

import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.*;

/**
 * @author: liuzhixuan
 * @Date: 2020/8/31
 * @Time: 10:44
 */
@Setter
@Getter
public class KeyWordSetEntity implements Serializable {

    // uniquely identifies the resource in external storage;
    // also the value of the constant column
    private String name;

    // the external resource: one entry per keyword package
    private List<KeyWordPackage> keyWordSet;

    public KeyWordSetEntity() {}

    public KeyWordSetEntity(String name, List<KeyWordPackage> keyWordSet) {
        this.name = name;
        this.keyWordSet = keyWordSet;
    }

    public KeyWordSetEntity(String name, KeyWordPackage... keyWordPackages) {
        this.name = name;
        keyWordSet = new ArrayList<>();
        Collections.addAll(keyWordSet, keyWordPackages);
    }

    @Getter
    @Setter
    public static class KeyWordPackage implements Serializable {
        private Set<String> keywords;
        private Set<String> stopwords;

        public KeyWordPackage() {}

        public KeyWordPackage(Set<String> keywords, Set<String> stopwords) {
            this.keywords = keywords;
            this.stopwords = stopwords;
        }
    }

    public static Set<String> generateKeyWordSet(String keyword, String separator) {
        Set<String> resSet = new HashSet<>();
        String[] fields = keyword.split(separator, -1);
        Collections.addAll(resSet, fields);
        return resSet;
    }

    public static KeyWordSetEntity generateTestData() {
        Set<String> keywords1 = generateKeyWordSet("小米手机,小米10", ",");
        Set<String> stopwords1 = generateKeyWordSet("大米", ",");
        Set<String> keywords2 = generateKeyWordSet("雷军,武汉大学", ",");
        Set<String> stopwords2 = generateKeyWordSet("联想", ",");
        KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
        KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);
        String name = "xiaomi_udf";
        return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
    }

    public static KeyWordSetEntity generateTestData2() {
        Set<String> keywords1 = generateKeyWordSet("华为手机,华为荣耀", ",");
        Set<String> stopwords1 = generateKeyWordSet("小米手机", ",");
        Set<String> keywords2 = generateKeyWordSet("华为P40", ",");
        Set<String> stopwords2 = generateKeyWordSet("联想", ",");
        KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
        KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);
        String name = "huawei_udf";
        return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
    }

    public static void main(String[] args) {
        KeyWordSetEntity keyWordSetEntity = generateTestData2();
        String jsonString = JSONObject.toJSONString(keyWordSetEntity, true);
        System.out.println("jsonString:" + jsonString);
    }
}
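For reference, running this main() prints a pretty-printed JSON string of roughly the following shape (fastjson serializes getters in alphabetical order, and HashSet iteration order is not guaranteed, so the ordering in your output may differ). This is exactly the string that will later be stored in Redis:

{
    "keyWordSet": [
        {
            "keywords": ["华为手机", "华为荣耀"],
            "stopwords": ["小米手机"]
        },
        {
            "keywords": ["华为P40"],
            "stopwords": ["联想"]
        }
    ],
    "name": "huawei_udf"
}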
WordTrieEntity.java
This class builds the dictionary tries and performs the keyword matching.
package com.sogo.sparkudf.entity;

import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.ahocorasick.trie.Trie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * @author: liuzhixuan
 * @Date: 2020/8/26
 * @Time: 23:08
 */
@Setter
@Getter
public class WordTrieEntity implements Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieEntity.class);

    // not serialized
    private transient Trie keywordsTrie;
    // not serialized
    private transient Trie stopwordsTrie;

    public WordTrieEntity(Trie keywordsTrie, Trie stopwordsTrie) {
        this.keywordsTrie = keywordsTrie;
        this.stopwordsTrie = stopwordsTrie;
    }

    public static List<WordTrieEntity> generateKeywordTrieList(String jsonString) {
        // parse the keyword packages from JSON
        KeyWordSetEntity keyWordSetEntity = JSON.parseObject(jsonString, KeyWordSetEntity.class);
        List<WordTrieEntity> keywordsTrieList = new ArrayList<>();
        for (KeyWordSetEntity.KeyWordPackage keyWordPackage : keyWordSetEntity.getKeyWordSet()) {
            Trie keywordsTrie = buildTrie(keyWordPackage.getKeywords());
            Trie stopwordsTrie = buildTrie(keyWordPackage.getStopwords());
            keywordsTrieList.add(new WordTrieEntity(keywordsTrie, stopwordsTrie));
        }
        System.out.println("[DEBUG]I am initialized in WordTrieEntity");
        return keywordsTrieList;
    }

    private static Trie buildTrie(Set<String> stringSet) {
        return Trie.builder().addKeywords(stringSet).build();
    }

    public static Boolean contains(Seq<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        // nothing to filter
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            return true;
        }
        for (WordTrieEntity wordTrie : wordTrieList) {
            // AND relationship between keyword packages
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    public static Boolean contains(List<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        // nothing to filter
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            return true;
        }
        for (WordTrieEntity wordTrie : wordTrieList) {
            // AND relationship between keyword packages
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    private static Boolean contains(WordTrieEntity wordTrie, Seq<String> stringSeq) {
        // a single matching query suffices: OR relationship within a package
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.apply(i)))) {
                return true;
            }
        }
        // no query matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, List<String> stringSeq) {
        // a single matching query suffices: OR relationship within a package
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.get(i)))) {
                return true;
            }
        }
        // no query matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, String query) {
        // stopwords veto the match
        if (null != wordTrie.getStopwordsTrie() && wordTrie.getStopwordsTrie().containsMatch(query)) {
            return false;
        }
        // match against the keywords
        if (null == wordTrie.getKeywordsTrie()) {
            LOGGER.error("keyword is null");
        }
        return null != wordTrie.getKeywordsTrie() && wordTrie.getKeywordsTrie().containsMatch(query);
    }

    private static Seq<String> list2Seq(List<String> list) {
        Seq<String> stringSeq = new ArraySeq<>(list.size());
        for (int i = 0; i < list.size(); i++) {
            ((ArraySeq<String>) stringSeq).update(i, list.get(i));
        }
        return stringSeq;
    }
}
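As a minimal standalone sketch of these matching semantics (AND across keyword packages, OR across the queries within one package, stopwords vetoing a match), using the same org.ahocorasick Trie API the class relies on:

import org.ahocorasick.trie.Trie;

import java.util.Arrays;
import java.util.List;

public class TrieDemo {
    public static void main(String[] args) {
        // One keyword package: match "小米手机" or "小米10", veto on "大米".
        Trie keywords = Trie.builder().addKeywords(Arrays.asList("小米手机", "小米10")).build();
        Trie stopwords = Trie.builder().addKeywords(Arrays.asList("大米")).build();

        List<String> queries = Arrays.asList("买小米手机", "大米价格");
        for (String q : queries) {
            // stopword match vetoes; otherwise any keyword hit counts
            boolean hit = !stopwords.containsMatch(q) && keywords.containsMatch(q);
            System.out.println(q + " -> " + hit);
        }
    }
}

Here "买小米手机" matches the keyword "小米手机" and prints true, while "大米价格" is vetoed by the stopword "大米" and prints false.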
RedisClusterConnector.java
This class stores the external resource (the keyword packages) used to build the dictionary tries. The singleton pattern guarantees that the Redis client is initialized only once.
package com.sogo.sparkudf.connnector;

import com.alibaba.fastjson.JSON;
import com.sogo.sparkudf.entity.KeyWordSetEntity;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

/**
 * @author: liuzhixuan
 * @Date: 2020/8/20
 * @Time: 22:25
 */
@Setter
@Getter
public class RedisClusterConnector implements Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisClusterConnector.class);

    // jedis cluster instance; volatile so that double-checked locking is safe
    private static volatile JedisCluster jedisCluster;

    public static JedisCluster getRedisCluster() {
        if (null == jedisCluster) {
            synchronized (RedisClusterConnector.class) {
                if (null == jedisCluster) {
                    LOGGER.warn("[JUST_FOR_DEBUG] Test how many times the function is called");
                    // jedis pool config
                    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
                    poolConfig.setMaxIdle(50);
                    poolConfig.setMinIdle(0);
                    poolConfig.setMaxTotal(50);
                    // host and port
                    Set<HostAndPort> nodes = new HashSet<>();
                    nodes.add(new HostAndPort("10.162.48.186", 6001));
                    nodes.add(new HostAndPort("10.162.48.186", 6020));
                    nodes.add(new HostAndPort("10.162.48.225", 6001));
                    nodes.add(new HostAndPort("10.162.48.225", 6020));
                    // init
                    int connectionTimeout = 10000;
                    int soTimeout = 5000;
                    int maxAttempts = 2;
                    jedisCluster = new JedisCluster(nodes, connectionTimeout, soTimeout, maxAttempts, poolConfig);
                }
            }
        }
        return jedisCluster;
    }

    public static void close() {
        if (null != jedisCluster) {
            synchronized (RedisClusterConnector.class) {
                if (null != jedisCluster) {
                    jedisCluster.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        JedisCluster jedisCluster = RedisClusterConnector.getRedisCluster();
        KeyWordSetEntity keyWordSetEntity = KeyWordSetEntity.generateTestData();
        String udfName = keyWordSetEntity.getName();
        String value = JSON.toJSONString(keyWordSetEntity);
        jedisCluster.set("keyword_package_" + udfName, value);
        String s = jedisCluster.get("keyword_package_" + udfName);
        System.out.println("value:" + s);
    }
}
KeyWordKeyFilterUdf.java
package com.sogo.sparkudf.udf;

import com.sogo.sparkudf.connnector.RedisClusterConnector;
import com.sogo.sparkudf.entity.WordTrieEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;

import java.util.*;

/**
 * @author: liuzhixuan
 * @Date: 2020/8/29
 * @Time: 10:58
 * @des: references:
 *       https://www.jianshu.com/p/ca9dce6b5c37
 *       https://www.jianshu.com/p/ba0e54579cc4
 */
/**
 * The @Description annotation is optional and documents the function. The _FUNC_
 * placeholder is replaced with the actual function name when DESCRIBE FUNCTION is run.
 * It has three attributes:
 *   name: the function name in Hive;
 *   value: describes the function's arguments;
 *   extended: extra notes, e.g. an example, printed by DESCRIBE FUNCTION EXTENDED name.
 * Source: https://www.jianshu.com/p/ca9dce6b5c37
 */
@Description(
        name = "keyword_match_udf",
        value = "_FUNC_(queries, 'keyword_package') - from the input string "
                + "returns true if queries contain keyword_package",
        extended = "Example:\n > SELECT _FUNC_(queries, 'keyword_package') FROM src;"
)
public class KeyWordKeyFilterUdf extends GenericUDF {

    private static final Logger LOGGER = LoggerFactory.getLogger(KeyWordKeyFilterUdf.class);
    // prefix of the redis key
    private static final String PREFIX_REDIS_KEY = "keyword_package_";
    // cache of keyword tries, keyed by package name
    private static Map<String, List<WordTrieEntity>> dictTrie = new HashMap<>();
    // object inspector: String
    private static final ObjectInspector valueOI = PrimitiveObjectInspectorFactory
            .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
    // string object inspector: for parsing the string argument
    private static final StringObjectInspector stringOI = (StringObjectInspector) valueOI;
    // object inspector: list<string>
    private static final ListObjectInspector arrayOI = ObjectInspectorFactory.getStandardListObjectInspector(valueOI);
    // redis cluster client
    private static JedisCluster jedisCluster;

    /**
     * Additionally setup GenericUDF with MapredContext before initializing.
     * This is only called in runtime of MapRedTask.
     *
     * @param context context
     */
    @Override
    public void configure(MapredContext context) {
        /*
        org.apache.hadoop.mapred.JobConf jobConf = context.getJobConf();
        String user = jobConf.getUser();
        String jobName = jobConf.getJobName();
        String queueName = jobConf.getQueueName();
        int numMapTasks = jobConf.getNumMapTasks();
        int numReduceTasks = jobConf.getNumReduceTasks();
        int maxMapAttempts = jobConf.getMaxMapAttempts();
        int maxReduceAttempts = jobConf.getMaxReduceAttempts();
        LOGGER.warn("[JUST_FOR_DEBUG] user:{}, jobName:{}, queueName:{}, "
                + "numMapTasks:{}, numReduceTasks:{}, maxMapAttempts:{}, maxReduceAttempts:{}",
                user, jobName, queueName, numMapTasks, numReduceTasks, maxMapAttempts, maxReduceAttempts);
        */
    }

    // Called once, before evaluate(). It receives an array of ObjectInspectors
    // and checks that the argument count and types are correct.
    // Note: it runs again before every SQL statement is executed.
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // two arguments are expected: 1. the query list; 2. the keyword-package name
        if (objectInspectors.length < 2) {
            throw new UDFArgumentLengthException("param invalid: 1->query word, 2->type");
        }
        // ObjectInspector.Category.LIST
        if (!objectInspectors[0].getCategory().equals(arrayOI.getCategory())) {
            throw new UDFArgumentTypeException(0, "[" + arrayOI.getTypeName() + "] type is needed, "
                    + "but [" + objectInspectors[0].getTypeName() + "] type is found");
        }
        // ObjectInspector.Category.PRIMITIVE
        if (!objectInspectors[1].getCategory().equals(valueOI.getCategory())) {
            throw new UDFArgumentTypeException(1, "[" + valueOI.getTypeName() + "] type is needed, "
                    + "but [" + objectInspectors[1].getTypeName() + "] is found");
        }
        // init jedis cluster
        jedisCluster = RedisClusterConnector.getRedisCluster();
        // getUdfName() returns the fully qualified class name; getFuncName() returns
        // the simple class name with its first 10 characters stripped
        LOGGER.warn("[JUST_FOR_DEBUG] getUdfName:[{}], getFuncName:[{}]", getUdfName(), getFuncName());
        return PrimitiveObjectInspectorFactory
                .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN);
        // to declare a Java list as the return type instead:
        //ObjectInspector returnOi = PrimitiveObjectInspectorFactory
        //        .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
        //return ObjectInspectorFactory.getStandardListObjectInspector(returnOi);
    }

    // Similar to UDF's evaluate(): processes the actual arguments and returns the result.
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // the first argument passed to the UDF
        Object queries = deferredObjects[0].get();
        // the second argument passed to the UDF
        Object keywordPackageTag = deferredObjects[1].get();
        if (null == queries || null == keywordPackageTag) {
            return false;
        }
        // parse the String argument (option 1): via StringObjectInspector
        String keywordPackageName = stringOI.getPrimitiveJavaObject(keywordPackageTag);
        // parse the String argument (option 2): toString()
        // String udfName = deferredObjects[1].get().toString();
        // parse the List<String> argument (option 1): all at once
        List<String> queryList = (List<String>) arrayOI.getList(queries);
        /*
        // parse the List<String> argument (option 2): element by element
        int listLength = arrayOI.getListLength(queries);
        List<String> queryList = new ArrayList<>(listLength);
        for (int i = 0; i < listLength; i++) {
            String s = arrayOI.getListElement(queries, i).toString();
            queryList.add(s);
        }
        */
        List<WordTrieEntity> dict = generateFromRedis(keywordPackageName);
        return WordTrieEntity.contains(queryList, dict);
    }

    // Introductory text shown when the user runs EXPLAIN on the SQL statement.
    @Override
    public String getDisplayString(String[] strings) {
        if (null == strings || strings.length == 0) {
            return "null or empty";
        } else {
            return String.join(";", strings);
        }
    }

    /**
     * Reads the keyword package from Redis and builds the dictionary trie here.
     * @param keywordPackageName name of the keyword package
     * @return the list of tries for this package
     */
    private static List<WordTrieEntity> generateFromRedis(String keywordPackageName) {
        if (null == dictTrie || dictTrie.isEmpty() || !dictTrie.containsKey(keywordPackageName)) {
            synchronized (KeyWordKeyFilterUdf.class) {
                if (null == dictTrie || dictTrie.isEmpty()) {
                    dictTrie = new HashMap<>();
                }
                if (!dictTrie.containsKey(keywordPackageName)) {
                    LOGGER.warn("[JUST_FOR_DEBUG] Build dict trie by keyword: [{}]", keywordPackageName);
                    String redisKey = PREFIX_REDIS_KEY + keywordPackageName;
                    if (null == jedisCluster) {
                        jedisCluster = RedisClusterConnector.getRedisCluster();
                        LOGGER.error("Something happened to jedis cluster, reconnect to it");
                    }
                    String value = jedisCluster.get(redisKey);
                    if (StringUtils.isEmpty(value)) {
                        LOGGER.error("Cannot load keyword from redis by key:[{}]", redisKey);
                        return new ArrayList<>();
                    }
                    List<WordTrieEntity> wordTrieEntityList = WordTrieEntity.generateKeywordTrieList(value);
                    dictTrie.put(keywordPackageName, wordTrieEntityList);
                }
            }
        }
        return dictTrie.get(keywordPackageName);
    }

    public static void main(String[] args) {
        StringObjectInspector stringObjectInspector = (StringObjectInspector) valueOI;
        System.out.println("stringObjectInspector:" + stringObjectInspector.getTypeName());
        System.out.println("valueOI:" + valueOI.getTypeName());
    }
}
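To sanity-check the UDF without a cluster, one can drive initialize() and evaluate() by hand using standard ObjectInspectors and GenericUDF.DeferredJavaObject. The sketch below is a local test under stated assumptions (class name invented): the Redis cluster configured in RedisClusterConnector must be reachable, and the xiaomi_udf test data from KeyWordSetEntity must have been written first.

import com.sogo.sparkudf.udf.KeyWordKeyFilterUdf;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.Arrays;

public class UdfLocalTest {
    public static void main(String[] args) throws Exception {
        KeyWordKeyFilterUdf udf = new KeyWordKeyFilterUdf();

        // Declare the two argument types: array<string>, string.
        ObjectInspector queriesOI = ObjectInspectorFactory.getStandardListObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        ObjectInspector nameOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        udf.initialize(new ObjectInspector[]{queriesOI, nameOI});

        // Wrap plain Java objects the same way Hive would at runtime.
        Object result = udf.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject(Arrays.asList("小米10周年", "雷军")),
                new GenericUDF.DeferredJavaObject("xiaomi_udf")
        });
        System.out.println("match: " + result); // expected: true
    }
}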
Packaging the code above produces a jar named:
sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Its path on the development machine is:
/search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
HiveSQL/SparkSQL
After entering the Hive or SparkSQL environment, run:
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION keyword_udf AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf';
show functions;
PySpark
After entering the PySpark environment, run:
spark.sql("ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION keyword_udf AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf'")
spark.sql("show user functions").show(10,0)
Take a test in PySpark as an example:
testDs.show(4)
DataFrame[imei: string, fwords: array<string>]
+--------------------+--------------------+
|                imei|              fwords|
+--------------------+--------------------+
|00003AC86C0E62825...| [鬼谷子狼道等十本书]|
|00005FD5EA9B96624...|[哄女友, 后来, 古, 火焰切割...|
|00006231671F8272E...|              [欧尚]|
|00007A428750D7C19...|          [公仔迷你]|
+--------------------+--------------------+
Test 1: check that the UDF runs correctly
testDs.registerTempTable('testDs')
xiaomi = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'xiaomi_udf')!=0")
xiaomi.show(10,0)
Result:
One log line is printed per executed SQL statement, which matches expectations:
20/09/01 15:44:12 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]
The filtered rows also match expectations:
+----------------------------------------+--------+
|imei                                    |fword   |
+----------------------------------------+--------+
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米手机的 |
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米主题 |
|08B687D554A238008EA117049A87776C4E6A6730|小米 |
|08B687D554A238008EA117049A87776C4E6A6730|小米书包 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊宝 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊 |
|08B687D554A238008EA117049A87776C4E6A6730|小米旅行箱青春版|
|08B687D554A238008EA117049A87776C4E6A6730|如何进入小米 |
|08B687D554A238008EA117049A87776C4E6A6730|小米 |
+----------------------------------------+--------+
Test 2: dynamically loading a different keyword package
Building directly on Test 1, in the same session, run the Huawei keyword package:
huawei = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'huawei_udf')!=0")
huawei.show(10,0)
The output matches expectations:
20/09/01 16:02:41 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]
+----------------------------------------+-------------------+
|imei                                    |fword              |
+----------------------------------------+-------------------+
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为P40pro          |
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为手机            |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线 |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为P40Pro侧边色线   |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为                |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线 |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40开发者xuanxiang选项|
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40kaifazhe开发者 |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|电脑怎么用华为手机怎么共享网络 |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|huawei华为          |
+----------------------------------------+-------------------+
In summary: to let a single UDF dynamically load different keyword packages (which can be extended without limit), we added a constant column to the data, working around the restriction that a UDF cannot take non-column inputs, and thereby achieved dynamic loading of keyword packages. Expired keyword packages should of course be deleted from Redis to save resources.
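For that cleanup, a hedged sketch (class name invented) that drops a package by the same key convention the UDF uses (PREFIX_REDIS_KEY + name):

import com.sogo.sparkudf.connnector.RedisClusterConnector;
import redis.clients.jedis.JedisCluster;

public class CleanupDemo {
    public static void main(String[] args) {
        JedisCluster jedis = RedisClusterConnector.getRedisCluster();
        // Remove a keyword package that is no longer needed.
        jedis.del("keyword_package_xiaomi_udf");
        RedisClusterConnector.close();
    }
}

Note that deleting the Redis key only prevents new loads: executors that have already cached the package in dictTrie keep serving it until the session is restarted.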
[1] Spark UDF加载外部资源: https://cloud.tencent.com/developer/article/1688828
[4] Spark UDF实现demo: https://cloud.tencent.com/developer/article/1672068
[5] Hive udf、UDF、GenericUDF: https://blog.csdn.net/wangshuminjava/article/details/79663998
[6] Hive-UDF&GenericUDF: https://www.jianshu.com/p/ca9dce6b5c37
[7] Hive之ObjectInspector接口解析笔记: https://blog.csdn.net/weixin_39469127/article/details/89739285