
Unable to persist a dask dataframe after read_sql_table

Stack Overflow user
Asked on 2018-01-02 06:07:00
1 answer · 1.4K views · 0 followers · 0 votes

I am trying to read a database table into a dask dataframe and then persist the dataframe. I have tried a few variations, and they either run out of memory or raise errors.

I am working on a Windows 10 laptop with 8 GB of RAM. The trouble starts when I try to read large MySQL or Oracle tables. I can reproduce the problem with SQLite.

The code below sets up a 700 MB SQLite table that reproduces the problem. (Please forgive any clumsiness in the Python code -- I have been a SAS data analyst for 10 years and am looking for a cheaper alternative, so I am new to Python, numpy, pandas, and dask. Note that SAS can read the SQLite table, write it to disk, and create an index in 90 seconds without locking up the laptop.)

import numpy as np
from sqlalchemy import create_engine, MetaData, Table, Column, Integer
import sqlite3

# function to create a list of dicts with chunkSize rows by 3 columns
# except for the first column, the columns are filled with random integers

def getChunk(chunkSize, prevCount):
    x = np.random.randint(low=0, high=2**32, size=(chunkSize,3), dtype='int64')
    y = x.ravel().view(dtype=[('a', 'i8'), ('b', 'i8'), ('c', 'i8')])
    y['a'] = [k + prevCount for k in range(chunkSize)]
    names = y.dtype.names
    listOfDicts = [dict(zip(names, row)) for row in y] 
    return listOfDicts

# set up a SQLAlchemy engine to a sqlite DB

dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
engine = create_engine(connString)

# create a table with 3 Integer columns

metadata = MetaData()
testTable = Table('testTbl', metadata,
                  Column('a', Integer, primary_key=True),
                  Column('b', Integer),
                  Column('c', Integer)
                 )

metadata.create_all(engine)
conn = engine.connect()

chunkSize = 25000
numberChunks = 1400

sqlite3.register_adapter(np.int64, lambda x: int(x))

# use the SQLAlchemy table insert method to load list of dicts into the table, one chunk at a time
prevCount = 0

with conn.begin():
    for i in range(0, numberChunks) :
        listOfDicts = getChunk(chunkSize, prevCount)
        conn.execute(testTable.insert(), listOfDicts)
        prevCount = prevCount + chunkSize

conn.close()

I have tried four variations of the dask scheduler; each of them either runs out of memory or raises errors that kill the workers.

The code for each variation is shown below. How can I make this work?

1. Default scheduler -- OOM

# default scheduler -- OOM
import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest

cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = df.persist() 

2. Local distributed scheduler

The exception starts like this:

>>> tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:57522, threads: 1>>
Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 635, in wrapper
    return fun(self, *args, **kwargs)
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 821, in create_time
    return cext.proc_create_time(self.pid)
ProcessLookupError: [Errno 3] No such process

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 368, in _init
    self.create_time()
  File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 699, in create_time
    self._create_time = self._proc.create_time()
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 640, in wrapper
    raise NoSuchProcess(self.pid, self._name)
psutil._exceptions.NoSuchProcess: psutil.NoSuchProcess process no longer exists (pid=14212)

3. One process -- OOM

import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest

cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache, get=dask.get)
client = Client(processes=False)
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df)

4. dask-scheduler and dask-worker started from two command windows

One command window: c:\>dask-scheduler --host 127.0.0.1

The other command window: c:\>dask-worker 127.0.0.1:8786 --nprocs 1 --nthreads 1 --name worker-1 --memory-limit 3GB --local-directory c:\temp2

import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest
cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
client = Client(address="127.0.0.1:8786")
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df)

The workers are killed again and again with messages like these:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.12 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.16 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.24 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.31 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.39 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Worker is at 81% memory usage. Pausing worker.  Process memory: 2.46 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.47 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.54 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.61 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.66 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.73 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.81 GB -- Worker memory limit: 3.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker process 17916 was killed by signal 15
distributed.nanny - WARNING - Restarting worker

1 Answer

Stack Overflow user

Accepted answer

Posted on 2018-01-02 23:14:09

I don't believe you have an index on column 'a', which means that each partition access, while scanning the table, probably uses a lot of memory inside sqlite. In any case, pandas access to databases via sqlalchemy is not particularly memory-efficient, so I am not hugely surprised that you see a memory spike during the load.
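
If the index really is missing, a minimal sketch of adding one up front might look like this (assuming the test.db file and testTbl table from the question; the index name is illustrative):

import sqlalchemy as sa

engine = sa.create_engine("sqlite:///C:\\temp2\\test.db")
with engine.begin() as conn:
    # index the column that read_sql_table uses as index_col, so each
    # partition's range query does not have to scan the whole table
    conn.execute(sa.text("CREATE INDEX IF NOT EXISTS ix_testTbl_a ON testTbl (a)"))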

However, you can increase the number of partitions used to read the data, so that each partition issues a smaller query and holds less data at a time. For example:

df = ddf.read_sql_table('testTbl', connString, index_col = 'a', npartitions=20)

Or reduce the number of threads/processes available, so that each one has more memory to work with.
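
For instance, a minimal sketch of a local client with a single, single-threaded worker process (the 6 GB figure is illustrative, not from the question):

from dask.distributed import Client

# one worker process with one thread, given most of the machine's memory
client = Client(n_workers=1, threads_per_worker=1, memory_limit='6GB')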

Note that chest will not help you here: it can only hold finished results, and the memory spike happens while the data is being loaded (besides, distributed workers should spill to disk without being given a cache explicitly).

Votes: 1
The original content of this page was provided by Stack Overflow; the translation was provided by Tencent Cloud's translation engine.
Original link:

https://stackoverflow.com/questions/48053628
