前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >指标统计:基于流计算Oceanus(Flink) 实现实时UVPV统计

指标统计:基于流计算Oceanus(Flink) 实现实时UVPV统计

作者头像
腾讯云开发者
发布2021-10-22 10:40:09
1K0
发布2021-10-22 10:40:09
举报

导语 | 最近梳理了一下如何用Flink来实现实时的UV、PV指标的统计,并和公司内微视部门的同事交流。然后针对该场景做了简化,并发现使用Flink SQL来实现这些指标的统计会更加便捷。

一、解决方案描述

(一)概述

本方案结合本地自建Kafka集群、腾讯云流计算Oceanus(Flink)、云数据库Redis对博客、购物等网站UV、PV指标进行实时可视化分析。分析指标包含网站的独立访客数量(UV)、产品的点击量(PV)、转化率(转化率=成交次数/点击量)等。

相关概念介绍

UV(Unique Visitor):独立访客数量。访问您网站的一台客户端为一个访客,如用户对同一页面访问了5次,那么该页面的UV只加1,因为UV统计的是去重后的用户数而不是访问次数。

PV(Page View):点击量或页面浏览量。如用户对同一页面访问了5次,那么该页面的PV会加5。

(二)方案架构及优势

根据以上实时指标统计场景,设计了如下架构图:

涉及产品列表:

  • 本地数据中心(IDC)的自建Kafka集群
  • 私有网络VPC
  • 专线接入/云联网/VPN连接/对等连接
  • 流计算Oceanus (Flink)
  • 云数据库Redis

二、前置准备

购买所需的腾讯云资源,并打通网络。自建的Kafka集群需根据集群所在区域需采用VPN连接、专线连接或对等连接的方式来实现网络互通互联。

(一)创建私有网络VPC

私有网络(VPC)是一块在腾讯云上自定义的逻辑隔离网络空间,在构建Oceanus集群、Redis组件等服务时选择的网络建议选择同一个VPC,网络才能互通。否则需要使用对等连接、NA网关、VPN等方式打通网络。私有网络创建步骤请参考帮助文档

(https://cloud.tencent.com/document/product/215/36515)

(二)创建Oceanus集群

流计算Oceanus是大数据产品生态体系的实时化分析利器,是基于Apache Flink构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算Oceanus以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

在Oceanus控制台的【集群管理->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。VPC及子网使用刚刚创建好的网络。创建完后Flink的集群如下:

(三)创建Redis集群

Redis控制台的【新建实例】页面创建集群,选择与其他组件同一地域,同区域的同一私有网络VPC,这里还选择同一子网。

Redis控制台:https://console.cloud.tencent.com/redis#/

(四)配置自建Kafka集群

  • 修改自建Kafka集群配置

自建Kafka集群连接时bootstrap-servers参数常常使用hostname而不是ip来连接。但用自建Kafka集群连接腾讯云上的Oceanus集群为全托管集群,Oceanus集群的节点上无法解析自建集群的hostname与ip的映射关系,所以需要改监听器地址由hostname为ip地址连接的形式。

将config/server.properties配置文件中advertised.listeners参数配置为ip地址。示例:

代码语言:javascript
复制
# 0.10.X及以后版本advertised.listeners=PLAINTEXT://10.1.0.10:9092# 0.10.X之前版本advertised.host.name=PLAINTEXT://10.1.0.10:9092

修改后重启Kafka集群。

注意:若在云上使用到自建的zookeeper地址,也需要将zk配置中的hostname修改ip地址形式。

  • 模拟发送数据到topic

本案例使用topic为topic为uvpv-demo。

  • Kafka客户端

进入自建Kafka集群节点,启动Kafka客户端,模拟发送数据。

代码语言:javascript
复制
./bin/kafka-console-producer.sh --broker-list 10.1.0.10:9092 --topic uvpv-demo>{"record_type":0, "user_id": 2, "client_ip": "100.0.0.2", "product_id": 101, "create_time": "2021-09-08 16:20:00"}>{"record_type":0, "user_id": 3, "client_ip": "100.0.0.3", "product_id": 101, "create_time": "2021-09-08 16:20:00"}>{"record_type":1, "user_id": 2, "client_ip": "100.0.0.1", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
  • 使用脚本发送

脚本一:Java代码参考:

https://cloud.tencent.com/document/product/597/54834

脚本二:Python脚本。

参考之前案例中Python脚本进行适当修改即可:《视频直播:实时数据可视化分析》

(五)打通自建IDC集群到腾讯云网络通信

自建Kafka集群联通腾讯云网络,可通过以下前3种方式打通自建IDC到腾讯云的网络通信。

  • 专线接入

https://cloud.tencent.com/document/product/216适用于本地数据中心IDC与腾讯云网络打通。

  • 云联网

https://cloud.tencent.com/document/product/877适用于本地数据中心IDC与腾讯云网络打通,也可用于云上不同地域间私有网络VPC打通。

  • VPN连接

https://cloud.tencent.com/document/product/554适用于本地数据中心IDC与腾讯云网络打通。

  • 对等连接+NAT网关

对等连接:

https://cloud.tencent.com/document/product/553

NAT网关:

https://cloud.tencent.com/document/product/552适合云上不同地域间私有网络VPC打通,不适合本地IDC到腾讯云网络。

本方案中使用了VPN连接的方式,实现本地IDC和云上网络的通信。参考链接:建立VPC到IDC的连接(路由表)

(https://cloud.tencent.com/document/product/554/52854)

根据方案绘制了下面的网络架构图:

三、方案实现

(一)业务目标

利用流计算Oceanus实现网站UV、PV、转化率指标的实时统计,这里只列取以下3种统计指标:

网站的独立访客数量UV。Oceanus处理后在Redis中通过set类型存储独立访客数量,同时也达到了对同一访客的数据去重的目的。

网站商品页面的点击量PV。Oceanus处理后在Redis中使用list类型存储页面点击量。

转化率(转化率=成交次数/点击量)。Oceanus处理后在Redis中用String存储即可。

(二)源数据格式

Kafka topic:uvpv-demo(浏览记录)

Kafka内部采用json格式存储,数据格式如下:

代码语言:javascript
复制
# 浏览记录{ "record_type":0,  # 0 表示浏览记录 "user_id": 6,  "client_ip": "100.0.0.6",  "product_id": 101,  "create_time": "2021-09-06 16:00:00"}
# 购买记录{ "record_type":1, # 1 表示购买记录 "user_id": 6,  "client_ip": "100.0.0.8",  "product_id": 101,  "create_time": "2021-09-08 18:00:00"}

(三)编写Flink SQL作业

示例中实现了UV、PV和转化率3个指标的获取逻辑,并写入Sink端。

  • 定义Source
代码语言:javascript
复制
CREATE TABLE `input_web_record` (  `record_type` INT,  `user_id` INT,  `client_ip` VARCHAR,  `product_id` INT,  `create_time` TIMESTAMP,  `times` AS create_time,  WATERMARK FOR times AS times - INTERVAL '10' MINUTE ) WITH (    'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector    'topic' = 'uvpv-demo',      'scan.startup.mode' = 'earliest-offset',     --'properties.bootstrap.servers' = '82.157.27.147:9092',     'properties.bootstrap.servers' = '10.1.0.10:9092',      'properties.group.id' = 'WebRecordGroup',  -- 必选参数, 一定要指定 Group ID    'format' = 'json',    'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常    'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null);
  • 定义Sink
代码语言:javascript
复制
-- UV sinkCREATE TABLE `output_uv` (  `userids`   STRING,`user_id` STRING) WITH ( 'connector' = 'redis',           'command' = 'sadd',              -- 使用集合保存uv(支持命令:set、lpush、sadd、hset、zadd) 'nodes' = '192.28.28.217:6379',  -- redis连接地址,集群模式多个节点使用'',''分隔。 -- 'additional-key' = '<key>',   -- 用于指定hset和zadd的key。hset、zadd必须设置。 'password' = 'yourpassword'   );
-- PV sinkCREATE TABLE `output_pv` (  `pagevisits`   STRING,`product_id` STRING,`hour_count` BIGINT) WITH ( 'connector' = 'redis',           'command' = 'lpush',              -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd) 'nodes' = '192.28.28.217:6379',   -- redis连接地址,集群模式多个节点使用'',''分隔。 -- 'additional-key' = '<key>',   -- 用于指定hset和zadd的key。hset、zadd必须设置。 'password' = 'yourpassword'   );
-- 转化率 sinkCREATE TABLE `output_conversion_rate` (  `conversion_rate`   STRING,`rate` STRING) WITH ( 'connector' = 'redis',         'command' = 'set',              -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd) 'nodes' = '192.28.28.217:6379', -- redis连接地址,集群模式多个节点使用'',''分隔。 -- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必须设置。 'password' = 'yourpassword'   );
  • 业务逻辑
代码语言:javascript
复制
-- 加工得到 UV 指标,统计所有时间内的 UVINSERT INTO output_uv SELECT  'userids' AS `userids`,CAST(user_id AS string) AS user_id FROM input_web_record ;
-- 加工并得到 PV 指标,统计每 10 分钟内的 PVINSERT INTO output_pv SELECT  'pagevisits' AS pagevisits, CAST(product_id AS string) AS product_id, SUM(product_id) AS hour_countFROM input_web_record WHERE record_type = 0 GROUP BY HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), product_id, user_id;
-- 加工并得到转化率指标,统计每 10 分钟内的转化率INSERT INTO output_conversion_rate SELECT  'conversion_rate' AS conversion_rate, CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string) FROM (SELECT * FROM input_web_record where record_type = 1) AS aGROUP BY  HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), product_id;

(四)结果验证

通常情况,会通过Web网站来展示统计到的UV、PV指标,这里为了简单直接在Redis控制台(https://console.cloud.tencent.com/redis#/)登录进行查询:

  • userids: 存储UV
  • pagevisits: 存储PV
  • conversion_rate: 存储转化率,即购买商品次数/总页面点击量。

四、总结

通过自建Kafka集群采集数据,在流计算Oceanus (Flink) 中实时进行字段累加、窗口聚合等操作,将加工后的数据存储在云数据库Redis,统计到实时刷新的UV、PV等指标。这个方案在Kafka json格式设计时为了简便易懂做了简化处理,将浏览记录和产品购买记录都放在了同一个topic中,重点通过打通自建IDC和腾讯云产品间的网络来展现整个方案。针对超大规模的UV去重,微视的同事采用了Redis hyperloglog方式来实现UV统计。相比直接使用set类型方式有极小的内存空间占用的优点,详情见链接:

https://cloud.tencent.com/developer/article/1889162

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓


👇点击下方「阅读原文」,了解腾讯云流计算Oceanus更多信息~

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-10-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云开发者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • (二)方案架构及优势
  • (一)创建私有网络VPC
  • (二)创建Oceanus集群
  • (三)创建Redis集群
    • (四)配置自建Kafka集群
    • (五)打通自建IDC集群到腾讯云网络通信
    • (一)业务目标
    • (二)源数据格式
    • (三)编写Flink SQL作业
    • (四)结果验证
    相关产品与服务
    云数据库 Redis
    腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档