前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用 Apache PIG 统计积累型数据的差值

使用 Apache PIG 统计积累型数据的差值

原创
作者头像
邵靖
修改2017-08-10 14:21:10
8560
修改2017-08-10 14:21:10
举报
文章被收录于专栏:邵靖的专栏邵靖的专栏

线上运行的生产系统会定时采集一项丢包数据,这项数据与某个进程相关联,从进程启动开始就一直递增,每隔1分钟采集一次数据,当进程重启之后,这项数据会清零。现在要求使用PIG来统计某个时间段(1 hour)内,多个进程此项数据的变化量汇总。可以看到数据形如以下形式。进程会通过GrpID分组,每个组内有多个进程,需要计算的是各组VALUE值的总的变化量。总数据量约为12w。

代码语言:txt
复制
PID		GrpID	TIMESTAMP				VALUE
a.1		a		2017-08-04 20:01:00		5
a.1		a		2017-08-04 20:02:00		7
a.2		a		2017-08-04 20:00:00		13
a.1		a		2017-08-04 20:03:00		0
a.1		a		2017-08-04 20:04:00		10
b.3		b		2017-08-04 20:10:00		12
b.3		b		2017-08-04 20:11:00		27
b.3		b		2017-08-04 20:12:00		33
c.2		c		2017-08-04 20:01:00		27
c.3		c		2017-08-04 20:02:00		30
......

粗看起来这个问题似乎很简单,因为数据量并不是很大,可以首先LOAD整个数据集,然后按照PID分组,在分组内对TIMESTAMP时间排序,计算最后一个与第一个VALUE的差值,然后再对GrpID分组将刚才计算出来的差值求和即可。仔细想想这是不行的,因为在每个PID分组内,本次时间片内的数据有可能因为进程重启而清零(如下图),所以不能简单的按照时间排序后尾首相减来计算。

[1502345407840_4827_1502345410051.png]
[1502345407840_4827_1502345410051.png]

这种累积型数据的计算方式应该如下图,计算多个分段分别的diff值,最后汇总。

[1502345430844_9904_1502345432924.png]
[1502345430844_9904_1502345432924.png]

具体的算法也非常简单:

  1. 对数据集按照PID聚合
  2. 对于每个聚合子集,按照TIMESTAMP进行ASC排序
  3. 对于排序过后的VALUE序列 V1, V_2, V_3 …… ,V(n-1), V_n 计算:
代码语言:txt
复制
SUM_Diff = SUM((V_t – V_(t-1)) >= 0 ? (V_t – V_(t-1)) : 0)

从最后一个VALUE开始,计算Vt – V(t-1) 的值并求和,当遇到差值为负的情况,也就是出现了进程重启清零的情况,就加零。

  1. 对GrpID聚合,求出一个分组下所有进程SUM_Diff的求和值。

上述算法很简单,用脚本可以很快搞定。但如果需要用PIG任务来写,第3个步骤就没有这么容易实现了。不过好在PIG脚本可以调用其他语言编写的UDF(User Define Function)来完成某些复杂的计算逻辑,我们就采用此种方案。如何使用Jython实现PIG UDF请参考官方文档

https://pig.apache.org/docs/r0.9.1/udf.html

先来看PIG脚本代码:

代码语言:txt
复制
REGISTER 'pycalc/calc_lost_pkg.py' using jython as myudf;
REGISTER /data/gdata/pig-0.15.0/thirdparty/mysql-connector-java-5.1.38-bin.jar;
REGISTER /data/gdata/pig-0.15.0/thirdparty/piggybank-0.15.0.jar;
REGISTER /data/gdata/pig-0.16.0/thirdparty/KVLoader-0.5.1.jar

A = LOAD 'data.log' USING com.tencent.gdata.pig.KVLoader('&', '=', 'PID:GrpID:TIMESTAMP:VALUE');
B = FOREACH A GENERATE $0 AS pid, $1 AS grpid, $2 as ts, (int)$3 as value;
C = FILTER B BY pid is not null AND grpid is not null AND ts is not null AND value is not null;
D = FOREACH C GENERATE
            pid as pid,
			grpid as grpid,
            ts as ts,
            value as value;
			
uniq_D = DISTINCT D;
E = GROUP uniq_D BY (pid, grpid);
F = FOREACH E {
    sorted = ORDER uniq_D by ts DESC;
	diff_sum = myudf.calc_lost_pkg_cnt(sorted);
	GENERATE FLATTEN(group) AS (pid, grpid), 
	               diff_sum AS diff_sum;
};
G = FOREACH (GROUP F BY grpid) GENERATE
    ${MACRO_LOG_DAY}${MACRO_LOG_HOUR} as logtime,
    group as grpid,
    SUM(F.diff_sum) as lost_pkg_cnt;
	
H = FILTER G BY lost_pkg_cnt is not null;
STORE H INTO '/pigtest/test.result.7' USING org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 
	'jdbc:mysql://${MACRO_DBHOST}:${MACRO_DBPORT}/${MACRO_DATABASE}', 
	'${MACRO_USERNAME}', '${MACRO_PASSWORD}', 
	'REPLACE INTO  ${MACRO_TABLENAME} (logtime, grpid, lost_pkg_cnt) VALUES (?, ?, ?)');

我们选用Jython来实现UDF,主要是实现第3步的逻辑,Python代码如下:

代码语言:txt
复制
@outputSchema("sum:long")
def calc_lost_pkg_cnt(sorted_data):
    sum = 0
    for idx in xrange(len(sorted_data)):
        if idx < len(sorted_data) - 1:
            delta = sorted_data[idx][3] - sorted_data[idx+1][3]
            if delta >= 0: 
                sum += delta 
    return sum

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档