嗨,我们正试图把一个巨大的 SELECT 查询拆分成多个较小的 SELECT 来并行执行。数据集(dataset)有一个“段”(segment)列,因此我们用它来划分查询。我们的目标是一个 PostgreSQL 数据库。不幸的是,我们没有观察到性能上的好处,换句话说,耗时随着我们使用的线程数线性增长。
我们已经把这一现象隔离到一个合成测试用例中:用 generate_series 查询来模拟多次(11 次)数据获取。
我们要么使用一个连接、按顺序依次执行各次获取,要么使用 11 个连接并行执行。
两种方式相比,我们没有观察到任何性能上的好处。
相反,如果我们把每次获取模拟为只返回 1 行、但阻塞 5 秒的查询(QUERY1),就能得到预期的性能收益。
下面是我们用来并行化的主要代码:
def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
    """Run the benchmark query on a pooled connection and return all rows.

    Args:
        the_conn_pool: Connection pool exposing ``getconn()`` and
            ``putconn()``.
        segment: Identifier of the task. It is only used in the log output;
            the query (``config.QUERY2``) does not depend on it.

    Returns:
        Every row of the result set, as returned by ``cursor.fetchall()``.
    """
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    conn = the_conn_pool.getconn()
    try:
        with conn:
            conn.set_session(readonly=True, autocommit=True)
            start = default_timer()
            with conn.cursor() as curs:
                curs.execute(sql_query)
                data = curs.fetchall()
            end = default_timer()
            print(f'DB to retrieve {segment} took : {end - start:.5f}')
    finally:
        # Hand the connection back even when the query fails; otherwise a
        # single error permanently removes one connection from the pool.
        the_conn_pool.putconn(conn)
    return data
def get_sales(the_conn_pool) -> pd.DataFrame:
    """Fetch every segment through a thread pool and combine the rows.

    One ``pandas_per_segment`` task is submitted per segment in
    ``range(config.SEGMENTS_NO)``; at most ``config.TASKS`` of them run at
    the same time.

    Args:
        the_conn_pool: Connection pool forwarded to every task.

    Returns:
        A DataFrame holding the rows of all segments, in submission order.

    Raises:
        Exception: Whatever a task raised is re-raised by ``Future.result()``.
    """
    tasks: Dict = {}
    start = default_timer()
    # Leaving the ``with`` block waits for all submitted tasks to finish.
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                                   the_conn_pool=the_conn_pool,
                                   segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    # Collect the rows of *every* future. The previous expression
    # (`task.result() or task in tasks`) produced a one-element list built
    # from the last submitted task only.
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(
        itertools.chain.from_iterable(master_list),
        columns=['item_id', 'brand_name', 'is_exclusive', 'units',
                 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result
通过直接从 CSV 获取数据,我们也看到了同样的性能收益。
我们的推测是:在 Python 中,套接字、线程与大批量数据获取结合在一起时表现不佳。
这个推测正确吗?还是我们哪里做错了?
测试环境:Big x64、Python 3.9.6、PostgreSQL 13,其余代码附在下面。
我们的 docker-compose 文件:
# Docker Compose definition of the PostgreSQL service used by the benchmark.
version: '2'
services:
  database:
    # NOTE(review): "posgres" looks like a typo for "postgres" — confirm
    # nothing refers to the container by this name before renaming it.
    container_name:
      posgres
    # NOTE(review): `latest` is an unpinned tag; the write-up mentions
    # PostgreSQL 13 — consider pinning the tag for reproducible results.
    image: 'docker.io/bitnami/postgresql:latest'
    ports:
      # Publish the container's port 5432 on host port 5432.
      - '5432:5432'
    volumes:
      # Named volume (declared below) mounted at /bitnami/postgresql.
      - 'postgresql_data:/bitnami/postgresql'
    environment:
      # Presumably read by the Bitnami image to create the user and database
      # on first start — confirm against the image documentation.
      - POSTGRESQL_USERNAME=my_user
      - POSTGRESQL_PASSWORD=password123
      - POSTGRESQL_DATABASE=mn_dataset
    networks:
      - pgtapper
volumes:
  postgresql_data:
    driver: local
networks:
  pgtapper:
    driver: bridge
config.py 文件:
# Settings for the synthetic parallel-fetch benchmark.

# Worker threads in the executor and connections in the pool (the main script
# uses this value for both). With 1, the tasks run one after another.
TASKS = 1
# Number of fetch tasks submitted per run.
SEGMENTS_NO = 11

# PostgreSQL connection settings.
# NOTE(review): the password is hard-coded; acceptable only for a throw-away
# local test database. The commented-out alternate credentials that used to
# follow were removed because they exposed a password in plain text.
HOST = 'localhost'
PORT = 5432
DBNAME = 'mn_dataset'
USER = 'my_user'
PASSWORD = 'password123'
# Passed to the pool as `connect_timeout` (presumably seconds, per libpq).
CONNECT_TIMEOUT = 600

# One constant row, returned after the server sleeps for 5 seconds:
# a slow query with a tiny result set.
QUERY1 = '''
select 
    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter
from (select pg_sleep(5)) xxx
'''

# Up to 10000 rows of table t1 (no ORDER BY, so which rows is unspecified).
QUERY3 = '''
 select * from t1 LIMIT 10000
'''

# 10000 identical constant rows: a fast query with a large result set.
QUERY2 = '''
 select 
    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter
from generate_series(1, 10000)
'''
# Same constant projection as QUERY2, but one row per row of table t1, capped
# at 10000 rows. NOTE(review): the name suggests a MySQL variant and the
# visible benchmark script does not reference it — confirm it is still needed.
MYSQL_QUERY = '''
select 
    123456789 as item_id,
    'm$$$' as brand_name,
    true as is_exclusive,
    0.409 as units,
    0.567 as revenue,
    0.999 as abs_price,
    'aaaa' as segment,
    TRUE as matches_filter
from t1
limit 10000
'''
以及我们的完整示例:
# This is a sample Python script.
# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import itertools
from psycopg2.pool import ThreadedConnectionPool
from concurrent import futures
from timeit import default_timer
from typing import Dict, List, Tuple
import config
import pandas as pd
def pandas_per_segment(the_conn_pool, segment) -> List[Tuple]:
    """Run the benchmark query on a pooled connection and return all rows.

    Args:
        the_conn_pool: Connection pool exposing ``getconn()`` and
            ``putconn()``; the script passes a psycopg2
            ``ThreadedConnectionPool``.
        segment: Identifier of the task. It is only used in the log output;
            the query (``config.QUERY2``) does not depend on it.

    Returns:
        Every row of the result set, as returned by ``cursor.fetchall()``.
    """
    print(f"TASK is {segment}")
    sql_query = config.QUERY2
    conn = the_conn_pool.getconn()
    try:
        with conn:
            conn.set_session(readonly=True, autocommit=True)
            start = default_timer()
            with conn.cursor() as curs:
                curs.execute(sql_query)
                data = curs.fetchall()
            end = default_timer()
            print(f'DB to retrieve {segment} took : {end - start:.5f}')
    finally:
        # Hand the connection back even when the query fails; otherwise a
        # single error permanently removes one connection from the pool.
        the_conn_pool.putconn(conn)
    return data
def get_sales(the_conn_pool) -> pd.DataFrame:
    """Fetch every segment through a thread pool and combine the rows.

    One ``pandas_per_segment`` task is submitted per segment in
    ``range(config.SEGMENTS_NO)``; at most ``config.TASKS`` of them run at
    the same time.

    Args:
        the_conn_pool: Connection pool forwarded to every task.

    Returns:
        A DataFrame holding the rows of all segments, in submission order.

    Raises:
        Exception: Whatever a task raised is re-raised by ``Future.result()``.
    """
    tasks: Dict = {}
    start = default_timer()
    # Leaving the ``with`` block waits for all submitted tasks to finish.
    with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
        for segment in range(config.SEGMENTS_NO):
            task = executor.submit(pandas_per_segment,
                                   the_conn_pool=the_conn_pool,
                                   segment=segment)
            tasks[task] = segment
    end = default_timer()
    print(f'Consumed : {end-start:.5f}')
    start = default_timer()
    # Collect the rows of *every* future. The previous expression
    # (`task.result() or task in tasks`) produced a one-element list built
    # from the last submitted task only.
    master_list = [task.result() for task in tasks]
    result = pd.DataFrame(
        itertools.chain.from_iterable(master_list),
        columns=['item_id', 'brand_name', 'is_exclusive', 'units',
                 'revenue', 'abs_price', 'segment', 'matches_filter'])
    end = default_timer()
    print(f'Chained : {end - start:.5f}')
    return result
# Script entry point: build a pool with exactly config.TASKS connections
# (minconn == maxconn) and run the benchmark once.
if __name__ == '__main__':
    connection_pool = ThreadedConnectionPool(
        minconn=config.TASKS,
        maxconn=config.TASKS,
        host=config.HOST,
        port=config.PORT,
        dbname=config.DBNAME,
        user=config.USER,
        password=config.PASSWORD,
        connect_timeout=config.CONNECT_TIMEOUT
    )
    try:
        get_sales(connection_pool)
    finally:
        # Close every pooled connection instead of leaving them open until
        # interpreter shutdown.
        connection_pool.closeall()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
发布于 2022-07-14 07:57:59
在对 psycopg3 和 asyncpg 做了一些测试之后,我最终选用了 connectorx 库。对于一张包含三列(time: timestamp、variable: text、value: double)、共 300 万行的 PostgreSQL 表,下面的代码可以在 2 秒内以 pandas 格式返回数据:
from time import time
import connectorx as cx
# Time one unpartitioned fetch of the whole `variables` table.
connection_url = "postgresql://postgres:password@localhost:5432/test"
select_all = "SELECT * FROM variables"

print("Fetch start...")
start = time()
df = cx.read_sql(connection_url, select_all)
end = time()
print(f"Fetched {len(df.index)} records in {end - start} ")
把查询分区并用 4 个线程并行执行后,耗时为 1.2 秒:
df = cx.read_sql(
"postgresql://postgres:password@localhost:5432/test",
[
    "SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'"
])
同样的代码若以 arrow2 格式返回数据,耗时不到一秒:
df = cx.read_sql(
"postgresql://postgres:password@localhost:5432/test",
[
    "SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
    "SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'"
],
return_type="arrow2")
https://stackoverflow.com/questions/68300268
复制相似问题