使用 Apache PIG 统计积累型数据的差值

线上运行的生产系统会定时采集一项丢包数据,这项数据与某个进程相关联,从进程启动开始就一直递增,每隔1分钟采集一次数据,当进程重启之后,这项数据会清零。现在要求使用PIG来统计某个时间段(1 hour)内,多个进程此项数据的变化量汇总。可以看到数据形如以下形式。进程会通过GrpID分组,每个组内有多个进程,需要计算的是各组VALUE值的总的变化量。总数据量约为12w。

PID		GrpID	TIMESTAMP				VALUE
a.1		a		2017-08-04 20:01:00		5
a.1		a		2017-08-04 20:02:00		7
a.2		a		2017-08-04 20:00:00		13
a.1		a		2017-08-04 20:03:00		0
a.1		a		2017-08-04 20:04:00		10
b.3		b		2017-08-04 20:10:00		12
b.3		b		2017-08-04 20:11:00		27
b.3		b		2017-08-04 20:12:00		33
c.2		c		2017-08-04 20:01:00		27
c.3		c		2017-08-04 20:02:00		30
......

粗看起来这个问题似乎很简单,因为数据量并不是很大,可以首先LOAD整个数据集,然后按照PID分组,在分组内对TIMESTAMP时间排序,计算最后一个与第一个VALUE的差值,然后再对GrpID分组将刚才计算出来的差值求和即可。仔细想想这是不行的,因为在每个PID分组内,本次时间片内的数据有可能因为进程重启而清零(如下图),所以不能简单的按照时间排序后尾首相减来计算。

这种累积型数据的计算方式应该如下图,计算多个分段分别的diff值,最后汇总。

具体的算法也非常简单:

  1. 对数据集按照PID聚合
  2. 对于每个聚合子集,按照TIMESTAMP进行ASC排序
  3. 对于排序过后的VALUE序列 V1, V_2, V_3 …… ,V(n-1), V_n 计算:
SUM_Diff = SUM((V_t – V_(t-1)) >= 0 ? (V_t – V_(t-1)) : 0)

从最后一个VALUE开始,计算Vt – V(t-1) 的值并求和,当遇到差值为负的情况,也就是出现了进程重启清零的情况,就加零。

  1. 对GrpID聚合,求出一个分组下所有进程SUM_Diff的求和值。

上述算法很简单,用脚本可以很快搞定。但如果需要用PIG任务来写,第3个步骤就没有这么容易实现了。不过好在PIG脚本可以调用其他语言编写的UDF(User Define Function)来完成某些复杂的计算逻辑,我们就采用此种方案。如何使用Jython实现PIG UDF请参考官方文档

https://pig.apache.org/docs/r0.9.1/udf.html

先来看PIG脚本代码:

REGISTER 'pycalc/calc_lost_pkg.py' using jython as myudf;
REGISTER /data/gdata/pig-0.15.0/thirdparty/mysql-connector-java-5.1.38-bin.jar;
REGISTER /data/gdata/pig-0.15.0/thirdparty/piggybank-0.15.0.jar;
REGISTER /data/gdata/pig-0.16.0/thirdparty/KVLoader-0.5.1.jar

A = LOAD 'data.log' USING com.tencent.gdata.pig.KVLoader('&', '=', 'PID:GrpID:TIMESTAMP:VALUE');
B = FOREACH A GENERATE $0 AS pid, $1 AS grpid, $2 as ts, (int)$3 as value;
C = FILTER B BY pid is not null AND grpid is not null AND ts is not null AND value is not null;
D = FOREACH C GENERATE
            pid as pid,
			grpid as grpid,
            ts as ts,
            value as value;
			
uniq_D = DISTINCT D;
E = GROUP uniq_D BY (pid, grpid);
F = FOREACH E {
    sorted = ORDER uniq_D by ts DESC;
	diff_sum = myudf.calc_lost_pkg_cnt(sorted);
	GENERATE FLATTEN(group) AS (pid, grpid), 
	               diff_sum AS diff_sum;
};
G = FOREACH (GROUP F BY grpid) GENERATE
    ${MACRO_LOG_DAY}${MACRO_LOG_HOUR} as logtime,
    group as grpid,
    SUM(F.diff_sum) as lost_pkg_cnt;
	
H = FILTER G BY lost_pkg_cnt is not null;
STORE H INTO '/pigtest/test.result.7' USING org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 
	'jdbc:mysql://${MACRO_DBHOST}:${MACRO_DBPORT}/${MACRO_DATABASE}', 
	'${MACRO_USERNAME}', '${MACRO_PASSWORD}', 
	'REPLACE INTO  ${MACRO_TABLENAME} (logtime, grpid, lost_pkg_cnt) VALUES (?, ?, ?)');

我们选用Jython来实现UDF,主要是实现第3步的逻辑,Python代码如下:

@outputSchema("sum:long")
def calc_lost_pkg_cnt(sorted_data):
    sum = 0
    for idx in xrange(len(sorted_data)):
        if idx < len(sorted_data) - 1:
            delta = sorted_data[idx][3] - sorted_data[idx+1][3]
            if delta >= 0: 
                sum += delta 
    return sum

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数据和云

【安全警告】Oracle 12c 多租户的SQL注入高危风险防范

在使用Oracle多租户选件时,由于Container容器和PDB融合共存,则权限控制必将更加重要,在之前的文章中我们提到,Oracle 12.2 的 loc...

37360
来自专栏数据和云

一次由查询转换引起的性能问题的分析

作者介绍 郭成日 云和恩墨北区技术工程师 专注于SQL审核和优化相关工作。曾经服务的客户涉及金融保险、电信运营商、政府、生产制造等行业。 在优化器进行查询转换...

29050
来自专栏james大数据架构

聚合索引(clustered index) / 非聚合索引(nonclustered index)

以下我面试经常问的2道题..尤其针对觉得自己SQL SERVER 还不错的同志.. 呵呵 很难有人答得好.. 各位在我收集每个人擅长的东西时,大部分都把SQL...

68050
来自专栏用户画像

网上书店管理系统数据库 sql sever

1.数据库各数据对象的设计与实现:表、约束、完整性体现、查询、视图,要求用合理的数据体现。

15930
来自专栏杨建荣的学习笔记

关于奇怪的并行进程分析(二) (r6笔记第46天)

前几天的并行问题自己分析了下,也算有了一些进展,但是目前还没有找到让人信服的理由,有些读者也比较关心这个问题,所以第二篇中会把自己的分析过程写出来,第三篇中应该...

26630
来自专栏Jerry的SAP技术分享

SAP CRM Survey调查问卷的存储模型

数据库表CRM_SVY_DB_SVS,通过如下的函数CRM_SVY_DB_SVS_CREATE插入:

17530
来自专栏杨建荣的学习笔记

生产环境sql语句调优实战第二篇(r2第38天)

在生产环境通过sql monitor监控到有一条sql执行效率很差。执行了大约5个小时,得到的sql monitor报告如下: Global Informat...

29270
来自专栏杨建荣的学习笔记

分析函数牛刀小试 (59天)

今天有个同事问我一个问题,想通过一条sql语句完成一个稍显复杂的查询。 结构如下面所示。需要算出tax apply 的值,但是需要汇总charge_amount...

32180
来自专栏java达人

MySQL的limit查询优化

我们大家都知道MySQL数据库的优化是相当重要的。其他最为常用也是最为需要优化的就是limit。MySQL的limit给分页带来了极大的方便,但数据量一大的时候...

24180
来自专栏杨建荣的学习笔记

关于segment的一个小问题

今天统计数据的时候,发现一个奇怪的小问题,通过segment去判断一个表的大小,然后查表的count,有一个表明明在,但是从segment里面去查的时候查不出来...

35680

扫码关注云+社区

领取腾讯云代金券