
Spark GenericUDF: Dynamically Loading External Resources


Preface

The dynamic loading of external resources described in Article [1] in fact only takes effect after the Spark job is restarted. Inspired by Article [2], we can add a constant column to the data that holds the address of the external resource and pass it in as a UDF argument (a UDF cannot take inputs that are not data columns, so this is a workaround), then combine this with the method of Article [1], so that one and the same UDF dynamically loads different resources. This post demonstrates the idea by extending GenericUDF: strings stored in a Redis cluster are read, tries are built from them, and keyword-package matching is performed.

Since a GenericUDF cannot be registered via spark.udf().register(...) [3], we follow the approach of Article [4]: create the UDF in Spark SQL or Hive with CREATE FUNCTION, then call it.
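To make the constant-column trick concrete, here is a minimal sketch in Java (the table and column names mirror the test section below; whether you go through callUDF or write the literal directly in SQL is immaterial): the resource name travels with the data as a literal column, so the registered UDF receives it like any other column.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

public class ConstantColumnSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("constant-column-sketch")
                .enableHiveSupport()
                .getOrCreate();
        // assumes keyword_udf has been registered as described below
        Dataset<Row> testDs = spark.table("testDs");
        // lit(...) adds the resource name as a constant column, so the same
        // UDF can be pointed at a different external resource per query
        Dataset<Row> hits = testDs
                .withColumn("pkg", lit("xiaomi_udf"))
                .where(callUDF("keyword_udf", col("fwords"), col("pkg")));
        hits.show(10, false);
    }
}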

Differences between UDF and GenericUDF

For the difference between UDF and GenericUDF, see Article [5]:

There are two ways to develop a custom UDF: extend org.apache.hadoop.hive.ql.exec.UDF, or extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF. For simple data types (such as String or Integer) a UDF is sufficient; for complex data types (such as Array, Map, or Struct) use a GenericUDF. In addition, a GenericUDF can perform initialization before the function runs and cleanup after it finishes.
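For contrast, a minimal sketch of the plain-UDF style (not part of this project): Hive locates evaluate() by reflection and maps simple Hive types to Java types automatically, but offers no ObjectInspectors, no complex types, and no lifecycle hooks.

import org.apache.hadoop.hive.ql.exec.UDF;

public class SimpleContainsUdf extends UDF {
    // reflection-based dispatch: Hive matches the call's arguments to this signature
    public Boolean evaluate(String query, String keyword) {
        if (query == null || keyword == null) {
            return false;
        }
        return query.contains(keyword);
    }
}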

A GenericUDF demo can be found in Article [6]; Article [7] describes the ObjectInspector used by GenericUDF in detail.

Preparations

Data structure of the external resource

KeyWordSetEntity.java

The name field serves two purposes: 1. in external storage it uniquely identifies the resource (like a MySQL primary key, or a Redis key); 2. it is the value of the constant column passed to the UDF later.

The keyWordSet field is the external resource itself; the list structure allows several keyword packages, and each KeyWordPackage holds "keywords" and "stopwords".

package com.sogo.sparkudf.entity;

import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.*;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/31
 * @Time: 10:44
 * @des:
 */
@Setter
@Getter
public class KeyWordSetEntity implements Serializable {
    // unique name of the resource: Redis key suffix and UDF constant-column value
    private String name;
    // keyword packages that make up the external resource
    private List<KeyWordPackage> keyWordSet;
    // no-arg constructor, required by fastjson deserialization
    public KeyWordSetEntity() {}
    public KeyWordSetEntity(String name, List<KeyWordPackage> keyWordSet) {
        this.name = name;
        this.keyWordSet = keyWordSet;
    }
    // constructor
    public KeyWordSetEntity(String name, KeyWordPackage... keyWordPackages) {
        this.name = name;
        keyWordSet = new ArrayList<>();
        Collections.addAll(keyWordSet, keyWordPackages);
    }

    @Getter
    @Setter
    public static class KeyWordPackage implements Serializable {
        private Set<String> keywords;
        private Set<String> stopwords;

        public KeyWordPackage() {}

        public KeyWordPackage(Set<String> keywords, Set<String> stopwords) {
            this.keywords = keywords;
            this.stopwords = stopwords;
        }
    }

    public static Set<String> generateKeyWordSet(String keyword, String separator) {
        Set<String> resSet = new HashSet<>();
        String[] fields = keyword.split(separator, -1);
        Collections.addAll(resSet, fields);
        return resSet;
    }

    public static KeyWordSetEntity generateTestData() {
        Set<String> keywords1 = generateKeyWordSet("小米手机,小米10", ",");
        Set<String> stopwords1 = generateKeyWordSet("大米", ",");
        Set<String> keywords2 = generateKeyWordSet("雷军,武汉大学", ",");
        Set<String> stopwords2 = generateKeyWordSet("联想", ",");

        KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
        KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);

        String name = "xiaomi_udf";
        return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
    }

    public static KeyWordSetEntity generateTestData2() {
        Set<String> keywords1 = generateKeyWordSet("华为手机,华为荣耀", ",");
        Set<String> stopwords1 = generateKeyWordSet("小米手机", ",");
        Set<String> keywords2 = generateKeyWordSet("华为P40", ",");
        Set<String> stopwords2 = generateKeyWordSet("联想", ",");

        KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
        KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);

        String name = "huawei_udf";
        return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
    }

    public static void main(String[] args) {
        KeyWordSetEntity keyWordSetEntity = generateTestData2();
        String jsonString = JSONObject.toJSONString(keyWordSetEntity, true);
        System.out.println("jsonString;" + jsonString);
    }
}
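Running the main method above serializes the Huawei test data with fastjson; the string stored in Redis therefore has roughly this shape (a sketch: the ordering of the sets inside the arrays is not guaranteed):

{
    "keyWordSet":[
        {
            "keywords":["华为手机","华为荣耀"],
            "stopwords":["小米手机"]
        },
        {
            "keywords":["华为P40"],
            "stopwords":["联想"]
        }
    ],
    "name":"huawei_udf"
}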

The trie data type

WordTrieEntity.java

Builds the tries and performs the keyword matching.

package com.sogo.sparkudf.entity;

import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.ahocorasick.trie.Trie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/26
 * @Time: 23:08
 * @des:
 */
@Setter
@Getter
public class WordTrieEntity implements Serializable {
    // LOGGER
    private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieEntity.class);
    // transient: the Aho-Corasick tries are rebuilt where needed rather than serialized
    private transient Trie keywordsTrie;
    private transient Trie stopwordsTrie;

    public WordTrieEntity(Trie keywordsTrie, Trie stopwordsTrie) {
        this.keywordsTrie = keywordsTrie;
        this.stopwordsTrie = stopwordsTrie;
    }

    public static List<WordTrieEntity> generateKeywordTrieList(String jsonString) {
        // get key word
        KeyWordSetEntity keyWordSetEntity = JSON.parseObject(jsonString, KeyWordSetEntity.class);
        List<WordTrieEntity> keywordsTrieList = new ArrayList<>();
        for (KeyWordSetEntity.KeyWordPackage keyWordPackage: keyWordSetEntity.getKeyWordSet()) {
            Trie keywordsTrie = buildTrie(keyWordPackage.getKeywords());
            Trie stopwordsTrie = buildTrie(keyWordPackage.getStopwords());
            keywordsTrieList.add(new WordTrieEntity(keywordsTrie, stopwordsTrie));
        }
        System.out.println("[DEBUG]I am initialized in WordTrieEntity");
        return keywordsTrieList;
    }

    private static Trie buildTrie(Set<String> stringSet) {
        return Trie.builder().addKeywords(stringSet).build();
    }

    public static Boolean contains(Seq<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        // nothing to filter
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            return true;
        }
        for (WordTrieEntity wordTrie : wordTrieList) {
            // AND semantics between keyword packages: every package must match
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    public static Boolean contains(List<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        // nothing to filter
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            return true;
        }
        for (WordTrieEntity wordTrie : wordTrieList) {
            // AND semantics between keyword packages: every package must match
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    private static Boolean contains(WordTrieEntity wordTrie, Seq<String> stringSeq) {
        // OR semantics within a package: a single matching query suffices
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.apply(i)))) {
                return true;
            }
        }
        // no query matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, List<String> stringSeq) {
        // OR semantics within a package: a single matching query suffices
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.get(i)))) {
                return true;
            }
        }
        // no query matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, String query) {
        // a stopword match vetoes the query
        if (null != wordTrie.getStopwordsTrie() && wordTrie.getStopwordsTrie().containsMatch(query)) {
            return false;
        }
        // then require a keyword match
        if (null == wordTrie.getKeywordsTrie()) {
            LOGGER.error("keyword is null");
        }
        return null != wordTrie.getKeywordsTrie() && wordTrie.getKeywordsTrie().containsMatch(query);
    }

    private static Seq<String> list2Seq(List<String> list) {
        Seq<String> stringSeq = new ArraySeq<>(list.size());
        for (int i = 0; i < list.size(); i++) {
            ((ArraySeq<String>) stringSeq).update(i, list.get(i));
        }
        return stringSeq;
    }
}
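A small sketch (assuming the two classes above are on the classpath) that exercises the matching semantics end to end: AND between packages, OR within a package, and the stopword veto.

import com.alibaba.fastjson.JSON;
import com.sogo.sparkudf.entity.KeyWordSetEntity;
import com.sogo.sparkudf.entity.WordTrieEntity;

import java.util.Arrays;
import java.util.List;

public class WordTrieDemo {
    public static void main(String[] args) {
        // serialize the "xiaomi_udf" test data and rebuild the tries from JSON,
        // exactly as the UDF later does with the value fetched from Redis
        String json = JSON.toJSONString(KeyWordSetEntity.generateTestData());
        List<WordTrieEntity> tries = WordTrieEntity.generateKeywordTrieList(json);

        // package 1 matches "小米手机", package 2 matches "雷军" -> true
        System.out.println(WordTrieEntity.contains(Arrays.asList("小米手机", "雷军"), tries));

        // "大米" is a stopword of package 1 and "雷军" is not one of its keywords,
        // so package 1 fails and the AND across packages yields false
        System.out.println(WordTrieEntity.contains(Arrays.asList("大米", "雷军"), tries));
    }
}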

A Redis Cluster singleton

RedisClusterConnector.java

Holds the external resource (the keyword packages) used to build the tries. The singleton pattern guarantees the Redis client is initialized only once per JVM.

package com.sogo.sparkudf.connnector;

import com.alibaba.fastjson.JSON;
import com.sogo.sparkudf.entity.KeyWordSetEntity;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/20
 * @Time: 22:25
 * @des:
 */
@Setter
@Getter
public class RedisClusterConnector implements Serializable {
    // LOGGER
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisClusterConnector.class);
    // jedis cluster instance
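    // note: 'transient' has no effect on a static field, and for strictly safe
    // double-checked locking this reference should also be declared volatile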
    private static transient JedisCluster jedisCluster;

    public static JedisCluster getRedisCluster() {
        if (null == jedisCluster) {
            synchronized (RedisClusterConnector.class) {
                if (null == jedisCluster) {
                    LOGGER.warn("[JUST_FOR_DEBUG] Test how many times the function is called");
                    // jedis pool config
                    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
                    poolConfig.setMaxIdle(50);
                    poolConfig.setMinIdle(0);
                    poolConfig.setMaxTotal(50);
                    // host and port
                    Set<HostAndPort> nodes = new HashSet<>();
                    nodes.add(new HostAndPort("10.162.48.186", 6001));
                    nodes.add(new HostAndPort("10.162.48.186", 6020));
                    nodes.add(new HostAndPort("10.162.48.225", 6001));
                    nodes.add(new HostAndPort("10.162.48.225", 6020));
                    // init
                    int connectionTimeout = 10000;
                    int soTimeout = 5000;
                    int maxAttempts = 2;
                    jedisCluster = new JedisCluster(nodes, connectionTimeout, soTimeout, maxAttempts, poolConfig);
                }
            }
        }
        return jedisCluster;
    }

    public static void close() {
        if (null != jedisCluster) {
            synchronized (RedisClusterConnector.class) {
                if (null != jedisCluster) {
                    jedisCluster.close();
                }
            }
        }
    }

    public static void main(String[] args) {

        JedisCluster jedisCluster = RedisClusterConnector.getRedisCluster();
        KeyWordSetEntity keyWordSetEntity = KeyWordSetEntity.generateTestData();
        String udfName = keyWordSetEntity.getName();
        String value = JSON.toJSONString(keyWordSetEntity);
        jedisCluster.set("keyword_package_" + udfName, value);
        String s = jedisCluster.get("keyword_package_" + udfName);
        System.out.println("value:" + s);
    }
}

Extending GenericUDF

KeyWordKeyFilterUdf.java

package com.sogo.sparkudf.udf;

import com.sogo.sparkudf.connnector.RedisClusterConnector;
import com.sogo.sparkudf.entity.WordTrieEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;

import java.util.*;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/29
 * @Time: 10:58
 * @des: reference from
 * https://www.jianshu.com/p/ca9dce6b5c37
 * https://www.jianshu.com/p/ba0e54579cc4
 */

/**
 * The @Description annotation is optional and documents the function. In its
 * strings, FUNC is replaced by the function name when DESCRIBE FUNCTION is run.
 * It has three attributes:
 *
 * name: the function name in Hive.
 * value: a description of the function's arguments.
 * extended: extra notes, e.g. an example, printed by DESCRIBE FUNCTION EXTENDED name.
 * Source: https://www.jianshu.com/p/ca9dce6b5c37
 */
@Description(
        name = "keyword_match_udf",
        value = "_FUNC_(queries, 'keyword_package') - from the input string"
                + "returns true if queries contain keyword_package",
        extended = "Example:\n"
                + " > SELECT _FUNC_(queries, 'keyword_package') FROM src;"
)
public class KeyWordKeyFilterUdf extends GenericUDF {
    // LOGGER
    private static final Logger LOGGER = LoggerFactory.getLogger(KeyWordKeyFilterUdf.class);
    // prefix of redis key
    private static final String PREFIX_REDIS_KEY = "keyword_package_";
    // cache: keyword package name -> list of tries
    private static Map<String, List<WordTrieEntity>> dictTrie = new HashMap<>();
    // object inspector: String
    private static final ObjectInspector valueOI = PrimitiveObjectInspectorFactory
            .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
    // string object inspector: for parse string
    private static final StringObjectInspector stringOI = (StringObjectInspector) valueOI;
    // object inspector: list<string>
    private static final ListObjectInspector arrayOI = ObjectInspectorFactory.getStandardListObjectInspector(valueOI);
    // redis cluster client
    private static JedisCluster jedisCluster;

    /**
     * Additionally setup GenericUDF with MapredContext before initializing.
     * This is only called in runtime of MapRedTask.
     *
     * @param context context
     */
    @Override
    public void configure(MapredContext context) {
        /*
        org.apache.hadoop.mapred.JobConf jobConf = context.getJobConf();
        String user = jobConf.getUser();
        String jobName = jobConf.getJobName();
        String queueName = jobConf.getQueueName();
        int numMapTasks = jobConf.getNumMapTasks();
        int numReduceTasks = jobConf.getNumReduceTasks();
        int maxMapAttempts = jobConf.getMaxMapAttempts();
        int maxReduceAttempts = jobConf.getMaxReduceAttempts();
        LOGGER.warn("[JUST_FOR_DEBUG] user:{}, jobName:{}, queueName:{}, " +
                "numMapTasks:{}, numReduceTasks:{}, maxMapAttempts:{}, maxReduceAttempts:{}",
                user, jobName, queueName, numMapTasks, numReduceTasks, maxMapAttempts, maxReduceAttempts);
        */
    }

    // initialize() is called once, before any call to evaluate(). It receives an
    // array of ObjectInspectors and verifies the argument count and types.
    // Note: it runs again each time a SQL statement using the UDF is executed.
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // two arguments are expected: 1. queries, 2. keyword package name
        if (objectInspectors.length < 2) {
            throw new UDFArgumentLengthException("param invalid: 1->query word, 2->type");
        }
        // ObjectInspector.Category.LIST
        if (!objectInspectors[0].getCategory().equals(arrayOI.getCategory())) {
            throw new UDFArgumentTypeException(0, "[" + arrayOI.getTypeName() + "] type is needed, " +
                    "but [" + objectInspectors[0].getTypeName() + "] type is found");
        }
        // ObjectInspector.Category.PRIMITIVE
        if (!objectInspectors[1].getCategory().equals(valueOI.getCategory())) {
            throw new UDFArgumentTypeException(1, "[" + valueOI.getTypeName() + "] type is needed, " +
                    "but [" + objectInspectors[1].getTypeName() + "] is found");
        }
        // init jedis cluster
        jedisCluster = RedisClusterConnector.getRedisCluster();

        // getUdfName() returns the fully-qualified class name; getFuncName() returns
        // the last segment of it, keeping only the characters after the 10th
        LOGGER.warn("[JUST_FOR_DEBUG] getUdfName:[{}], getFuncName:[{}]", getUdfName(), getFuncName());

        return PrimitiveObjectInspectorFactory
                .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN);
        // to return a Java List instead, the ObjectInspector would be:
        //ObjectInspector returnOi = PrimitiveObjectInspectorFactory
        // .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
        //return ObjectInspectorFactory.getStandardListObjectInspector(returnOi);
    }

    // evaluate() plays the role of a plain UDF's evaluate(): it processes the
    // actual arguments and returns the final result.
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // first UDF argument: the queries
        Object queries = deferredObjects[0].get();
        // second UDF argument: the keyword package name
        Object keywordPackageTag = deferredObjects[1].get();
        if (null == queries || null == keywordPackageTag) {
            return false;
        }
        // parse the String argument (option 1): via StringObjectInspector
        String keywordPackageName = stringOI.getPrimitiveJavaObject(keywordPackageTag);
        // parse the String argument (option 2): toString()
        // String udfName = deferredObjects[1].get().toString();

        // parse the List<String> argument (option 1): the whole list at once
        List<String> queryList = (List<String>) arrayOI.getList(queries);
        /*
        // parse the List<String> argument (option 2): element by element
        int listLength = arrayOI.getListLength(queries);
        List<String> queryList = new ArrayList<>(listLength);
        for (int i = 0; i < listLength; i ++) {
            String s = arrayOI.getListElement(queries, i).toString();
            queryList.add(s);
        }
        */
        List<WordTrieEntity> dict = generateFromRedis(keywordPackageName);
        return WordTrieEntity.contains(queryList, dict);
    }

    // descriptive text shown when the user runs EXPLAIN on the SQL statement
    @Override
    public String getDisplayString(String[] strings) {
        if (null == strings || strings.length == 0) {
            return "null or empty";
        } else {
            return String.join(";", strings);
        }
    }

    /**
     * Reads the keyword package from Redis and builds the tries here.
     * @param keywordPackageName name of the keyword package
     * @return the list of tries for that package
     */
    private static List<WordTrieEntity> generateFromRedis(String keywordPackageName) {
        if (null == dictTrie || dictTrie.isEmpty() || !dictTrie.containsKey(keywordPackageName)) {
            synchronized (KeyWordKeyFilterUdf.class) {
                if (null == dictTrie || dictTrie.isEmpty()) {
                    dictTrie = new HashMap<>();
                }
                if (!dictTrie.containsKey(keywordPackageName)) {
                    LOGGER.warn("[JUST_FOR_DEBUG] Build dict trie by keyword: [{}]", keywordPackageName);
                    String redisKey = PREFIX_REDIS_KEY + keywordPackageName;
                    if (null == jedisCluster) {
                        jedisCluster = RedisClusterConnector.getRedisCluster();
                        LOGGER.error("Something happened to jedis cluster, reconnect to it");
                    }
                    String value = jedisCluster.get(redisKey);
                    if (StringUtils.isEmpty(value)) {
                        LOGGER.error("Cannot load keyword from redis by key:[{}]", redisKey);
                        return new ArrayList<>();
                    }
                    List<WordTrieEntity> wordTrieEntityList = WordTrieEntity.generateKeywordTrieList(value);
                    dictTrie.put(keywordPackageName, wordTrieEntityList);
                }
            }
        }
        return dictTrie.get(keywordPackageName);
    }

    public static void main(String[] args) {
//        WordKeyFilterUdf wordKeyFilterUdf = new WordKeyFilterUdf();
//        Set<String> dict = WordKeyFilterUdf.generateMap("sogo_dict_udf");
//        System.out.println("trueOrFalse:" + dict.contains("sogo"));
//        System.out.println("trueOrFalse:" + dict.contains("sogo1"));
//        dict = WordKeyFilterUdf.generateMap("xiaomi_dict_udf");
//        System.out.println("trueOrFalse:" + dict.contains("miui"));
//        System.out.println("trueOrFalse:" + dict.contains("sougou"));
//
//        List<WordTrieEntity> wordTrieEntityList = WordKeyFilterUdf.generateFromRedis("xiaomi_udf", "");
//        Seq<String> stringSeq = new ArraySeq<>(1);
//        String query = "小米10周年,雷军";
//        stringSeq.update(0, query);
//        System.out.println("trueOrFalse:" + WordTrieEntity.contains(stringSeq, wordTrieEntityList));
        StringObjectInspector stringObjectInspector = (StringObjectInspector) valueOI;
        System.out.println("stringObjectInspector:" + stringObjectInspector.getTypeName());
        System.out.println("valueOI:" + valueOI.getTypeName());
    }
}

Registering the UDF

Packaging

The jar produced from the code above is named:

sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar

Ship to the dev machine

Its path on the dev machine is:

/search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar

Register

HiveSQL/SparkSQL

After entering the Hive or Spark SQL shell, run:

ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION keyword_udf AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf';
show functions;

PySpark

After entering the PySpark shell, run:

spark.sql("ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION keyword_udf  AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf'")
spark.sql("show user functions").show(10,0)

Testing

The tests below use PySpark as the example.

Test data

testDs.show(4)

DataFrame[imei: string, fwords: array<string>]
+--------------------+--------------------+
|                imei|              fwords|
+--------------------+--------------------+
|00003AC86C0E62825...|         [鬼谷子狼道等十本书]|
|00005FD5EA9B96624...|[哄女友, 后来, 古, 火焰切割...|
|00006231671F8272E...|                [欧尚]|
|00007A428750D7C19...|              [公仔迷你]|
+--------------------+--------------------+

Keyword matching

Test 1: verify that the UDF runs correctly

testDs.registerTempTable('testDs')
xiaomi = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'xiaomi_udf')!=0")
xiaomi.show(10,0)

Result:

One log line is printed per SQL statement executed, which matches expectations:

20/09/01 15:44:12 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]

The filter results also match expectations:

+----------------------------------------+--------+                             
|imei                                    |fword   |
+----------------------------------------+--------+
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米手机的   |
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米主题    |
|08B687D554A238008EA117049A87776C4E6A6730|小米      |
|08B687D554A238008EA117049A87776C4E6A6730|小米书包    |
|08B687D554A238008EA117049A87776C4E6A6730|小米10    |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊宝 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊  |
|08B687D554A238008EA117049A87776C4E6A6730|小米旅行箱青春版|
|08B687D554A238008EA117049A87776C4E6A6730|如何进入小米  |
|08B687D554A238008EA117049A87776C4E6A6730|小米      |
+----------------------------------------+--------+

Test 2: dynamically load a different keyword package

Building on Test 1, switch directly to the Huawei keyword package (same UDF, no restart):

huawei = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'huawei_udf')!=0")
huawei.show(10,0)

The output matches expectations:

20/09/01 16:02:41 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]

+----------------------------------------+-------------------+                  
|imei                                    |fword              |
+----------------------------------------+-------------------+
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为P40pro           |
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为手机               |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线       |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为P40Pro侧边色线       |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为                 |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线       |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40开发者xuanxiang选项|
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40kaifazhe开发者   |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|电脑怎么用华为手机怎么共享网络    |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|huawei华为           |
+----------------------------------------+-------------------+

Summary

To let one and the same UDF dynamically load different keyword packages (and the packages can be extended without limit), we used a constant column to work around the restriction that a UDF cannot take non-column inputs, and thereby achieved dynamic loading of keyword packages. Of course, expired keyword packages should also be evicted to save resources; a sketch of one approach follows.
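One hedged way to implement that eviction (the original UDF caches packages in a plain HashMap forever; everything here is an illustrative sketch): wrap each cached trie list with a load timestamp and reload it from Redis once a TTL has passed, so updated packages are picked up and abandoned ones eventually stop occupying memory.

package com.sogo.sparkudf.udf;

import com.sogo.sparkudf.connnector.RedisClusterConnector;
import com.sogo.sparkudf.entity.WordTrieEntity;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TrieCache {
    // rebuild an entry after 10 minutes (tune to taste)
    private static final long TTL_MS = 10 * 60 * 1000L;
    private static final Map<String, Entry> CACHE = new ConcurrentHashMap<>();

    private static final class Entry {
        final List<WordTrieEntity> tries;
        final long loadedAt = System.currentTimeMillis();
        Entry(List<WordTrieEntity> tries) { this.tries = tries; }
        boolean expired() { return System.currentTimeMillis() - loadedAt > TTL_MS; }
    }

    public static List<WordTrieEntity> get(String packageName) {
        return CACHE.compute(packageName, (name, old) -> {
            if (old != null && !old.expired()) {
                return old;
            }
            // same Redis key convention and trie construction as generateFromRedis()
            String value = RedisClusterConnector.getRedisCluster()
                    .get("keyword_package_" + name);
            if (value == null || value.isEmpty()) {
                return new Entry(new ArrayList<>());
            }
            return new Entry(WordTrieEntity.generateKeywordTrieList(value));
        }).tries;
    }
}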

References

[1] Spark UDF加载外部资源. https://cloud.tencent.com/developer/article/1688828

[2] 流水账:使用GenericUDF为Hive编写扩展函数. http://zuojie.github.io/2014/03/14/%E6%B5%81%E6%B0%B4%E8%B4%A6-%E4%BD%BF%E7%94%A8GenericUDF%E4%B8%BAHive%E7%BC%96%E5%86%99%E6%8F%92%E4%BB%B6%E5%87%BD%E6%95%B0.html

[3] In Spark SQL, how do you register and use a generic UDF? https://stackoverflow.com/questions/36915090/in-spark-sql-how-do-you-register-and-use-a-generic-udf

[4] Spark UDF实现demo. https://cloud.tencent.com/developer/article/1672068

[5] Hive udf、UDF、GenericUDF. https://blog.csdn.net/wangshuminjava/article/details/79663998

[6] Hive-UDF&GenericUDF. https://www.jianshu.com/p/ca9dce6b5c37

[7] Hive之ObjectInspector接口解析笔记. https://blog.csdn.net/weixin_39469127/article/details/89739285

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, contact cloudcommunity@tencent.com for removal.

