
Learning Spark 1.6 (Part 4): An Example of Computing PV and UV

Java架构师必看
Published 2021-05-14 17:02:58
Included in the column: Java架构师必看

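This article shows how to compute PV and UV with Spark. PV and UV are computed all the time, but is the usual way of computing them actually the most efficient?

Let's walk through an example.

First, the data: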

{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_cccc","timestamp":1543309410741,"device":null,"ip":null,"bucket":1,"data":{"templateName":"votePost","appType":1,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-lhpa2gPc7QBs","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-eeee","timestamp":1543309410741,"device":null,"ip":null,"bucket":5,"data":{"templateName":"replyPost","appType":2,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-xxxxx","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-pppeeee","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-lhpa2gPc7QBs","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}

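Next we read the data line by line and compute the PV and UV of sendNum per bucket across openIds. In the Spark job, records are grouped by (appType, templateName, bucket); within each group, PV is the sum of sendNum and UV is the number of distinct openIds.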
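Before turning to Spark, the two definitions can be checked by hand. The following is a minimal plain-Python sketch, not the Spark job itself; the `rows` list and the `pv_uv` helper are illustrative names, and the tuples are the fields of the six sample lines above.

```python
from collections import defaultdict

# (appType, templateName, bucket, openId, sendNum) taken from the six sample lines.
rows = [
    (1, "votePost", 1, "otU065OELPd_cccc", 1),
    (3, "dailySignPush", 4, "otU065OELPd_rvm-lhpa2gPc7QBs", 1),
    (2, "replyPost", 5, "otU065OELPd_rvm-eeee", 1),
    (3, "dailySignPush", 4, "otU065OELPd_rvm-xxxxx", 1),
    (3, "dailySignPush", 4, "otU065OELPd_rvm-pppeeee", 1),
    (3, "dailySignPush", 4, "otU065OELPd_rvm-lhpa2gPc7QBs", 1),
]


def pv_uv(rows):
    """Return {group: (pv, uv)} with group = (appType, templateName, bucket)."""
    pv = defaultdict(int)
    users = defaultdict(set)
    for app_type, template, bucket, open_id, send_num in rows:
        group = (app_type, template, bucket)
        pv[group] += send_num       # PV: running sum of sendNum
        users[group].add(open_id)   # UV: distinct openIds seen in this group
    return {group: (pv[group], len(users[group])) for group in pv}


result = pv_uv(rows)
print(result[(3, "dailySignPush", 4)])  # (4, 3)
```

The dailySignPush group has four records but only three distinct openIds, which is why its PV is 4 and its UV is 3.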

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, SparkConf
import json


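def parseJson(log_line):
    # Parse one JSON log line and flatten the fields used later into a tuple:
    # (flag, actionType, appType, templateName, bucket, openId, sendNum)
    json_dic = json.loads(log_line)
    print(json_dic)  # debug output; runs on the executors, so it lands in their logs
    return (json_dic["flag"], json_dic["actionType"], json_dic["data"]["appType"], json_dic["data"]["templateName"],
            json_dic["bucket"], json_dic["openId"], json_dic["data"]["sendNum"])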


def fileterRdd(line):
    # Keep only records with flag 'sendTemplateMessage' and actionType 'success'.
    a, b, c, d, e, f, g = line
    return a == 'sendTemplateMessage' and b == 'success'


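def splitRdd(line):
    # Key by (appType, templateName, bucket, openId); the value is sendNum.
    a, b, c, d, e, f, g = line
    return ((c, d, e, f), g)


def transformRdd(line):
    # Drop openId from the key. Each record now stands for one distinct openId,
    # so it carries that openId's summed sendNum and a 1 to be counted as UV.
    (c, d, e, f), g = line
    return ((c, d, e), (f, g, 1))


def caculateRes(line1, line2):
    # Merge two per-openId records of the same group: add up PV and UV.
    # The openId kept in the first slot is arbitrary and carries no meaning.
    f, g, k = line1
    f2, g2, k2 = line2
    return (f, g + g2, k + k2)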


def main():
    logFile = "/user/root/spark/sparkstudy02.txt"
    master = 'yarn-client'
    appName = 'Simple App spark study02'
    conf = SparkConf().setAppName(appName).setMaster(master)
    sc = SparkContext(conf=conf)
    logData = sc.textFile(logFile)
    # Stage 1: parse each JSON line into a flat tuple.
    logStage1 = logData.map(parseJson)
    # Stage 2: keep only successful sendTemplateMessage events.
    logStage2 = logStage1.filter(fileterRdd)
    # Stage 3: key by (appType, templateName, bucket, openId).
    logStage3 = logStage2.map(splitRdd)
    # Stage 4: sum sendNum per openId within each group.
    logStage4 = logStage3.reduceByKey(lambda x, y: x + y)
    # Stage 5: re-key by (appType, templateName, bucket), one record per distinct openId.
    logStage5 = logStage4.map(transformRdd)
    # Stage 6: sum PV and count UV per group.
    logStage6 = logStage5.reduceByKey(caculateRes)
    # Print every intermediate stage to show how the data changes.
    for stage in (logStage1, logStage2, logStage3, logStage4, logStage5, logStage6):
        print("============================")
        for item in stage.collect():
            print(item)


if __name__ == '__main__':
    main()

The output is as follows:

[root@cdh1 demo-simple]# bash sparkstudy02.sh
============================
(u'sendTemplateMessage', u'success', 1, u'votePost', 1, u'otU065OELPd_cccc', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
(u'sendTemplateMessage', u'success', 2, u'replyPost', 5, u'otU065OELPd_rvm-eeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
============================
(u'sendTemplateMessage', u'success', 1, u'votePost', 1, u'otU065OELPd_cccc', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
(u'sendTemplateMessage', u'success', 2, u'replyPost', 5, u'otU065OELPd_rvm-eeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
============================
((1, u'votePost', 1, u'otU065OELPd_cccc'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs'), 1)
((2, u'replyPost', 5, u'otU065OELPd_rvm-eeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs'), 1)
============================
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs'), 2)
((1, u'votePost', 1, u'otU065OELPd_cccc'), 1)
((2, u'replyPost', 5, u'otU065OELPd_rvm-eeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx'), 1)
============================
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-lhpa2gPc7QBs', 2, 1))
((1, u'votePost', 1), (u'otU065OELPd_cccc', 1, 1))
((2, u'replyPost', 5), (u'otU065OELPd_rvm-eeee', 1, 1))
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-pppeeee', 1, 1))
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-xxxxx', 1, 1))
============================
((1, u'votePost', 1), (u'otU065OELPd_cccc', 1, 1))
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-lhpa2gPc7QBs', 4, 3))
((2, u'replyPost', 5), (u'otU065OELPd_rvm-eeee', 1, 1))
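
To make the stage outputs above easier to follow, the two reduceByKey steps can be traced without a cluster. This is only a plain-Python sketch: `reduce_by_key` is a hypothetical helper that mimics the result of RDD.reduceByKey on a single machine and ignores partitioning and output order.

```python
def reduce_by_key(pairs, func):
    """Hypothetical single-machine stand-in for the result of RDD.reduceByKey."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


# Stage 3 pairs from the run above: ((appType, templateName, bucket, openId), sendNum).
stage3 = [
    ((1, "votePost", 1, "otU065OELPd_cccc"), 1),
    ((3, "dailySignPush", 4, "otU065OELPd_rvm-lhpa2gPc7QBs"), 1),
    ((2, "replyPost", 5, "otU065OELPd_rvm-eeee"), 1),
    ((3, "dailySignPush", 4, "otU065OELPd_rvm-xxxxx"), 1),
    ((3, "dailySignPush", 4, "otU065OELPd_rvm-pppeeee"), 1),
    ((3, "dailySignPush", 4, "otU065OELPd_rvm-lhpa2gPc7QBs"), 1),
]

# Stage 4: sum sendNum per (group, openId); repeated openIds collapse into one record.
stage4 = reduce_by_key(stage3, lambda x, y: x + y)

# Stage 5: move openId out of the key; every record now counts as one unique user.
stage5 = [((c, d, e), (f, g, 1)) for (c, d, e, f), g in stage4]

# Stage 6: add up PV and UV per group (the carried openId is arbitrary).
stage6 = dict(reduce_by_key(stage5, lambda a, b: (a[0], a[1] + b[1], a[2] + b[2])))

for group, (_, pv, uv) in sorted(stage6.items()):
    print(group, "pv =", pv, "uv =", uv)
```

Deduplication happens in stage 4, because openId is part of the key there. By stage 6, counting records per group is the same as counting distinct openIds.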

Finally, a brief analysis:

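Note that UV could also be obtained with groupByKey, using distinct() to filter out duplicate records. That approach is not used here.

There are two reasons:

1. reduceByKey is more efficient than groupByKey: it merges the values for each key inside every partition before the shuffle, so less data moves across the network. The Spark programming guide notes: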

Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.

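2. If PV and UV are computed in two separate passes, one over all the data and one over the distinct() data, how should the two results be combined at the end? They would need an extra step, such as a join on the group key.

This also means the same RDD is used more than once. cache() can keep the RDD in memory temporarily, but it is better to use a method that produces PV and UV in a single pass.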
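
Returning to the first reason, the difference between the two operators can be illustrated by counting the records that would cross the shuffle. This is a simplified plain-Python model, not Spark code: the two functions and the `partitions` data are hypothetical, and a real shuffle involves serialization and partitioning that are left out here.

```python
def shuffle_with_group_by_key(partitions):
    """groupByKey-style: every (key, value) record crosses the shuffle unchanged."""
    return [pair for part in partitions for pair in part]


def shuffle_with_reduce_by_key(partitions, func):
    """reduceByKey-style: values are merged per key inside each partition first."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    return shuffled


# Two hypothetical partitions of (key, sendNum) pairs.
partitions = [
    [("a", 1), ("a", 1), ("b", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
]

grouped = shuffle_with_group_by_key(partitions)
reduced = shuffle_with_reduce_by_key(partitions, lambda x, y: x + y)
print(len(grouped), len(reduced))  # 6 4
```

In this model the per-partition merge sends at most one record per key per partition, so the saving grows with the number of repeated keys in each partition.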

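This article originates from 0day__ and was republished by javajgs_com. The views expressed do not represent the position of Java架构师必看. Please credit the source when reposting.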
