前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink sql实时计算当天pv写入mysql

flink sql实时计算当天pv写入mysql

作者头像
大数据技术与应用实战
发布2020-09-24 16:18:48
2.9K0
发布2020-09-24 16:18:48
举报

今天我们主要来讲一个很简单但是很常见的需求,实时计算出网站当天的pv值,然后将结果实时更新到mysql数据库,以供前端查询显示。

接下来我们看看如何用flink sql来实现这个简单的功能。

首先我们还是使用datagen生成测试数据,随机生成一些用户id

代码语言:javascript
复制
     String sourceSql = "CREATE TABLE datagen (\n" +
                " userid int,\n" +
                " proctime as PROCTIME()\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='100',\n" +
                " 'fields.userid.kind'='random',\n" +
                " 'fields.userid.min'='1',\n" +
                " 'fields.userid.max'='100'\n" +
                ")";

定义mysql的sink,这里mysql是作为了一个upsert的sink,所以必须要一个主键,在mysql建表的时候我们指定了当天的日期作为主键,mysql ddl如下

代码语言:javascript
复制
CREATE TABLE `pv` (
  `day_str` varchar(100) NOT NULL,
  `pv` bigint(10) DEFAULT NULL,
  PRIMARY KEY (`day_str`)
)

Flink中的ddl要和mysql中对的上,也要指定主键。

代码语言:javascript
复制
    String mysqlsql = "CREATE TABLE pv (\n" +
                "  day_str STRING,\n" +
                "  pv bigINT,\n" +
                "  PRIMARY KEY (day_str) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'username' = 'root',\n" +
                "   'password' = 'root',\n" +
                "   'url' = 'jdbc:mysql://localhost:3306/test',\n" +
                "   'table-name' = 'pv'\n" +
                ")";

接下来我们写一个简单的查询:

代码语言:javascript
复制
     tEnv.executeSql("insert into pv SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd') as day_str, count(*) \n" +
                "FROM datagen \n" +
                "GROUP BY DATE_FORMAT(proctime, 'yyyy-MM-dd')");

可能对于以前一直做批处理的同学来说会感到疑惑,对于流式处理来说,group by将会返回一个可撤回流(RetractStream),转化成datastream,将会得到一个Tuple2<Boolean, T>对象,这个对象第一个字段如果是false表示数据要撤回,true表示数据是我们新添加的,第二个字段是实际的数据。在这里,我们将这个实时更新的结果写入到了mysql。这样mysql表,每天就会只有一个数据,系统会不断地更新pv字段。

类似的需求我们还可以使用flink的窗口来实现,定义一个窗口周期是一天的窗口,然后自定义一个触发器,比如每秒钟触发一次,然后将结果输出写入第三方sink,可以参考下 【flink实战-模拟简易双11实时统计大屏

由于笔者水平有限,也难免有错误,请大家不吝赐教,更多信息,也请关注我的公众号【大数据技术与应用实战】,谢谢。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档