我有一个从Oracle加载大型表到Snowflake的用例。Oracle服务器位于远离Snowflake端点的位置,因此当通过假脱机脚本或cx_oracle加载大于12 GB的表(实际上是视图)时,我们确实会遇到连接问题。
为了测试和使用SessionPool,我在考虑使用最多4个线程的ThreadPoolExecutor。这样,我就可以为每个线程获得一个连接,这就是关键所在。因此,这意味着我必须为每个线程分批分发数据获取。我的问题是:我如何才能做到这一点?这样做是正确的:"Select * from table where rownum介于x和y之间“(不是这个语法,我know...but你明白我的意思),我应该依赖偏移量吗,...?
我的想法是每个线程获得一个select的“切片”,按批获取数据,并将数据成批写入csv,因为我宁愿使用小文件而不是大文件,将其发送到snowflake。
def query(start_off, pool):
start_conn = datetime.now()
con = pool.acquire()
end_conn = datetime.now()
print(f"Conn/Acquire time: {end_conn-start_conn}")
with con.cursor() as cur:
start_exec_ts = datetime.now()
cur.execute(QUERY, start_pos=start_off, end_pos=start_off+(OFFSET-1))
end_exec_ts = datetime.now()
rows = cur.fetchall()
end_fetch_ts = datetime.now()
total_exec_ts = end_exec_ts-start_exec_ts
total_fetch_ts = end_fetch_ts-end_exec_ts
print(f"Exec time : {total_exec_ts}")
print(f"Fetch time : {total_fetch_ts}")
print(f"Task executed {threading.current_thread().getName()}, {threading.get_ident()}")
return rows
def main():
pool = cx_Oracle.SessionPool(c.oracle_conn['oracle']['username'],
c.oracle_conn['oracle']['password'],
c.oracle_conn['oracle']['dsn'],
min=2, max=4, increment=1,
threaded=True,
getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT
)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(query, d, pool) for d in range(1,13,OFFSET)]
for future in as_completed(futures):
# process your records from each thread
print(repr(future.result()))
# process_records(future.result())
if __name__ == '__main__':
main()
另外,在查询函数中使用fetchMany,我如何发回结果,以便每次都能处理它们?
发布于 2021-09-20 14:37:06
如果要通过python脚本传输数据
您可以创建生产者->队列->使用者工作流来执行此操作
而消费者依赖于数据的ID
生产者
生产者获取数据的ID
将“一片ID”作为作业放入队列
使用者从队列中获取作业
获取带有id的数据(例如"select * from table where id in ...")
示例
这类概念的一个快速示例
import time
import threading
import queue
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
@dataclass
class Job:
ids: list
jobs = queue.Queue()
executor = ThreadPoolExecutor(max_workers=4)
fake_data = [i for i in range(0, 200)]
def consumer():
try:
job = jobs.get_nowait()
# select * from table where id in job.ids
# save the data
time.sleep(5)
print(f"done {job}")
except Exception as exc:
print(exc)
def fake_select(limit, offset):
if offset >= len(fake_data):
return None
return fake_data[offset:(offset+limit)]
def producer():
limit = 10
offset = 0
stop_fetch = False
while not stop_fetch:
# select id from table limit l offset o
ids = fake_select(limit, offset)
if ids is None:
stop_fetch = True
else:
job = Job(ids=ids)
jobs.put(job)
print(f"put {job}")
offset += limit
executor.submit(consumer)
time.sleep(0.2)
def main():
th = threading.Thread(target=producer)
th.start()
th.join()
while not jobs.empty():
time.sleep(1)
executor.shutdown(wait=True)
print("all jobs done")
if __name__ == "__main__":
main()
另外,
如果你想在消费者获取数据后做更多的操作
您可以在客户流中执行此操作
或者添加另一个队列和使用者来执行额外的操作
工作流程将如下所示
生产者->队列->获取和保存数据消费者->队列->消费者执行一些额外操作
https://stackoverflow.com/questions/68904608
复制相似问题