前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink计算PV,UV的案例及问题分析

Flink计算PV,UV的案例及问题分析

作者头像
Spark学习技巧
发布2019-12-25 11:50:44
3.5K0
发布2019-12-25 11:50:44
举报
文章被收录于专栏:Spark学习技巧

PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。

UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。

一个UV可以用很多PV,一个PV也只能对应一个IP

没有这些数据的支持,意味着你不知道产品的发展情况,用户获取成本,UV,PV,注册转化率;没有这些数据做参考,你不会知道接下来提供什么建议给领导采纳,也推测不出领导为啥烦忧,那么就么有任何表现的机会。

举两个UV计算的场景:

1. 实时计算当天零点起,到当前时间的uv。

2. 实时计算当天每个小时的UV。0点...12点...24点

请问这个用spark streaming如何实现呢?是不是很难有好的思路呢?

今天主要是想给大家用flink来实现一下,在这方面flink确实比较优秀了。

主要技术点就在group by的使用。

下面就是完整的案例:

代码语言:javascript
复制
package org.table.uv;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class ComputeUVDay {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        tEnv.registerFunction("DateUtil",new DateUtil());
        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("jsontest")
                        .property("bootstrap.servers", "localhost:9092")
                        .property("group.id","test")
                        .startFromLatest()
        )
                .withFormat(
                        new Json()
                                .failOnMissingField(false)
                                .deriveSchema()
                )
                .withSchema(

                        new Schema()
                                .field("rowtime", Types.SQL_TIMESTAMP)
                                .rowtime(new Rowtime()
                                        .timestampsFromField("eventtime")
                                        .watermarksPeriodicBounded(2000)
                                )
                                .field("fruit", Types.STRING)
                                .field("number", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("source");

        // 計算天級別的uv
//        Table table = tEnv.sqlQuery("select  DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)");

        // 计算小时级别uv
        Table table = tEnv.sqlQuery("select  DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')");

        tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
            @Override
            public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
                System.out.println(value.f1.toString());
            }
        });

        System.out.println(env.getExecutionPlan());
        env.execute("ComputeUVDay");
    }
}

其中DateUtil类如下:

代码语言:javascript
复制
package org.table.uv;

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

public class DateUtil extends ScalarFunction {
    public static String eval(long timestamp){
        String result = "null";
        try {
            DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            result = sdf.format(new Timestamp(timestamp));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
    public static String eval(long ts, String format) {

        String result = "null";
        try {
            DateFormat sdf = new SimpleDateFormat(format);
            result = sdf.format(ts);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
    public static void main(String[] args) {
        String eval = eval(System.currentTimeMillis(),"yyyyMMddHH");
        System.out.println(eval);
    }
}

代码里面的案例,是可以用于生产中的吗?

假如数据量小可以直接使用,每秒数据量大的话,就比较麻烦。因为你看group by后面的维度,只有当天date 这个维度,这样就会导致计算状态超级集中而使得内存占用超大进而引发oom。

这种情况解决办法就是将状态打散,然后再次聚合即可,典型的分治思想。

具体做法作为福利分享给球友吧。

还有一个问题就是由于存在全局去重及分组操作,flink内部必然要维护一定的状态信息,那么这些状态信息肯定不是要一直保存的,比如uv,我们只需要更新今天,最多昨天的状态,这个点之前的状态要删除的,不能让他白白占着内存,而导致任务内存消耗巨大,甚至因oom而挂掉。

代码语言:javascript
复制
StreamQueryConfig streamQueryConfig = tEnv.queryConfig();
streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));

tEnv.sqlUpdate(sql,streamQueryConfig);

再有就是能使用事件时间吗?事件时间假如事件严重超时了,比如,我们状态保留时间设置的是两天,两天之后状态清除,那么这时候来了事件时间刚刚好是两天之前的,由于已经没有状态就会重新计算uv覆盖已经生成的值,就导致值错误了,这个问题如何解决呢?

这算是一个疑问吧?

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-12-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档