前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink实战】新老用户方案优化使用状态与布隆过滤器的方式

【Flink实战】新老用户方案优化使用状态与布隆过滤器的方式

作者头像
大数据小禅
发布2023-09-14 08:11:55
5210
发布2023-09-14 08:11:55
举报
文章被收录于专栏:YO大数据

什么是布隆过滤器

  • 布隆过滤器(Bloom Filter)是一种经过哈希函数处理的数据结构,用于快速判断一个元素是否可能存在于一个集合中。它可以用来检索大规模数据集中的元素,过滤掉不存在的元素,从而减少昂贵的磁盘或网络访问操作。
  • 布隆过滤器的核心思想是使用一个位数组(通常由二进制位组成)和多个哈希函数。当将元素添加到布隆过滤器时,通过哈希函数将元素映射到位数组的多个位置,并将这些位置的二进制位设置为1。当需要查询某个元素是否存在时,同样通过哈希函数将元素映射到位数组的相应位置,并检查这些位置的二进制位,如果所有位置的二进制位都为1,则说明元素可能存在;如果有任何一个位置的二进制位为0,则说明元素一定不存在。
  • 由于布隆过滤器的位数组可以被复用,其空间占用相对较小。同时,通过适当的哈希函数设计和位数组大小的选择,可以控制误判率(即判断元素存在时的假阳性率)。
  • 布隆过滤器的优势在于对于大规模数据集的快速查询和判断,具有高效的时间和空间复杂度。但也存在一定的限制,如不能删除元素、存在一定的误判率以及无法提供元素具体的位置等。
  • 布隆过滤器在实际应用中有许多用途,如缓存击穿防护、恶意网址过滤、URL去重、数据同步检查等。但在使用过程中需要根据具体的应用场景和需求,权衡误判率和空间使用,并合理确定哈希函数的个数和位数组大小,以获得最佳的性能和准确性。

新的需求:使用Flink 新老用户->状态+布隆过滤器标识

  • 使用布隆过滤器的方式 加上状态管理
  • 读取数据后进行keyby根据设备类型 之后使用process窗口函数进行操作
代码语言:javascript
复制
/**
 * @Description 新老用户统计分析
 *          原来是根据数据中的某个字段
 *          现在我们是根据每个 device(设备) 来判断是否是新老用户
 * 思考 => device放在状态里面去呢?
 * 我们的实现:状态 + 布隆过滤器
 */
public class OsUserCntAppV3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment.readTextFile("data/access.json");
        environment.setParallelism(1); //设置并行度为1方便观察
        SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
            @Override
            public Access map(String s) throws Exception {
                // json 转 Access
                try {
                    return JSON.parseObject(s, Access.class);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
            //这里是只要不为空的数据  x != null等于把上面的空的数据过滤掉
        }).filter(x -> x != null).filter(new FilterFunction<Access>() {
            @Override
            public boolean filter(Access access) throws Exception {
                //只过滤出来 event='startup'的数据
                return "startup".equals(access.event);
            }
        });
        //根据设别类型进行keyBy
        filter.keyBy(x->x.deviceType)
                //全窗口 key I O
                .process(new KeyedProcessFunction<String, Access, Access>() {

                    private ValueState<BloomFilter<String>> state;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //使用布隆过滤器
                        //import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;  注意路径
                        //状态初始化 使用ValueState描述符 布隆过滤器传入设备编号
                        //初始化传入名字和类型
                        ValueStateDescriptor<BloomFilter<String>> descriptor =
                                new ValueStateDescriptor<>("s", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
                        state = getRuntimeContext().getState(descriptor);
                    }

                    @Override
                    public void processElement(Access value, Context context, Collector<Access> collector) throws Exception {
                        //来一条处理一条 "device":"4759947c-cd47-433c-ac8f-ae923a6d38b6" 设备ID
                        String device = value.device;
                        //状态中获取值
                        BloomFilter<String> bloomFilter = state.value();
                        //布隆过滤器固定写法 数据值
                        if(null==bloomFilter){
                            bloomFilter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
                        }
                        //来一条数据判断一次看过滤器是否包含
                        //mightContain() 可能包含    !bloomFilter.mightContain(device) 肯定不包含
                        if(!bloomFilter.mightContain(device)){
                            //不包含放入设备ID
                            bloomFilter.put(device);
                            //1是新用户  这里布直接修改nu字段 应该会造成进来的可能全是 nu=1 这里使用新字段nu2
                            //nu2第一次赋值为1后是新用户  再进来就会给布隆写成0了
                            value.nu2=1;
                            //更新状态
                            state.update(bloomFilter);
                        }
                        //输出
                        collector.collect(value);
                    }
                }).print();

        environment.execute("OsUserCntAppV1");
    }
}

核心代码代码详解

  • 这段代码实现了一个新老用户统计分析的需求。原始数据中有大量的设备访问记录,代码通过使用状态和布隆过滤器来判断每个设备是否是新用户。
  • 代码首先读取了一个包含访问记录的文本文件,并将每行数据解析为Access对象。然后通过一系列过滤操作,过滤出其中eventType为"startup"的数据。
  • 接下来,代码根据设备类型进行keyBy操作,并使用全窗口处理函数(KeyedProcessFunction)进行处理。在处理过程中,使用一个布隆过滤器保存已经处理过的设备ID,用于判断设备是否是新用户。代码中通过状态(ValueState)来保存和更新布隆过滤器。
  • 对于每条访问记录,代码会先判断布隆过滤器是否包含该设备ID,如果不包含,则将该设备ID添加到布隆过滤器中,并修改Access对象的字段nu2为1,表示该设备是新用户。最后,输出处理过的Access对象。
  • 通过以上的处理,代码可以对大量的设备访问记录进行分析,判断每个设备是否是新用户,并输出结果。通过使用布隆过滤器来保存已处理过的设备ID,可以在大规模数据集中快速判断设备的新旧状态,提高处理效率。

结果字段截取

代码语言:javascript
复制
event='startup', net='WiFi', channel='华为商城', uid='user_1', nu=1, nu2=1, ip='171.11.85.21', 

event='startup', net='WiFi', channel='华为商城', uid='user_1', nu=1, nu2=0, ip='171.11.85.21'
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是布隆过滤器
  • 新的需求:使用Flink 新老用户->状态+布隆过滤器标识
  • 核心代码代码详解
  • 结果字段截取
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档