我的最终目标是将SQL/Python一起用于一个有太多数据供熊猫处理的项目(至少在我的机器上)。所以,我和dask
一起去了:
对于#1和#2,它们使用最小内存执行大约30秒(几个SQL查询~200行代码使用dask操作一个大型数据集)。快又好玩!
但是,上面的第三条一直是主要的瓶颈。在(1.内存和2.速度(执行时间))方面,有哪些有效的方法可以用dask或其他方法完成#3呢?看看更多的背景,以及我尝试过的和我得出的一些结论。
对于上面的#1、#2和#3,由于内存限制/执行时间长,这是我发现不可能与熊猫一起完成的任务,但是dask
解决了上面提到的#1和#2,但是我仍然在努力解决#3 --以一种自动的方式将数据返回到SQL表,而我并没有发送到.csv,然后导入到Server中。我尝试过.compute()
将dataframe转换成一个熊猫数据格式,然后编写to_sql
,但这种方法没有达到使用dask读取/数据模型的目的,而且内存不足/无论如何都要花费很长时间来执行。
因此,新的计划是使用to_csv
每天生成一个新的.csv,并使用查询将数据批量插入到表中。我认为这仍然是一个可行的解决方案;但是,今天,我非常高兴地发现,dask发布了一个新的to_sql
函数(https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql)。利用关于这个主题的现有StackOverflow文章/博客(例如,FrancoisLeblc-https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html),我修改了所有参数,以找到最有效的组合,这些组合具有最快的执行时间(当您每天为报告编写大型数据集时,这一点非常重要)。这就是我所发现的,类似于很多关于pd.to_sql
的文章,包括Leblanc的:
import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
使用以下非默认参数的任意组合,会减慢为我的to_sql
执行的时间(再次与LeBlanc在他的博客中提到的内容一致):
chunksize=40
(40是我在2098年Server参数限制下可以传递52列的最大值),method='multi'
,parallel=True
)注意:我意识到,除了(或替换)传递chunksize=40
之外,我还可以遍历我的33个dask数据帧分区,并分别处理每个块to_sql
。这将是更有效的内存,也可能是更快。一个分区花费45秒到1分钟,同时对所有分区执行整个dask数据帧需要>1小时。如果速度更快,我将尝试遍历所有分区并发布更新。一个小时似乎很长,但是当我试图和熊猫一起计算时,我感到完全被堵住了,因为熊猫花了一整晚的时间,或者内存耗尽了,所以这是一个进步。老实说,我对此感到非常高兴--我很可能现在就用pyinstaller
构建一个pyinstaller
,并让.exe每天运行一次,这样就可以完全自动化,从那里出发,但我认为这对其他人会有帮助,因为在过去的几周里,我一直在努力解决各种问题。
发布于 2020-06-18 19:37:55
我测试了在分区中通过循环方式将数据写入SQL Server的测试,而不是一次性编写,完成所有操作的时间类似于一次性编写所有内容。
import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
# From my question, I have replaced the commented out line of code with everything below that to see if there was a significant increase in speed. There was not. It was about the same as the cod in the question.
# ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
i = 0
for i in range(ddf.npartitions):
partition = ddf.get_partition(i)
if i == 0:
partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
if i > 0:
partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='append', index=False)
i += 1
发布于 2020-12-10 16:09:11
选择将dask数据作为分区插入不应该加快插入过程所需的总时间。
每次调用insert
时,无论是否存在要插入的分区或整个数据,都会调用.compute()
方法从内存中提取数据并使用它,并且无法通过此方法对其进行优化。我真的怀疑这对于提取分区是必要的,我认为to_sql()
dask已经使用了这种方法。
https://stackoverflow.com/questions/62404502
复制相似问题