专栏首页流川疯编写程序的艺术大数据ETL实践探索(3)---- 大数据ETL利器之pyspark

大数据ETL实践探索(3)---- 大数据ETL利器之pyspark


大数据ETL 系列文章简介

本系列文章主要针对ETL大数据处理这一典型场景,基于python语言使用Oracle、aws、Elastic search 、Spark 相关组件进行一些基本的数据导入导出实战,如:

  • oracle使用数据泵impdp进行导入操作。
  • aws使用awscli进行上传下载操作。
  • 本地文件上传至aws es
  • spark dataframe录入ElasticSearch

等典型数据ETL功能的探索。

系列文章: 1.大数据ETL实践探索(1)---- python 与oracle数据库导入导出 2.大数据ETL实践探索(2)---- python 与aws 交互 3.大数据ETL实践探索(3)---- pyspark 之大数据ETL利器 4.大数据ETL实践探索(4)---- 之 搜索神器elastic search 5.使用python对数据库,云平台,oracle,aws,es导入导出实战 6.aws ec2 配置ftp----使用vsftp 7.浅谈pandas,pyspark 的大数据ETL实践经验


pyspark Dataframe ETL

本部分内容主要在 系列文章7 :浅谈pandas,pyspark 的大数据ETL实践经验 上已有介绍 ,不用多说


spark dataframe 数据导入Elasticsearch

下面重点介绍 使用spark 作为工具和其他组件进行交互(数据导入导出)的方法

ES 对于spark 的相关支持做的非常好,https://www.elastic.co/guide/en/elasticsearch/hadoop/2.4/spark.html 在官网的文档中基本上说的比较清楚,但是大部分代码都是java 的,所以下面我们给出python 的demo 代码

dataframe 及环境初始化

初始化, spark 第三方网站下载包:elasticsearch-spark-20_2.11-6.1.1.jar http://spark.apache.org/third-party-projects.html

import sys
import os
print(os.getcwd())
# 加载包得放在这里
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-spark-20_2.11-6.1.1.jar pyspark-shell'

import os
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
import json
import math
import numbers
import numpy as np
import pandas as pd

os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"



try:
    spark.stop()
    print("Stopped a SparkSession")
except Exception as e:
    print("No existing SparkSession")

SPARK_DRIVER_MEMORY= "10G"
SPARK_DRIVER_CORE = "5"
SPARK_EXECUTOR_MEMORY= "3G"
SPARK_EXECUTOR_CORE = "1"


conf = SparkConf().\
        setAppName("insurance_dataschema").\
        setMaster('yarn-client').\
        set('spark.executor.cores', SPARK_EXECUTOR_CORE).\
        set('spark.executor.memory', SPARK_EXECUTOR_MEMORY).\
        set('spark.driver.cores', SPARK_DRIVER_CORE).\
        set('spark.driver.memory', SPARK_DRIVER_MEMORY).\
        set('spark.driver.maxResultSize', '0').\
        set("es.index.auto.create", "true").\
        set("es.resource", "tempindex/temptype").\
        set("spark.jars", "elasticsearch-hadoop-6.1.1.zip")  # set the spark.jars
    
        
spark = SparkSession.builder.\
    config(conf=conf).\
    getOrCreate()

sc=spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
  • 数据加载
#数据加载
df = (spark
                 .read
                 .option("header","true")
                 .option("multiLine", "true")
                 .csv('EXPORT.csv')
                 .cache()
                )
print(df.count())


#
  • 数据清洗,增加一列,或者针对某一列进行udf 转换
'''  
#加一列yiyong ,如果是众城数据则为zhongcheng
'''

from pyspark.sql.functions import udf


from pyspark.sql import functions
df = df.withColumn('customer',functions.lit("腾讯用户"))
  • 使用udf 清洗时间格式及数字格式
#udf 清洗时间
#清洗日期格式字段
from dateutil import parser

def clean_date(str_date):
    try:
        if str_date:
            d = parser.parse(str_date)
            return d.strftime('%Y-%m-%d')
        else:
            return None
    except Exception as e:
         return None
        


func_udf_clean_date = udf(clean_date, StringType())

def is_number(s):
    try:
        float(s)
        return True
    except ValueError:
        pass
    return False

def clean_number(str_number):

    try:
        if str_number:

                if is_number(str_number):
                    return str_number
                else:
                    None
        else:
            return None
    except Exception as e:
        return None




func_udf_clean_number = udf(clean_number, StringType())

column_Date = [
"DATE_FROM",
"DATE_TO",
]


for column in column_Date:
      df=df.withColumn(column,  func_udf_clean_date(df[column]))

df.select(column_Date).show(2)
#数据写入

df.write.format("org.elasticsearch.spark.sql").\
option("es.nodes", "IP").\
option("es.port","9002").\
mode("Overwrite").\
save("is/doc")

列式数据存储格式parquet

parquet 是针对列式数据存储的一种申请的压缩格式,百万级的数据用spark 加载成pyspark 的dataframe 然后在进行count 操作基本上是秒出结果

读写 demo code

#直接用pyspark dataframe写parquet数据(overwrite模式)
df.write.mode("overwrite").parquet("data.parquet")

# 读取parquet 到pyspark dataframe,并统计数据条目

DF = spark.read.parquet("data.parquet")
DF.count()

Parquet 用于 Spark SQL 时表现非常出色。它不仅提供了更高的压缩率,还允许通过已选定的列和低级别的读取器过滤器来只读取感兴趣的记录。因此,如果需要多次传递数据,那么花费一些时间编码现有的平面文件可能是值得的。

参考

parquet

https://www.ibm.com/developerworks/cn/analytics/blog/5-reasons-to-choose-parquet-for-spark-sql/index.html

parquet 实战应用

http://www.cnblogs.com/piaolingzxh/p/5469964.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • OpenCV鼠标画图例程,鼠标绘制矩形

    用户1539362
  • python3.4 + Django1.7.7 表单的一些问题

    上面是没有调用cleaned_data的提交结果,可见模版直接把form里面的整个标签都接收过来了

    用户1539362
  • python +Django 搭建web开发环境初步,显示当前时间

    网上很多关于django跟python 开发的资料,这块我正在实习准备用这个两个合起来搞一个基于web 的东西出来现在开始学习,写点东西记录一下心得。

    用户1539362
  • python import问题

    python中包:一个文件夹中必须要有__init__.py文件,才能被识别为 包,才能被其他模块引入 python中 模块的查找顺序是:内存中已经加载的模块...

    用户1558882
  • 应用程序的通信成本

    应用程序的通信成本 什么是通信 一个程序中两个以上功能相互传递信号或数据叫做通信。 什么是成本 这是是指时间成本与空间成本。 时间就是传递数据所花费的时间。空间...

    netkiller old
  • 应用程序的通信成本

    应用程序的通信成本 什么是通信 一个程序中两个以上功能相互传递信号或数据叫做通信。 什么是成本 这是是指时间成本与空间成本。 时间就是传递数据所花费的时间。空间...

    netkiller old
  • 图解django

    Django是用Python开发的一个免费开源的Web框架,可以用于快速的网站。Python下有许多不同的Web框架。Django是重量级选手中最具有代表性的一...

    cctester
  • 资本的长期主义,长期主义的新零售

    在投资界比较流行的一个词语叫“长期主义”,意思是说投资要告别快进快出,急功急利的方式,选择一种更加长远的方式。这其实是一种相当明智的做法,因为在经历了那样一个红...

    孟永辉
  • Shiro系列 | 《Shiro开发详细教程》第四章:Shiro中Ini配置

    之前章节我们已经接触过一些 INI 配置规则了,如果大家使用过如 Spring 之类的 IOC/DI 容器的话,Shiro 提供的 INI 配置也是非常类似的,...

    码神联盟
  • 《Experiment with MATLAB》读书笔记(六)

    读书笔记(六) 这是第六部分绘图 主要通过绘制分形图案展示绘图命令 function fern shg %显示画图界面 clf reset %清...

    万木逢春

扫码关注云+社区

领取腾讯云代金券