首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Python小案例(九)PySpark读写数据

Python小案例(九)PySpark读写数据

作者头像
HsuHeinrich
发布2023-02-24 20:04:21
发布2023-02-24 20:04:21
1.9K00
代码可运行
举报
文章被收录于专栏:HsuHeinrichHsuHeinrich
运行总次数:0
代码可运行

Python小案例(九)PySpark读写数据

有些业务场景需要Python直接读写Hive集群,也需要Python对MySQL进行操作。pyspark就是为了方便python读取Hive集群数据,当然环境搭建也免不了数仓的帮忙,常见的如开发企业内部的Jupyter Lab

⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接公司hive集群的

利用PySpark读写Hive数据

代码语言:javascript
代码运行次数:0
运行
复制
# 设置PySpark参数
from pyspark.sql import *
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

# 导入其他相关库
import pandas as pd
from datetime import datetime
import pymysql  # mysql连接库

创建hive表

代码语言:javascript
代码运行次数:0
运行
复制
sql_hive_create = '''
CREATE TABLE IF NOT EXISTS temp.hive_mysql
    (
        id int comment "id"
        ,dtype string comment "类型"
        ,cnt int comment "数量"
    )
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''

spark.sql(sql_hive_create)
代码语言:javascript
代码运行次数:0
运行
复制
DataFrame[]

写入hive表

代码语言:javascript
代码运行次数:0
运行
复制
sql_hive_insert = '''
insert overwrite table temp.hive_mysql


select 1 as id, 'A' as dtype, 10 as cnt

union all

select 2 as id, 'B' as dtype, 23 as cnt
'''

spark.sql(sql_hive_insert)
代码语言:javascript
代码运行次数:0
运行
复制
DataFrame[]

读取hive表

代码语言:javascript
代码运行次数:0
运行
复制
sql_hive_query = '''
select 
    id
    ,dtype
    ,cnt
from
    temp.hive_mysql
'''

df = spark.sql(sql_hive_query).toPandas()
df.head()

id

dtype

cnt

0

1

A

10

1

2

B

23

利用Python读写MySQL数据

连接mysql

代码语言:javascript
代码运行次数:0
运行
复制
# 数据库信息
config = {'host': '***',  # 默认127.0.0.1
          'user': '*',  # 用户名
          'password': '*',  # 密码
          'port': 3306  # 端口,默认为3306
          'database': 'dbname'  # 数据库名称
          }
代码语言:javascript
代码运行次数:0
运行
复制
# 校验关联是否成功
con = pymysql.connect(**config)  # 建立mysql连接
cursor = con.cursor()  # 获得游标
cursor.execute("show tables")  # 查询表
代码语言:javascript
代码运行次数:0
运行
复制
1335

创建mysql表

代码语言:javascript
代码运行次数:0
运行
复制
sql_mysql_create = '''
CREATE TABLE IF NOT EXISTS `hive_mysql`
    (
        `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键'
        ,`hmid` int(30) NOT NULL DEFAULT '0' COMMENT 'hmid'
        ,`dtype` varchar(30) NOT NULL DEFAULT 'total_count' COMMENT '类型'
        ,`cnt` int(30) NOT NULL DEFAULT '0' COMMENT '数量'

        ,`dbctime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间'
        ,`dbutime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间'

        ,PRIMARY KEY (`id`)
        ,UNIQUE KEY `u_key` (`dtype`)
    ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT '题目数量'
'''

# cursor.execute(sql_mysql_create) # 无建表权限,可申请权限或者内部管理工具手动建表

写入mysql表

代码语言:javascript
代码运行次数:0
运行
复制
insert_mysql_sql = '''
insert into hive_mysql (hmid, dtype, cnt) values (%s, %s, %s)
'''
代码语言:javascript
代码运行次数:0
运行
复制
try:
    con = pymysql.connect(**config)  # 建立mysql连接
    cursor = con.cursor()  # 获得游标
    
    # 清空数据
    cursor.execute('truncate table hive_mysql')
    
    for i in range(df.__len__()):
        # 插入的数据类型需要与数据库中字段类型保持一致
        cursor.execute(insert_mysql_sql, (int(df.iloc[i, 0]), df.iloc[i, 1], int(df.iloc[i, 2])))

    # 提交所有执行命令
    con.commit()
    print('数据写入成功!')
    cursor.close()  # 关闭游标
except Exception as e:
    raise e
finally:
    con.close()  # 关闭连接
代码语言:javascript
代码运行次数:0
运行
复制
数据写入成功!

读取mysql表

代码语言:javascript
代码运行次数:0
运行
复制
sql_mysql_query = '''
select 
    hmid
    ,dtype
    ,cnt
from
    hive_mysql
'''
代码语言:javascript
代码运行次数:0
运行
复制
try:
    con = pymysql.connect(**config)  # 建立mysql连接
    cursor = con.cursor()  # 获得游标
    
    cursor.execute(sql_mysql_query)  # 执行sql语句
    df_mysql = pd.DataFrame(cursor.fetchall())  # 获取结果转为dataframe

    # 提交所有执行命令
    con.commit()
    
    cursor.close()  # 关闭游标
except Exception as e:
    raise e
finally:
    con.close()  # 关闭连接
代码语言:javascript
代码运行次数:0
运行
复制
df_mysql.head()

0

1

2

0

1

A

10

1

2

B

23

利用PySpark写入MySQL数据

日常最常见的是利用PySpark将数据批量写入MySQL,减少删表建表的操作。但由于笔者当前公司线上环境没有配置mysql的驱动,下述方法没法使用。

MySQL的安全性要求很高,正常情况下,分析师关于MySQL的权限是比较低的。所以很多关于MySQL的操作方法也是无奈之举~

代码语言:javascript
代码运行次数:0
运行
复制
# ## 线上环境需配置mysql的驱动
# sp = spark.sql(sql_hive_query)
# sp.write.jdbc(url="jdbc:mysql://***:3306/dbname",   # dbname为库名,必须已存在(该语句不会创建库)
#               mode="overwrite",     # 模式分为overwrite 重写表    append表内内容追加
#               table="hive_mysql",    # 表名,表不需要去创建,可以自己生成
#               properties={'driver':'com.mysql.jdbc.Driver', 'user':'*', 'password':'*'})

总结

Python读取Hive数据,以及利用Python关联Hive和MySQL是后续自动化操作的基础,因此简单的理解PySpark如何进行Hive操作即可。

共勉~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 HsuHeinrich 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Python小案例(九)PySpark读写数据
    • 利用PySpark读写Hive数据
      • 创建hive表
      • 写入hive表
      • 读取hive表
    • 利用Python读写MySQL数据
      • 连接mysql
      • 创建mysql表
      • 写入mysql表
      • 读取mysql表
    • 利用PySpark写入MySQL数据
    • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档