使用 Apache PIG 统计积累型数据的差值

线上运行的生产系统会定时采集一项丢包数据,这项数据与某个进程相关联,从进程启动开始就一直递增,每隔1分钟采集一次数据,当进程重启之后,这项数据会清零。现在要求使用PIG来统计某个时间段(1 hour)内,多个进程此项数据的变化量汇总。可以看到数据形如以下形式。进程会通过GrpID分组,每个组内有多个进程,需要计算的是各组VALUE值的总的变化量。总数据量约为12w。

PID		GrpID	TIMESTAMP				VALUE
a.1		a		2017-08-04 20:01:00		5
a.1		a		2017-08-04 20:02:00		7
a.2		a		2017-08-04 20:00:00		13
a.1		a		2017-08-04 20:03:00		0
a.1		a		2017-08-04 20:04:00		10
b.3		b		2017-08-04 20:10:00		12
b.3		b		2017-08-04 20:11:00		27
b.3		b		2017-08-04 20:12:00		33
c.2		c		2017-08-04 20:01:00		27
c.3		c		2017-08-04 20:02:00		30
......

粗看起来这个问题似乎很简单,因为数据量并不是很大,可以首先LOAD整个数据集,然后按照PID分组,在分组内对TIMESTAMP时间排序,计算最后一个与第一个VALUE的差值,然后再对GrpID分组将刚才计算出来的差值求和即可。仔细想想这是不行的,因为在每个PID分组内,本次时间片内的数据有可能因为进程重启而清零(如下图),所以不能简单的按照时间排序后尾首相减来计算。

这种累积型数据的计算方式应该如下图,计算多个分段分别的diff值,最后汇总。

具体的算法也非常简单:

  1. 对数据集按照PID聚合
  2. 对于每个聚合子集,按照TIMESTAMP进行ASC排序
  3. 对于排序过后的VALUE序列 V1, V_2, V_3 …… ,V(n-1), V_n 计算:
SUM_Diff = SUM((V_t – V_(t-1)) >= 0 ? (V_t – V_(t-1)) : 0)

从最后一个VALUE开始,计算Vt – V(t-1) 的值并求和,当遇到差值为负的情况,也就是出现了进程重启清零的情况,就加零。

  1. 对GrpID聚合,求出一个分组下所有进程SUM_Diff的求和值。

上述算法很简单,用脚本可以很快搞定。但如果需要用PIG任务来写,第3个步骤就没有这么容易实现了。不过好在PIG脚本可以调用其他语言编写的UDF(User Define Function)来完成某些复杂的计算逻辑,我们就采用此种方案。如何使用Jython实现PIG UDF请参考官方文档

https://pig.apache.org/docs/r0.9.1/udf.html

先来看PIG脚本代码:

REGISTER 'pycalc/calc_lost_pkg.py' using jython as myudf;
REGISTER /data/gdata/pig-0.15.0/thirdparty/mysql-connector-java-5.1.38-bin.jar;
REGISTER /data/gdata/pig-0.15.0/thirdparty/piggybank-0.15.0.jar;
REGISTER /data/gdata/pig-0.16.0/thirdparty/KVLoader-0.5.1.jar

A = LOAD 'data.log' USING com.tencent.gdata.pig.KVLoader('&', '=', 'PID:GrpID:TIMESTAMP:VALUE');
B = FOREACH A GENERATE $0 AS pid, $1 AS grpid, $2 as ts, (int)$3 as value;
C = FILTER B BY pid is not null AND grpid is not null AND ts is not null AND value is not null;
D = FOREACH C GENERATE
            pid as pid,
			grpid as grpid,
            ts as ts,
            value as value;
			
uniq_D = DISTINCT D;
E = GROUP uniq_D BY (pid, grpid);
F = FOREACH E {
    sorted = ORDER uniq_D by ts DESC;
	diff_sum = myudf.calc_lost_pkg_cnt(sorted);
	GENERATE FLATTEN(group) AS (pid, grpid), 
	               diff_sum AS diff_sum;
};
G = FOREACH (GROUP F BY grpid) GENERATE
    ${MACRO_LOG_DAY}${MACRO_LOG_HOUR} as logtime,
    group as grpid,
    SUM(F.diff_sum) as lost_pkg_cnt;
	
H = FILTER G BY lost_pkg_cnt is not null;
STORE H INTO '/pigtest/test.result.7' USING org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 
	'jdbc:mysql://${MACRO_DBHOST}:${MACRO_DBPORT}/${MACRO_DATABASE}', 
	'${MACRO_USERNAME}', '${MACRO_PASSWORD}', 
	'REPLACE INTO  ${MACRO_TABLENAME} (logtime, grpid, lost_pkg_cnt) VALUES (?, ?, ?)');

我们选用Jython来实现UDF,主要是实现第3步的逻辑,Python代码如下:

@outputSchema("sum:long")
def calc_lost_pkg_cnt(sorted_data):
    sum = 0
    for idx in xrange(len(sorted_data)):
        if idx < len(sorted_data) - 1:
            delta = sorted_data[idx][3] - sorted_data[idx+1][3]
            if delta >= 0: 
                sum += delta 
    return sum

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊ImageIO使用argb操作jpg的bug

本文主要来聊一下使用ImageIO在BufferedImage.TYPE_INT_ARGB默认下操作jpg格式图片显示黑色的bug。

501
来自专栏Hongten

java开发_java小程序_邮死你(yousini)_源码下载

==========================================================

842
来自专栏数据和云

Oracle 12c 新特性:SQL Plan Directives与过量的动态采样解析

在 12c 中,优化器进行了较大的改变,推出了 Adaptive query optimization,从整体上说,Adaptive query optimiz...

862
来自专栏Hongten

java开发_闹钟

==========================================================

752
来自专栏新智元

【TensorFlow1.2.0版发布】14大新功能,增加Intel MKL集成

【新智元导读】TensorFlow 今天发布最新版 1.2.0,公布了14大最新功能。新智元带来最新介绍,包括 API 的重要变化、contrib API的变化...

3229
来自专栏生信宝典

network3D 交互式网络生成

networkD3是基于D3JS的R包交互式绘图工具,用于转换R语言生成的图为交互式网页嵌套图。目前支持网络图,桑基图,树枝图 (后续相继推出)等。 关于网络图...

2655
来自专栏点滴积累

使用 python 处理 nc 数据

1354
来自专栏生信技能树

转录组数据拼接之应用篇

前前后后接触了一些基因组和转录组拼接的工作,而且后期还会持续进行。期间遇到了各种各样莫名其妙的坑,也尝试了一些不同的方法和软件,简单做一个阶段性小结。上周的今天...

3916
来自专栏算法+

3D Lut 电影级调色算法 附完整C代码

长话短说,3d lut(全称 : 3D Lookup table )它是通过建立一个颜色映射表,对图像的色调进行重调的算法。

66010
来自专栏新智元

TensorFlow正式发布1.5.0,支持CUDA 9和cuDNN 7,双倍提速

来源:Github 编译:费欣欣 【新智元导读】TensorFlow今天正式发布了1.5.0版本,支持CUDA 9和cuDNN 7,进一步提速。并且,从1.6版...

2956

扫码关注云+社区