
Parallel selects on PostgreSQL in Python

Stack Overflow user
Asked on 2021-07-08 10:54:31
1 answer · 575 views · 0 followers · 0 votes

Hi, we are trying to parallelize a huge select by cutting it into smaller selects. The dataset has a "segment" column, so we use it to split the select. We target a PostgreSQL database. Unfortunately we observe no performance benefit; in other words, the elapsed time scales linearly with the number of threads we use.
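For illustration, one of the smaller selects might look like the following minimal sketch; the table name is an assumption and the column list simply mirrors the DataFrame columns used in the code below, since only the "segment" column is actually described in the question:

# Hypothetical shape of one per-segment query; "sales" is an assumed table
# name, the columns mirror the DataFrame columns built in get_sales() below.
SEGMENT_QUERY = '''
select item_id, brand_name, is_exclusive, units, revenue,
       abs_price, segment, matches_filter
from sales
where segment = %(segment)s
'''
# Each worker would run SEGMENT_QUERY with its own segment value, e.g.
# curs.execute(SEGMENT_QUERY, {"segment": segment})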

We were able to isolate the behaviour in a synthetic test case: we simulate a number of fetches (11), each reading from a generate_series query.

We either use a single connection and run the fetches sequentially, or 11 connections running in parallel.

We observe no performance benefit at all.

Conversely, if we simulate each fetch as a single-row fetch that blocks for 5 seconds (QUERY1), we do get the expected performance benefit.

The main code we use to parallelize:

def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data

def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks : Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                            the_conn_pool = the_conn_pool,
                            segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result

When we fetch directly from CSV files instead, we do see the same performance benefit.

Our theory is that sockets / threads / large fetches in Python do not play well together.

Is that correct? Or are we doing something wrong?

Tested on a big x64 machine, Python 3.9.6, PostgreSQL 13; the rest of the code is attached below.
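One way to probe that theory is to run the same per-segment fetch in separate processes instead of threads, so that row parsing is not serialized by the GIL. A minimal sketch, assuming the same config module and QUERY2 as below, with one connection per worker since a psycopg2 pool cannot be shared across processes:

from concurrent import futures
from timeit import default_timer

import psycopg2

import config

def fetch_segment(segment):
    # One connection per worker process; QUERY2 is the generate_series query.
    conn = psycopg2.connect(host=config.HOST, port=config.PORT,
                            dbname=config.DBNAME, user=config.USER,
                            password=config.PASSWORD)
    try:
        with conn.cursor() as curs:
            curs.execute(config.QUERY2)
            return curs.fetchall()
    finally:
        conn.close()

if __name__ == '__main__':
    start = default_timer()
    with futures.ProcessPoolExecutor(max_workers=config.TASKS) as executor:
        results = list(executor.map(fetch_segment, range(config.SEGMENTS_NO)))
    print(f'Consumed : {default_timer() - start:.5f}')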

Our docker-compose file:

version: '2'

services:

  database:
    container_name: posgres
    image: 'docker.io/bitnami/postgresql:latest'
    ports:
      - '5432:5432'
    volumes:
      - 'postgresql_data:/bitnami/postgresql'
    environment:
      - POSTGRESQL_USERNAME=my_user
      - POSTGRESQL_PASSWORD=password123
      - POSTGRESQL_DATABASE=mn_dataset
    networks:
      - pgtapper

volumes:
  postgresql_data:
    driver: local

networks:
  pgtapper:
    driver: bridge
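For reference, the compose file exposes the database at localhost:5432 with the same credentials that config.py below uses; a minimal connectivity check might look like this sketch:

import psycopg2

# Values taken from the POSTGRESQL_* environment variables above.
conn = psycopg2.connect(host='localhost', port=5432, dbname='mn_dataset',
                        user='my_user', password='password123')
with conn.cursor() as curs:
    curs.execute('select version()')
    print(curs.fetchone()[0])
conn.close()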

The config.py file:

TASKS = 1
SEGMENTS_NO = 11
HOST='localhost'

PORT=5432
DBNAME='mn_dataset'
USER='my_user'
PASSWORD='password123'


# PORT=15433
# DBNAME='newron'
# USER='flyway'
# PASSWORD='8P87PE8HKuvjQaAP'


CONNECT_TIMEOUT=600

QUERY1 = '''
select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from (select pg_sleep(5)) xxx
'''

QUERY3 = '''
 select * from t1 LIMIT 10000
'''

QUERY2 = '''
 select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from generate_series(1, 10000)
'''


MYSQL_QUERY = '''
select 

    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter

from t1
limit 10000
'''

And the complete example:

# This is a sample Python script.

# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import itertools

from psycopg2.pool import ThreadedConnectionPool

from concurrent import futures
from timeit import default_timer
from typing import Dict, List, Tuple
import config
import pandas as pd

def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    with the_conn_pool.getconn() as conn:
        conn.set_session(readonly=True, autocommit=True)
        start = default_timer()
        with conn.cursor() as curs:
            curs.execute(sql_query)
            data = curs.fetchall()
        end = default_timer()
        print(f'DB to retrieve {segment} took : {end - start:.5f}')
    the_conn_pool.putconn(conn)
    return data

def get_sales(the_conn_pool) -> pd.DataFrame:
    tasks : Dict = {}
    start = default_timer()
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(0, config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                            the_conn_pool = the_conn_pool,
                            segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result

# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    connection_pool = ThreadedConnectionPool(
        minconn=config.TASKS,
        maxconn=config.TASKS,
        host=config.HOST,
        port=config.PORT,
        dbname=config.DBNAME,
        user=config.USER,
        password=config.PASSWORD,
        connect_timeout=config.CONNECT_TIMEOUT
    )
    get_sales(connection_pool)

# See PyCharm help at https://www.jetbrains.com/help/pycharm/
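As a usage note: config.TASKS controls both the pool size and the worker count, so switching between the sequential run (1) and the parallel run (11) described above means editing TASKS. A long-lived process would also normally release the pool once all work has finished, e.g.:

# Close every connection held by the pool when done.
connection_pool.closeall()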

1 Answer

Stack Overflow user

Accepted answer

Answered on 2022-07-14 07:57:59

After some tests with psycopg3 and asyncpg, I settled on the connectorx library. With a PostgreSQL table of three columns (time: timestamp, variable: text, value: double) and 3 million rows, the following code returns the data as a pandas DataFrame in about 2 seconds:

from time import time
import connectorx as cx

print("Fetch start...")
start = time()
df = cx.read_sql(
    "postgresql://postgres:password@localhost:5432/test",
    "SELECT * FROM variables")
end = time()

print(f"Fetched {len(df.index)} records in {end - start} ")

Partitioned / parallelized across 4 threads, it finishes in about 1.2 seconds:

df = cx.read_sql(
"postgresql://postgres:password@localhost:5432/test",
[
    "SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'"
])

The same code returns the data in sub-second time when using the arrow2 return type:

df = cx.read_sql(
"postgresql://postgres:password@localhost:5432/test",
[
    "SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'"
],
return_type="arrow2")
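connectorx can also derive the partitions itself via partition_on / partition_num instead of a hand-written list of queries; this needs a numeric column to split on, so the id column in this minimal sketch is an assumption about the schema rather than part of the answer:

# Let connectorx split the query into 4 partitions on a numeric column.
# "id" is a hypothetical integer column; the variables table above only
# lists time, variable and value.
df = cx.read_sql(
    "postgresql://postgres:password@localhost:5432/test",
    "SELECT * FROM variables",
    partition_on="id",
    partition_num=4)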
Votes: 1
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/68300268
