Spark日志清洗流程

作者:哈士奇说喵

前言

整理了一下使用spark来进行日志清洗及数据处理的套路,这里以pyspark为例。

pyspark的启动任务套路

对于使用spark作为查询清洗工具而言,启动spark的套路主要使用sh文件进行终端带参数启动,启动后开始调用sh传递处理参数,并且构造好sparkconf后传递提交(spark-submit) python文件,当然最主要的函数逻辑都是在python的文件中处理的。对于工程类而非工具类,就稍微复杂点,而且会显得更规范点,以下为一个例子的套路。

对于参数输入启动的方法主要有两种第一种是构造启动的sh文件,第二种是构造启动的python文件,启动文件的作用就是在终端执行该文件,然后输入参数,参数会被启动文件捕获并传递

方法一:start.sh

# start.sh

read -p "Please input info as (year/month/day) and split by space:" raw_info

year=$(echo $raw_info | awk -F'/' '')

month=$(echo $raw_info | awk -F'/' '')

day=$(echo $raw_info | awk -F'/' '')

echo $year

echo $month

echo $day

# 进行spark任务的提交,这里需要注意的是提交的时候需要设置队列等

方法二:start.py

# start.py

info=raw_input("Please input info as (year/month/day/):") # raw_input make input as string

year=info.split("/")[0]

month=info.split("/")[1]

day=info.split("/")[2]

print "year=,month=,day=".format(year=year,month=month,day=day)

# 进行spark任务的提交,这里需要注意的是提交的时候需要设置队列等

从脚本中获取外界传入的参数后,开始执行核心的逻辑操作,这里以pyspark为例,新建deal.py。

#deal.py

#-*-coding:utf-8-*-

import sys

from pyspark import *

from pyspark.sql import *

import time

# 核心操作计算

def func_1():

funct...

def dealfunc(sc):

rdd=sc.textFile(hdfspath).map(func_1)... # 构建rdd方式可以从hdfs上把文件加载进来处理,之后进行各种Transformation操作,作为清洗数据的第一步

# 方法一:如果清洗完后数据量不大,完全可以加载到内存中然后当做流来处理,但是数据量非常大,那请不要作死使用collect()

for data in rdd.collect():

func...

==================

# 方法二:直接进行多次map,filter,等各种操作,凡事都可以靠函数解决,一个不够就写两个,函数传递进去的时候就当字符串流处理就可以了

data=rdd.map(func_1).filter(func_2).reduceByKey(func_3).map(func_4)...

# 如果需要把清洗好的数据上传hdfs,

data.repartition(5).saveAsTextFile('/user/test/restore')

# 获取上层传递进来的参数

year = sys.argv[1]

month = sys.argv[2]

day = sys.argv[3]

#在pysaprk中初始化spark

sparkconf = (SparkConf().setAppName("Wifi-God wants to see air quality").set("spark.akka.frameSize","2000"))

sc = (conf=sparkconf)

# 强烈建议使用try,finally来处理

try:

dealfunc(sc)

except Exceptions as ex:

print ex

finally:

sc.stop() # necessay

对于一般的任务,这样差不多就可以结束了,然而对于日志清洗和存储来说,必不可少的是解压和存储过程,日志的量非常多,所以即使存在hdfs上也是压缩过的,比如文件格式是xxx.lzo样式的,即使取cat,没有解压也将看不到任何有用的信息,这里就要用到newAPIHadoopFile函数了,当然,textFile仍然可用。

# 读取

# 用法见官方文档

# 上传至hdfs,使用saveAsTextFile

# repartition的作用是重新设定rdd分区数(关系到存入hdfs)

rdd_dealed.repartition(5).saveAsTextFile('/user/test/rdd_restore')

# 存在hdfs上rdd_restore文件夹中有几个片段就是有几个区

# $ hadoop fs -ls /user/test/rdd_restore

# Found 5 items

# -rw-r--r-- 3 owntest 0 2017-04-14 16:42 /user/test/rdd_restore/_SUCCESS

# drwxr-xr-x - owntest 0 2017-04-14 16:42 /user/test/rdd_restore/_temporary

# -rw-r--r-- 3 owntest 51 2017-04-14 16:42 /user/test/rdd_restore/part-00000

# -rw-r--r-- 3 owntest 150 2017-04-14 16:42 /user/rdd_restore/part-00001

# -rw-r--r-- 3 owntest 251 2017-04-14 16:42 /user/rdd_restore/part-00002

关于alais一些技巧,在终端各个目录内都可直接启动对应文件,工具化最方便的地方

#第一步,修改.bashrc文件,添加上以下信息,然后保存退出

$ vi ~/.bashrc

alias test='cd ~/user/test; sh start.sh' #这句话的意思是,当在终端输入test的时候,执行的命令是先切换到user/test文件夹,然后,执行start.sh

#第二步,使配置生效,或者重新再启动终端都可

$source ~/.bashrc

#第三步,启动

$test

#这样,使用别名的方法就可以再任何路径下启动自己对应的工具,值得注意的是,别名也不可用的太泛滥,不然不好管理

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180820B1BT9200?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

同媒体快讯

扫码关注云+社区

领取腾讯云代金券