
Big Data ETL in Practice (9): Loading Data into PostgreSQL with pandas, SQLAlchemy, and Multiprocessing

流川疯 · 2020-06-16

I recently had a requirement to load roughly 2 million rows of Excel data into a PostgreSQL database. I considered several approaches:

  1. Use the native psycopg2 API
  2. Create the table in the pgAdmin UI and import the CSV directly
  3. Use the pandas to_sql method
  4. Use SQLAlchemy's bulk-insert facilities
  5. Clean the data with pandas in multiple Python processes, then bulk-load with SQLAlchemy (see the sketch at the end of this post)

Let me walk through them one by one.


Groundwork

Connection class

Its main job is to manage the connection string used when connecting to the database.

# data_to_database.py

class connet_databases:
    def __init__(self):
        '''
        Initialize the database connection string
        (PostgreSQL, via the psycopg2 driver).
        '''
        _host = ''
        _port = 5432
        _databases = ''  # e.g. 'produce'
        _username = ''
        _password = ''

        # Newer SQLAlchemy releases only accept the 'postgresql+psycopg2'
        # dialect name; the old 'postgres' alias has been removed.
        self._connect = r'postgresql+psycopg2://{username}:{password}@{host}:{port}/{databases}'.format(
            username=_username,
            password=_password,
            host=_host,
            port=_port,
            databases=_databases)
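
A hypothetical one-line usage sketch (the class only stores the URL; the engine is built wherever it is needed):

from sqlalchemy import create_engine

# Build an engine from the managed connection string
engine = create_engine(connet_databases()._connect, echo=False)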

SQLAlchemy basic operation helpers

def init_sqlalchemy(dbname='',
                    Echo=True,
                    Base=declarative_base(),
                    DBSession=scoped_session(sessionmaker())):
    # Mainly used to (re)create the tables registered on Base
    print(dbname)
    engine = create_engine(dbname,
                           max_overflow=0,  # max connections allowed beyond pool_size
                           pool_size=2,  # connection pool size
                           pool_timeout=30,  # seconds to wait for a free connection before raising
                           pool_recycle=-1,  # seconds before a pooled connection is recycled (-1 = never)
                           echo=Echo)
    try:
        DBSession.remove()
        DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)

        # Drop and recreate every table registered on Base
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)

        DBSession.flush()
        DBSession.commit()

    except Exception:
        error = traceback.format_exc()
        Multiprocess_loaddata_toDatabase.log.logger.error(error)

    finally:
        DBSession.remove()
        engine.dispose()


def insert_list(list_obj, DBSession):
    # Add a list of ORM objects and commit them in a single transaction
    try:
        DBSession.add_all(list_obj)
        DBSession.flush()
        DBSession.commit()

    except:
        DBSession.rollback()
        raise



def get_conn(dbname, Echo=True):
    # Return a thread-local session factory bound to the given database URL
    try:
        engine = create_engine(dbname, echo=Echo)
        DBSession = scoped_session(sessionmaker())
        # scoped_session is already thread-local, so no remove() is needed here
        DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)

        return DBSession

    except:
        # The session was never usable if create_engine/configure failed,
        # so there is nothing to roll back; just re-raise.
        raise

SQLAlchemy schema (table) example

import sqlalchemy

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import Column, TEXT, String, Integer, DateTime, Float


Base = declarative_base()

class DetailsOfDrugsItems(Base):
    '''
    Detail records of drug / treatment items
    '''
    __tablename__ = 'details_of_drugs_items'

    # Table structure (the Chinese column names are kept as-is):
    id = Column(String(64), primary_key=True)
    结算编号 = Column(String(64), index=True)  # settlement number
    单价 = Column(Float)  # unit price
    数量 = Column(Float)  # quantity
    总金额 = Column(Float)  # total amount
    结算日期 = Column(DateTime)  # settlement date
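
A hypothetical end-to-end sketch tying the helpers above together (the values are invented; without a custom __init__, the declarative base's default constructor accepts the column names as keywords):

# Hypothetical usage of get_conn / insert_list with the schema above
DBSession = get_conn(connet_databases()._connect, Echo=False)

rows = [
    DetailsOfDrugsItems(id='0001', 结算编号='JS20200601', 单价=3.5, 数量=2.0, 总金额=7.0),
    DetailsOfDrugsItems(id='0002', 结算编号='JS20200602', 单价=12.0, 数量=1.0, 总金额=12.0),
]
insert_list(rows, DBSession)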

The native psycopg2 API

Documentation: https://www.psycopg.org/docs/module.html
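
A minimal sketch of this route (connection parameters are placeholders; execute_values from psycopg2.extras batches many rows into one round trip):

import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect(host='localhost', port=5432, dbname='produce',
                        user='user', password='password')
with conn:
    with conn.cursor() as cur:
        rows = [('0001', 'JS20200601', 3.5),
                ('0002', 'JS20200602', 12.0)]
        # The single %s placeholder is expanded into multiple VALUES tuples
        execute_values(
            cur,
            'INSERT INTO details_of_drugs_items (id, 结算编号, 单价) VALUES %s',
            rows)
conn.close()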


pgAdmin import

Documentation: https://www.pgadmin.org/docs/pgadmin4/development/import_export_data.html

Three import formats are supported:

binary for a .bin file.

csv for a .csv file.

text for a .txt file.

Actual import speed remains to be benchmarked.


pandas data cleaning and loading with to_sql

Data cleaning

For the details of pandas-based data cleaning, see my earlier article:

Big Data ETL in Practice (5): pandas as an ETL Power Tool

# pandas_to_postgresql.py

import pandas as pd


def change_dtypes(col_int, col_float, df):
    '''
    AIM    -> Changing dtypes to save memory

    INPUT  -> List of column names (int, float), df

    OUTPUT -> updated df with smaller memory
    ------
    '''
    df[col_int] = df[col_int].astype('int32')
    df[col_float] = df[col_float].astype('float32')


def convert_str_datetime(df):
    '''
    AIM    -> Convert datetime(String) to datetime(format we want)

    INPUT  -> df

    OUTPUT -> updated df with new datetime format
    ------
    '''
    df.insert(loc=2, column='timestamp', value=pd.to_datetime(df.transdate, format='%Y-%m-%d %H:%M:%S.%f'))
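
A quick usage sketch (明细 stands for the detail-rows DataFrame; which columns are ints versus floats is illustrative):

# Hypothetical usage: shrink numeric columns to 32-bit dtypes
change_dtypes(col_int=['数量'], col_float=['单价', '总金额'], df=明细)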

from sqlalchemy import Column, TEXT, String, Integer, DateTime, Float


# Build a dict that maps each DataFrame column to a SQLAlchemy type, for use
# with to_sql via its dtype argument (note: the values must be SQLAlchemy
# types). Int columns are also mapped to Float here.
def mapping_df_types(df):
    dtypedict = {}
    for i, j in zip(df.columns, df.dtypes):
        if "object" in str(j):
            dtypedict.update({i: String(64)})
        if "float" in str(j):
            dtypedict.update({i: Float})
        if "int" in str(j):
            dtypedict.update({i: Float})
    return dtypedict
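
The resulting map can be passed straight to to_sql through its dtype argument (a hypothetical usage, with engine built as in the to_sql section below):

# Let mapping_df_types choose a SQLAlchemy type per column
dtype_map = mapping_df_types(明细)
明细.to_sql('details_of_drugs_items', con=engine, index=False, dtype=dtype_map)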

A few data-masking examples:

Name masking

def desensitization_name(name):
    # Keep the first character and mask the rest, e.g. '张三丰' -> '张**'
    new_name = str(name)[0] + '**'
    return new_name

Masking employers and home addresses

import random


def desensitization_location(location):
    # Replace a random-length prefix of the string with '*' characters
    location = str(location)
    length = random.randint(2, max(2, len(location)))
    new_location = '*' * length + location[length:]

    return new_location


# Mask the basic sensitive fields
明细['姓名'] = 明细['姓名'].apply(pandas_to_postgresql.desensitization_name)
明细['单位名称'] = 明细['单位名称'].apply(pandas_to_postgresql.desensitization_location)

Loading data with to_sql

Reference: the pandas to_sql method documentation

from sqlalchemy import create_engine
from sqlalchemy.types import Integer

import data_to_database

engine = create_engine(data_to_database.connet_databases()._connect, echo=False)
df.to_sql('integers', con=engine, index=False,
          dtype={"A": Integer()})

Bulk loading with SQLAlchemy

I have to say that SQLAlchemy's documentation is really hard to read.

Performance tuning

As it turns out, it seems to be just a matter of adding a parameter. From the docs:

https://www.psycopg.org/docs/extras.html#fast-execution-helpers

Modern versions of psycopg2 include a feature known as Fast Execution Helpers, which have been shown in benchmarking to improve psycopg2’s executemany() performance, primarily with INSERT statements, by multiple orders of magnitude. SQLAlchemy allows this extension to be used for all executemany() style calls invoked by an Engine when used with multiple parameter sets, which includes the use of this feature both by the Core as well as by the ORM for inserts of objects with non-autogenerated primary key values, by adding the executemany_mode flag to create_engine():

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    executemany_mode='batch')

Possible options for executemany_mode include:

None - By default, psycopg2’s extensions are not used, and the usual cursor.executemany() method is used when invoking batches of statements.

‘batch’ - Uses psycopg2.extras.execute_batch so that multiple copies of a SQL query, each one corresponding to a parameter set passed to executemany(), are joined into a single SQL string separated by a semicolon. This is the same behavior as was provided by the use_batch_mode=True flag.

‘values’- For Core insert() constructs only (including those emitted by the ORM automatically), the psycopg2.extras.execute_values extension is used so that multiple parameter sets are grouped into a single INSERT statement and joined together with multiple VALUES expressions. This method requires that the string text of the VALUES clause inside the INSERT statement is manipulated, so is only supported with a compiled insert() construct where the format is predictable. For all other constructs, including plain textual INSERT statements not rendered by the SQLAlchemy expression language compiler, the psycopg2.extras.execute_batch method is used. It is therefore important to note that “values” mode implies that “batch” mode is also used for all statements for which “values” mode does not apply.

For both strategies, the executemany_batch_page_size and executemany_values_page_size arguments control how many parameter sets should be represented in each execution. Because “values” mode implies a fallback down to “batch” mode for non-INSERT statements, there are two independent page size arguments. For each, the default value of None means to use psycopg2’s defaults, which at the time of this writing are quite low at 100. For the execute_values method, a number as high as 10000 may prove to be performant, whereas for execute_batch, as the number represents full statements repeated, a number closer to the default of 100 is likely more appropriate:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    executemany_mode='values',
    executemany_values_page_size=10000, executemany_batch_page_size=500)
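
Multiprocessing plus bulk loading

Finally, approach 5: clean the data with pandas, split the DataFrame across worker processes, and let each process bulk-load its own slice. A minimal sketch, assuming the connet_databases class from data_to_database.py above; the input file name and worker count are placeholders:

import multiprocessing

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

from data_to_database import connet_databases


def load_chunk(df_chunk):
    # Every worker must build its own engine: database connections
    # cannot be shared across process boundaries.
    engine = create_engine(connet_databases()._connect,
                           executemany_mode='batch', echo=False)
    df_chunk.to_sql('details_of_drugs_items', con=engine, index=False,
                    if_exists='append', chunksize=10000)
    engine.dispose()


if __name__ == '__main__':
    df = pd.read_excel('data.xlsx')  # placeholder input file
    # ... pandas cleaning / masking steps go here ...
    chunks = np.array_split(df, 8)  # one slice per worker process
    with multiprocessing.Pool(processes=8) as pool:
        pool.map(load_chunk, chunks)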