前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Storm上的实时统计利器-easycount

Storm上的实时统计利器-easycount

作者头像
腾讯大数据
发布2018-01-26 19:13:45
1.2K0
发布2018-01-26 19:13:45
举报

背景

Storm是TRC(腾讯实时计算)平台的核心组件。与Hadoop不同,storm之上没有像hive,pig之类的解放应用开发人员效率的工具。开发原生的storm应用必须掌握storm的api,开发门槛高,调试困难,效率低下。

EasyCount(SQL on strom)是构建在storm之上的一套实时计算系统。应用开发人员只需通过配置定制化的脚本来完成业务逻辑的描述,能够快速实现各种实时统计需求,降低使用门槛,提升开发效率。

系统设计与实现

上图是EC系统的架构图。用于描述用户业务逻辑的SQL脚本通过上层提供的不同接口以文本的方式传递给开源语法解析工具Antrl,生成AST(抽象语法树)。进一步通过对抽象语法树上的每一个节点进行语义层面的丰富生成具有独立执行含义的Operator(算子)对象并生成具有Logical Plan(逻辑执行计划)。再进一步结合Apache Storm提供的API,将构成logical plan的算子映射到storm不同类型的task之中,从而生成具有执行能力的物理执行计划。最后通过配置运行资源,task分配比例等参数后调用Apache Storm的api生成可运行的Storm Topolgy,并提交到Storm(on yarn)集群上运行。

语言规范

本套系统所使用的SQL脚本结构包含两个部分:表描述配置和sql配置

下图是一个实际业务的配置文件结构。

Ø 表描述配置

EC系统中表的分类

l 从数据输入和输出进行分类

从数据输入和输出来看分为源表(用于关联的维表也称为源表)和目标表(被更新的维表也称为目标表)。源表是数据输入部分,任何实时统计的源表一定包含至少一个流水表,可能包含若干维表。目标表是计算结果,或者计算的中间结果所在的表,原则上可以是任意类型的数据表。通常目标表可能是一个流水表,但是有的时候也可能是mysql,tde等,目标表可以有多张,取决于实际应用。

l 从数据表的功能进行分类

从数据表的功能来看分为流水表和维表。这个分类是EC系统中最重要的一个分类。

流水表,EC是实时统计系统,处理的数据是流式数据,因此所有来自TDBank的流式数据都是流水表。传统数据表的一次计算是针对整张表数据进行的,而流水表数据的计算只针对当前时间的数据进行,最多是针对截止到当前的所有数据进行的计算。流水表数据的统计是实时进行的,根据我们大多数应用需求场景,流水表一般都是按照某个小的时间粒度进行数据统计的,例如10s,1min,5min等。基于此,EC系统在进行实时统计计算的时候,要求数据中必须有一个时间字段作为协调(COORDINATE),如果数据中确实没有时间字段,那么就EC系统按照接受到数据的时间进行协调。

维表,维表是指那些静态数据表,在实际的统计中需要关联一些配置信息对流水数据进行维度上上补充,之后再进行数据统计。这些配置信息,有时候需要修改,但是通常变化不大。为此EC系统将这类数据表抽象为维表,维表数据一般存储在KV系统中,如果维表数据量较小的话,可以存储到DB甚至以配置文件的方式抽象为内存表,直接从内存中关联。维表目前支持以下几种:TDE,redis,mem,DB(tpg,mysql)。其中tde和redis维表支持实时更新(数据统计结果更新维表)。

l 从数据表的存储方式分类

从数据存储来看又分为:stream(特指tdbank中的流水表),mysql,tpg,tde,redis,hbase等。

1) stream表,源表中必须包含至少一张stream表。

2) Mysql表,mysql表可以作为维表,也可以作为目标表。

3) Tpg表,指tpg上的表,和mysql表完全类似。

4) Tde表,是一个kv表,主要是用来做维表使用,支持插入和更新。

5) Redis表,目前主要是测试使用,目前来看,在实际应用中建议使用tde

6) Hbase表, 是一个kv表,主要做结果表。

Ø SQL语法定义

本套系统所使用的SQL语法是基于HIVEQL,同时添加了符合流式计算语义的语法结构。HIVEQL是针对离线数据集的查询工具,自身无法支持流式计算语义的描述。为了最大程度上保留了用户编写标准sql的习惯,系统在hiveql的基础之上设计并且拓展了专用于流式计算的特性的语法结构。上图是对本套系统语法文件简化之后的语法结构描述。

l SQL语法和规范说明

1) withquerys

一条sql的正确写法是先用with语句将所有的中间结果用子查询的方式作为临时表存储(并非实际存储,这里相当于java语言中的中间变量)。对于较为复杂的sql,这样写很有好处可以使得sql逻辑更加的清晰,减少文本的输入量;同时对于重复使用的子查询可以优化计算,减少资源的开销。WITH语句是可选的,对于简单的统计sql可以不写with语句。

2) joinquery

目前SQL不支持流水表之间的join操作,仅支持用于维表关联的left join操作,相当于实时流水数据在字段维度上的补充,转换或扩展。其中左表必须是stream表,右边的维表可以是多个,用逗号隔开,所有的关联条件都必须是等值关联,并且所有的关联条件统一写在最后的on子句中,on子句必须包含每张维表的关联条件。目前系统已经支持的维表类型有TDE,HBASE以及DB(Mysql and Postgresql)。

3) 聚合函数

SQL支持全部HIVEQL支持的聚合函数功能,同时为了支持复杂数据类型(Map,Array,structs,binary)还提供了若干内置自定义函数。

实时计算简而言之就是在时间维度上更细力度更低延迟的批数据处理。实现实时计算必须具备在时间维度上切分流式数据的语义表述能力。因此在传统SQL基础之上,我们定义了聚合窗口(AGGR INTERVAL)的概念,即指进行数据聚合所采用的时间粒度。假设聚合窗口为60s,那就表示每一分钟进行一次聚合计算,聚合计算的结果是针对这1分钟数据进行的。针对系统接收到每一条流水记录,需要根据coordinateby表达式所指定的带有时间属性的字段货表达式来决定分配该记录到对应的时间窗口参与计算。默认情况下,如果不使用coordinate by子句的话,系统按照接收到数据时的系统时间进行协调。

在聚合窗口的基础之上,根据实时计算需求的特点,我们又拓展了两种新的窗口:累加窗口和滑动窗口。并针对这两种新的窗口扩展聚合函数的能力,提供了三种模式的聚合,分别是:普通聚合,累加聚合,滑动窗口聚合,丰富了实时计算需求的语义。

累加窗口以及滑动窗口,和聚合窗口一样是两个聚合时间粒度。不过这里要求累加窗口和滑动窗口必须是聚合窗口的整数倍。

普通聚合:和传统聚合函数一致,对每个聚合窗口进行一次聚合计算

累加聚合:在累加窗口内的每个聚合窗口进行一次聚合计算,不过计算的数据是针对从累加窗口起始直到当前聚合窗口的聚合值。如下图所示:

滑动窗口聚合:在每个聚合窗口结束的时候计算,从当前聚合窗口向前推到滑动窗口大小内的数据进行聚合计算。如下图所示:

聚合窗口时间通过WITH AGGR INERVAL ?SECONDS定义,累加窗口通过WITHACCU INTERVAL ? SECONDS定义,滑动窗口通过WITH SW INTERVAL ? SECONDS定义。

一个聚合函数是如果采用累加聚合模式需要通过在函数调用中嵌入ACCU(Accumulate)关键字标识,滑动窗口聚合通过嵌入SW(Slide Window)关键字标识。

4) 扩展SQL的表述能力

传统SQL都是有Schema的,但对于非结构化数据处理能力有限。本系统提供了复杂数据类型(map, array/list,struct, binary)的定义,用以表述更复杂的数据结构。为了适应复杂数据结构,提供了foreach和execute算子对复杂数据结构进行操作,提供了更加灵活的语义表述。

辅助工具

为了能够帮助业务开发人员快速调试自己的脚本,我们还提供了脚本调试的IDE环境(eclipse插件的形式提供),方便用户定位问题,降低线上运行出错的几率。目前仅实现了离线调试的功能。下图是系统配套的前台管理页面以及ide调试功能的截图。

EC任务的执行

Ø 步骤

编写符合业务逻辑的脚本

执行storm命令将脚本以参数的形式传递给给系统可执行jar文件,自动生成topology提交的storm集群运行。开始数据的处理。

Ø 示例脚本

[system]

sql=\

WITH \

(SELECT 'ylzt' name, qq FROM tbl_ylzt WHEREtype = 'login') ylzt \

(SELECT 'lol' name, qq FROM tbl_lol WHEREtype = 'login') lol \

(SELECT name, qq FROM ylzt UNION ALLSELECT name, qq FROM lol) utbl \

INSERT INTO result SELECTFROM_UNIXTIME(AGGRTIME DIV 1000, 'yyyy-MM-dd HH:mm:ss'), name, COUNT(qq ACCU),COUNT(qq SW), COUNT(qq) FROM utbl \

GROUP BY name COORDINATE BYUNIX_TIMESTAMP()*1000 \

WITH AGGR INTERVAL 60 SECONDS WITH ACCUINTERVAL 3600 SECONDS WITH SW INTERVAL 300 SECONDS

[tabledesc-tbl_ylzt]

table.name=tbl_ylzt

table.fields=qq,bigint,:type,string,

table.type=stream

table.tube.addrlist=xxx

table.topic=xxx

table.interfaceId=xxx

table.field.splitter=xxx

[tabledesc-tbl_lol]

table.name=tbl_lol

table.fields=qq,bigint,:type,string,

table.type=stream

table.tube.addrlist=xxx

table.topic=xxx

table.interfaceId=xxx

table.field.splitter=xxx

[tabledesc-result]

table.name=result

table.fields=agtime,string,:name,string,:pv,bigint,:fmpv,bigint,:hpv,bigint,

table.type=mysql

table.mysql.db.host=127.0.0.1

table.mysql.db.name=xxx

table.mysql.db.port=3306

table.mysql.db.username=xxx

table.mysql.db.passwd=xxx

l 脚本业务需求说明

“御龙在天”和“英雄联盟”这两款星级游戏用户操作记录流水数据已实时接入TDBank系统,根据用户登录流水数据,统计这两款游戏

1) 每分钟的用户登录次数,每分钟输出一次。

2) 最近连续5分钟的用户登录次数,每分钟输出一次。

3) 统计当前小时开始到当前分钟的用户登录次数,每分钟输出一次。

4) 统计结果插入指定的关系型数据库MYSQL的结果表中。

l 脚本结构说明:

脚本格式是ini文件格式,包含四个根配置 [system],[tabledesc-tbl_ylzt],[tabledesc-tbl_lol],[tabledesc-result]最前面的system用来配置sql后面三个都是具体的输入输出表配置。

l sql部分简要说明:

脚本使用了FROM_UNIXTIME(),COUNT(),UNIX_TIMESTAMP()函数是hive的函数,本系统兼容hive的官方的大部分函数,并沿用了hive自定义函数(UDF)的编程接口,用户可以根据需要扩充自己需要的函数功能。

脚本使用了三种窗口,通过

WITH AGGR INTERVAL 60SECONDS WITH ACCU INTERVAL 3600 SECONDS WITH SW INTERVAL 300 SECONDS 分别指定了普通窗口,累计窗口,滑动窗口的时间窗大小。并通过指定聚合函数(本例为count)的参数中嵌入ACCU,SW关键字来表示该聚合函数工作在是累计聚合和滑动聚合的模式下。

Ø 编译执行过程

l 语法树:利用开源的语法解析工具Antrl,根据系统自定义的SQL语法,将脚本翻译成为抽象语法树。抽象语法树上的每一个节点代表一个操作或被操作的对象,与脚本是一一对应的。

l 查询树:根据抽象语法树,将具有连续含义的一组节点组织在一起,抽象成查询块的概念。一个查询块相当于一组连续节点的集合(相当于单词和语句的概念),从SQL层面来看相当于一条子查询语句。

l 逻辑计划:对每个查询块进行细化。将每一个查询块分解成独立执行含义的Operator(算子)的组合。Operator按照功能区分可分为(TS, SEL, FIL, UNION, JOIN, MGBY, RGBY, FS),分别负责(扫描表,选取,过滤,合并,连接,局部分组,总分组,输出。根据抽象语法树的节点类型及其提供的参数,实例化具有具备执行含义的特定类型算子的对象,将operator有序的组织起来,形成完整的数据处理“流水线”,为数据处理做好准备。

l 物理计划:结合Apache Storm的提供的接口,将组成逻辑计划的算子分配到Storm不同的task上执行。Storm的数据处理task分为spout,bolt两种。切分task采用的规则是将MG(MGBY)前驱算子映射到SPOURT-TASK上,将RG(RGBY)后继算子映射到BOLT-TASK上(图示绿色部分为SPOUT-TASK,紫色部分为BOLT-TASK)。通过配置task 资源数及比例生成可运行在storm上的topology。本例中一共配置了5个task,3个spout task,2个bolt task。

总结

目前本技术方案,应用于TRC(腾讯实时计算平台),面向公司内部提供业务支撑,上线运营至今已接入120款业务,日均接入量达到6000亿条数据,涉及实时报表,实时监控,实时推荐和实时分析等多种应用场景,成为TRC平台实时统计领域的名副其实的尖兵利器。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-04-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档