Recently I needed to load about 2 million rows of Excel-format data into a PostgreSQL database. I tried several approaches; let me walk through them one by one.
This module's main job is managing the connection string used when connecting to the database.
# data_to_database.py
class connet_databases:
    def __init__(self):
        '''
        Build the database connection string (psycopg2 driver).
        '''
        _host = ''
        _port = 5432
        _databases = ''  # e.g. 'produce'
        _username = ''
        _password = ''

        # Note: the dialect name is 'postgresql+psycopg2'; the legacy
        # 'postgres+psycopg2' spelling is rejected by current SQLAlchemy.
        self._connect = r'postgresql+psycopg2://{username}:{password}@{host}:{port}/{databases}'.format(
            username=_username,
            password=_password,
            host=_host,
            port=_port,
            databases=_databases)
def init_sqlalchemy(dbname='',
                    Echo=True,
                    Base=declarative_base(),
                    DBSession=scoped_session(sessionmaker())):
    # Mainly used to (re)create the tables.
    print(dbname)
    engine = create_engine(dbname,
                           max_overflow=0,   # max connections allowed beyond pool_size
                           pool_size=2,      # connection pool size
                           pool_timeout=30,  # max seconds to wait for a free connection before raising
                           pool_recycle=-1,  # seconds before a pooled connection is recycled (-1 = never)
                           echo=Echo)
    try:
        DBSession.remove()
        DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)
        DBSession.flush()
        DBSession.commit()
    except Exception:
        error = traceback.format_exc()
        # Multiprocess_loaddata_toDatabase is the project's own logging module
        Multiprocess_loaddata_toDatabase.log.logger.error(error)
    finally:
        DBSession.remove()
        engine.dispose()
def insert_list(list_obj, DBSession):
    # Bulk-insert a list of ORM objects.
    try:
        DBSession.add_all(list_obj)
        DBSession.flush()
        DBSession.commit()
    except Exception:
        DBSession.rollback()
        raise
def get_conn(dbname, Echo=True):
    # Build an engine and return a configured scoped session.
    # scoped_session is already thread-local, so no remove() is needed here.
    # (Note: wrapping this in try/except with DBSession.rollback() would fail
    # with a NameError if create_engine itself raised, so let errors propagate.)
    engine = create_engine(dbname, echo=Echo)
    DBSession = scoped_session(sessionmaker())
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    return DBSession
import traceback

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import Column, TEXT, String, Integer, DateTime, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
Base = declarative_base()
class DetailsOfDrugsItems(Base):
    '''
    ORM mapping for the treatment/drug item detail records.
    '''
    __tablename__ = 'details_of_drugs_items'
    # Table columns (Chinese column names kept as-is):
    id = Column(String(64), primary_key=True)
    结算编号 = Column(String(64), index=True)  # settlement number
    单价 = Column(Float)         # unit price
    数量 = Column(Float)         # quantity
    总金额 = Column(Float)       # total amount
    结算日期 = Column(DateTime)  # settlement date
    # No custom __init__: overriding it with a bare `pass` would disable the
    # keyword-argument constructor that declarative_base provides.
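With the mapping defined, a quick way to smoke-test it before pointing at the real PostgreSQL instance is an in-memory SQLite database. The sketch below is a trimmed-down version (three columns only, invented sample values) using the `sqlalchemy.orm` import path available in SQLAlchemy 1.4+:

```python
from sqlalchemy import create_engine, Column, String, Float
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class DetailsOfDrugsItems(Base):
    # Trimmed to three columns for brevity; the real table has more.
    __tablename__ = 'details_of_drugs_items'
    id = Column(String(64), primary_key=True)
    结算编号 = Column(String(64), index=True)
    单价 = Column(Float)

# In-memory SQLite stands in for PostgreSQL so the sketch is runnable;
# swap the URL for the real psycopg2 connect string in production.
engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Declarative classes accept keyword arguments out of the box.
rows = [DetailsOfDrugsItems(id=str(i), 结算编号='S%d' % i, 单价=1.5 * i)
        for i in range(3)]
session.add_all(rows)
session.commit()
print(session.query(DetailsOfDrugsItems).count())  # 3
```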
Documentation: https://www.psycopg.org/docs/module.html
pgAdmin import/export documentation: https://www.pgadmin.org/docs/pgadmin4/development/import_export_data.html
Three file formats are supported for import:
binary for a .bin file.
csv for a .csv file.
text for a .txt file.
The actual import speed still needs benchmarking.
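For the CSV route, PostgreSQL's bulk-load fast path is `COPY ... FROM STDIN`, which psycopg2 exposes via `cursor.copy_expert`. A sketch of staging the data and the statement you would run (the in-memory buffer stands in for a CSV written with `df.to_csv`; the live connection `conn` is assumed, not opened here):

```python
import csv
import io

# Stage the cleaned data as CSV -- in practice: df.to_csv('staging.csv', index=False)
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(['id', '结算编号', '单价'])
writer.writerow(['1', 'S001', '3.5'])
buf.seek(0)

# COPY FROM STDIN streams the buffer server-side in one round trip.
copy_sql = ("COPY details_of_drugs_items (id, 结算编号, 单价) "
            "FROM STDIN WITH (FORMAT csv, HEADER true)")
# With a live psycopg2 connection you would then run:
# cur = conn.cursor()
# cur.copy_expert(copy_sql, buf)
# conn.commit()
print(copy_sql)
```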
For the pandas data-cleaning details, see my earlier article:
大数据ETL实践探索(5)---- 大数据ETL利器之 pandas
# pandas_to_postgresql.py
import pandas as pd

def change_dtypes(col_int, col_float, df):
    '''
    AIM -> Downcast dtypes to save memory
    INPUT -> list of int column names, list of float column names, df
    OUTPUT -> df updated in place with smaller dtypes
    '''
    df[col_int] = df[col_int].astype('int32')
    df[col_float] = df[col_float].astype('float32')
def convert_str_datetime(df):
    '''
    AIM -> Parse the string 'transdate' column into real datetimes
    INPUT -> df
    OUTPUT -> df updated in place with a new 'timestamp' column
    '''
    df.insert(loc=2, column='timestamp',
              value=pd.to_datetime(df.transdate, format='%Y-%m-%d %H:%M:%S.%f'))
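A quick check of what this parsing step produces, mirroring `convert_str_datetime` on a one-row DataFrame (column names and the sample timestamp are invented):

```python
import pandas as pd

df = pd.DataFrame({'id': ['a1'], 'amount': [12.5],
                   'transdate': ['2020-01-02 03:04:05.678']})
# Insert the parsed column at position 2, as the helper does.
df.insert(loc=2, column='timestamp',
          value=pd.to_datetime(df.transdate, format='%Y-%m-%d %H:%M:%S.%f'))
print(df.columns.tolist())              # ['id', 'amount', 'timestamp', 'transdate']
print(df['timestamp'].dt.year.iloc[0])  # 2020
```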
from sqlalchemy import Column, TEXT, String, Integer, DateTime, Float

# Build a dtype dict from a DataFrame's columns for use with to_sql
# (note: the values must be SQLAlchemy types).
def mapping_df_types(df):
    dtypedict = {}
    for i, j in zip(df.columns, df.dtypes):
        if "object" in str(j):
            dtypedict.update({i: String(64)})
        if "float" in str(j):
            dtypedict.update({i: Float})
        if "int" in str(j):
            dtypedict.update({i: Float})  # Float keeps NaN-bearing int columns loadable
    return dtypedict
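`to_sql` can consume the resulting dict directly via its `dtype` parameter. A runnable sketch against an in-memory SQLite stand-in (the table and column names are invented):

```python
import pandas as pd
from sqlalchemy import create_engine, String, Float

def mapping_df_types(df):
    # Same helper as above: map pandas dtypes to SQLAlchemy types.
    dtypedict = {}
    for i, j in zip(df.columns, df.dtypes):
        if "object" in str(j):
            dtypedict.update({i: String(64)})
        if "float" in str(j):
            dtypedict.update({i: Float})
        if "int" in str(j):
            dtypedict.update({i: Float})
    return dtypedict

df = pd.DataFrame({'name': ['a'], 'price': [1.5], 'qty': [2]})
dtypedict = mapping_df_types(df)

engine = create_engine('sqlite://')  # stand-in for the PostgreSQL engine
df.to_sql('demo', con=engine, index=False, dtype=dtypedict)
print(pd.read_sql('SELECT * FROM demo', engine).shape)  # (1, 3)
```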
A few data-masking examples:
Name masking -- keep the first character and mask the rest:
def desensitization_name(name):
    new_name = str(name)[0] + '**'
    return new_name
Masking an employer name or home address:
import random

def desensitization_location(location):
    # Mask a random-length prefix of at least two characters.
    # (Slicing avoids the pitfalls of str.replace here: replace() would hit
    # every occurrence of the prefix, and the star count must match the
    # number of characters removed.)
    length = random.randint(2, len(location))
    new_location = '*' * length + location[length:]
    return new_location
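A quick sanity check of the two helpers (the sample strings are invented; the location helper is the slice-based variant, so the masked prefix length always matches the star count):

```python
import random

def desensitization_name(name):
    # Keep the first character, mask the rest.
    return str(name)[0] + '**'

def desensitization_location(location):
    # Mask a random-length prefix of at least two characters.
    length = random.randint(2, len(location))
    return '*' * length + location[length:]

print(desensitization_name('张三丰'))  # 张**
masked = desensitization_location('北京市海淀区')
print(masked)  # e.g. '***市海淀区' -- prefix length varies per call
```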
# Mask the basic sensitive fields
明细['姓名'] = 明细['姓名'].apply(pandas_to_postgresql.desensitization_name)
明细['单位名称'] = 明细['单位名称'].apply(pandas_to_postgresql.desensitization_location)
Reference: the to_sql method documentation
from sqlalchemy.types import Integer

engine = create_engine(data_to_database.connet_databases()._connect, echo=False)
df.to_sql('integers', con=engine, index=False,
          dtype={"A": Integer()})
I have to say that SQLAlchemy's documentation is genuinely hard to read.
In the end it seems to come down to adding a single parameter.
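Before reaching for driver-level flags, pandas itself exposes two commonly tuned knobs on `to_sql`: `chunksize` (rows per batch) and `method='multi'` (many rows per INSERT statement). A runnable sketch with an in-memory SQLite stand-in; against PostgreSQL you would use the psycopg2 engine instead:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite://')  # stand-in; use the psycopg2 URL in production
df = pd.DataFrame({'A': range(1000), 'B': [1.5] * 1000})

# chunksize bounds memory per batch; method='multi' packs each batch
# into one INSERT ... VALUES statement instead of row-by-row inserts.
df.to_sql('integers', con=engine, index=False,
          chunksize=200, method='multi')
print(pd.read_sql('SELECT COUNT(*) AS n FROM integers', engine)['n'][0])  # 1000
```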
https://www.psycopg.org/docs/extras.html#fast-execution-helpers
Modern versions of psycopg2 include a feature known as Fast Execution Helpers , which have been shown in benchmarking to improve psycopg2’s executemany() performance, primarily with INSERT statements, by multiple orders of magnitude. SQLAlchemy allows this extension to be used for all executemany() style calls invoked by an Engine when used with multiple parameter sets, which includes the use of this feature both by the Core as well as by the ORM for inserts of objects with non-autogenerated primary key values, by adding the executemany_mode flag to create_engine():
engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    executemany_mode='batch')
Possible options for executemany_mode include:
None - By default, psycopg2’s extensions are not used, and the usual cursor.executemany() method is used when invoking batches of statements.
‘batch’ - Uses psycopg2.extras.execute_batch so that multiple copies of a SQL query, each one corresponding to a parameter set passed to executemany(), are joined into a single SQL string separated by a semicolon. This is the same behavior as was provided by the use_batch_mode=True flag.
‘values’- For Core insert() constructs only (including those emitted by the ORM automatically), the psycopg2.extras.execute_values extension is used so that multiple parameter sets are grouped into a single INSERT statement and joined together with multiple VALUES expressions. This method requires that the string text of the VALUES clause inside the INSERT statement is manipulated, so is only supported with a compiled insert() construct where the format is predictable. For all other constructs, including plain textual INSERT statements not rendered by the SQLAlchemy expression language compiler, the psycopg2.extras.execute_batch method is used. It is therefore important to note that “values” mode implies that “batch” mode is also used for all statements for which “values” mode does not apply.
For both strategies, the executemany_batch_page_size and executemany_values_page_size arguments control how many parameter sets should be represented in each execution. Because “values” mode implies a fallback down to “batch” mode for non-INSERT statements, there are two independent page size arguments. For each, the default value of None means to use psycopg2’s defaults, which at the time of this writing are quite low at 100. For the execute_values method, a number as high as 10000 may prove to be performant, whereas for execute_batch, as the number represents full statements repeated, a number closer to the default of 100 is likely more appropriate:
engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    executemany_mode='values',
    executemany_values_page_size=10000,
    executemany_batch_page_size=500)