首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >带有SessionPool的cx_oracle ThreadPoolExecutor,并为每个线程分配负载

带有SessionPool的cx_oracle ThreadPoolExecutor,并为每个线程分配负载
EN

Stack Overflow用户
提问于 2021-08-24 08:59:29
回答 1查看 216关注 0票数 1

我有一个从Oracle加载大型表到Snowflake的用例。Oracle服务器位于远离Snowflake端点的位置,因此当通过假脱机脚本或cx_oracle加载大于12 GB的表(实际上是视图)时,我们确实会遇到连接问题。

为了测试和使用SessionPool,我在考虑使用最多4个线程的ThreadPoolExecutor。这样,我就可以为每个线程获得一个连接,这就是关键所在。因此,这意味着我必须为每个线程分批分发数据获取。我的问题是:我如何才能做到这一点?这样做是正确的:"Select * from table where rownum介于x和y之间“(不是这个语法,我know...but你明白我的意思),我应该依赖偏移量吗,...?

我的想法是每个线程获得一个select的“切片”,按批获取数据,并将数据成批写入csv,因为我宁愿使用小文件而不是大文件,将其发送到snowflake。

代码语言:javascript
运行
复制
def query(start_off, pool):
    start_conn = datetime.now()
    con = pool.acquire()
    end_conn = datetime.now()
    print(f"Conn/Acquire time: {end_conn-start_conn}")

    with con.cursor() as cur:
        start_exec_ts = datetime.now()
        cur.execute(QUERY, start_pos=start_off, end_pos=start_off+(OFFSET-1))
        end_exec_ts = datetime.now()
        rows = cur.fetchall()
        end_fetch_ts = datetime.now()
        total_exec_ts = end_exec_ts-start_exec_ts
        total_fetch_ts = end_fetch_ts-end_exec_ts
        print(f"Exec time : {total_exec_ts}")
        print(f"Fetch time : {total_fetch_ts}")
        print(f"Task executed {threading.current_thread().getName()}, {threading.get_ident()}")
    return rows


def main():
    pool = cx_Oracle.SessionPool(c.oracle_conn['oracle']['username'],
                                 c.oracle_conn['oracle']['password'],
                                 c.oracle_conn['oracle']['dsn'],
                                 min=2, max=4, increment=1,
                                 threaded=True,
                                 getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT
                                 )

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(query, d, pool) for d in range(1,13,OFFSET)]

        for future in as_completed(futures):
            # process your records from each thread
            print(repr(future.result()))
            # process_records(future.result())
if __name__ == '__main__':
    main()

另外,在查询函数中使用fetchMany,我如何发回结果,以便每次都能处理它们?

EN

回答 1

Stack Overflow用户

发布于 2021-09-20 14:37:06

如果要通过python脚本传输数据

您可以创建生产者->队列->使用者工作流来执行此操作

而消费者依赖于数据的ID

生产者

生产者获取数据的ID

将“一片ID”作为作业放入队列

使用者从队列中获取作业

获取带有id的数据(例如"select * from table where id in ...")

示例

这类概念的一个快速示例

代码语言:javascript
运行
复制
import time
import threading
import queue
from dataclasses import dataclass

from concurrent.futures import ThreadPoolExecutor

@dataclass
class Job:
    ids: list

jobs = queue.Queue()
executor = ThreadPoolExecutor(max_workers=4)
fake_data = [i for i in range(0, 200)]

def consumer():
    try:
        job = jobs.get_nowait()
        # select * from table where id in job.ids
        # save the data
        time.sleep(5)
        print(f"done {job}")
    except Exception as exc:
        print(exc)

def fake_select(limit, offset):
    if offset >= len(fake_data):
        return None
    return fake_data[offset:(offset+limit)]

def producer():
    limit = 10
    offset = 0
    stop_fetch = False
    while not stop_fetch:
        # select id from table limit l offset o
        ids = fake_select(limit, offset)
        if ids is None:
            stop_fetch = True
        else:
            job = Job(ids=ids)
            jobs.put(job)
            print(f"put {job}")
            offset += limit
            executor.submit(consumer)
        time.sleep(0.2)


def main():
    th = threading.Thread(target=producer)
    th.start()
    th.join()

    while not jobs.empty():
        time.sleep(1)

    executor.shutdown(wait=True)
    print("all jobs done")


if __name__ == "__main__":
    main()

另外,

如果你想在消费者获取数据后做更多的操作

您可以在客户流中执行此操作

或者添加另一个队列和使用者来执行额外的操作

工作流程将如下所示

生产者->队列->获取和保存数据消费者->队列->消费者执行一些额外操作

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68904608

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档