嗨,我们正试图并行化一个巨大的选择,把它切成更小的选择。dataset有一个“段”列,因此我们使用它来划分select。我们的目标是一个PosgreSQL数据库。不幸的是,我们没有观察到性能的好处,换句话说,性能与我们使用的线程成线性增长。
我们能够将观察结果隔离到一个合成测试用例中。我们模拟每个从generate_series查询中获取的多个获取(11)。
我们使用一个连接,每个连接按顺序运行,或者11个连接并行运行。
我们没有观察到任何表现上的好处。
相反,如果我们只是将fetch模拟为1行fetch,阻塞了5秒(QUERY1),那么我们就有了预期的性能效益。
我们用来并行化的主要代码。
def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
print(f"TASK is {segment}")
sql_query = config.QUERY2
with the_conn_pool.getconn() as conn:
conn.set_session(readonly=True, autocommit=True)
start = default_timer()
with conn.cursor() as curs:
curs.execute(sql_query)
data = curs.fetchall()
end = default_timer()
print(f'DB to retrieve {segment} took : {end - start:.5f}')
the_conn_pool.putconn(conn)
return data
def get_sales(the_conn_pool) -> pd.DataFrame:
tasks : Dict = {}
start = default_timer()
with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
for segment in range(0, config.SEGMENTS_NO):
task = executor.submit(pandas_per_segment,
the_conn_pool = the_conn_pool,
segment=segment)
tasks[task] = segment
end = default_timer()
print(f'Consumed : {end-start:.5f}')
start = default_timer()
master_list = [task.result() or task in tasks]
result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
end = default_timer()
print(f'Chained : {end - start:.5f}')
return result通过直接从CSV获取,我们也看到了同样的性能好处。
其理论是,Python中的套接字/线程/大数据获取不能很好地发挥作用。
这是正确的吗?我们是不是做错了什么。
在Big x64上进行测试,Python3.9.6,Postgresql 13,附加的其余代码
我们的码头-合成文件
version: '2'
services:
database:
container_name:
posgres
image: 'docker.io/bitnami/postgresql:latest'
ports:
- '5432:5432'
volumes:
- 'postgresql_data:/bitnami/postgresql'
environment:
- POSTGRESQL_USERNAME=my_user
- POSTGRESQL_PASSWORD=password123
- POSTGRESQL_DATABASE=mn_dataset
networks:
- pgtapper
volumes:
postgresql_data:
driver: local
networks:
pgtapper:
driver: bridgeconfig.py文件
TASKS = 1
SEGMENTS_NO = 11
HOST='localhost'
PORT=5432
DBNAME='mn_dataset'
USER='my_user'
PASSWORD='password123'
# PORT=15433
# DBNAME='newron'
# USER='flyway'
# PASSWORD='8P87PE8HKuvjQaAP'
CONNECT_TIMEOUT=600
QUERY1 = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from (select pg_sleep(5)) xxx
'''
QUERY3 = '''
select * from t1 LIMIT 10000
'''
QUERY2 = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from generate_series(1, 10000)
'''
MYSQL_QUERY = '''
select
123456789 as item_id,
'm$$$' as brand_name,
true as is_exclusive,
0.409 as units,
0.567 as revenue,
0.999 as abs_price,
'aaaa' as segment,
TRUE as matches_filter
from t1
limit 10000
'''以及我们的全部例子
# This is a sample Python script.
# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import itertools
from psycopg2.pool import ThreadedConnectionPool
from concurrent import futures
from timeit import default_timer
from typing import Dict, List, Tuple
import config
import pandas as pd
def pandas_per_segment(the_conn_pool, segment)-> List[Tuple]:
print(f"TASK is {segment}")
sql_query = config.QUERY2
with the_conn_pool.getconn() as conn:
conn.set_session(readonly=True, autocommit=True)
start = default_timer()
with conn.cursor() as curs:
curs.execute(sql_query)
data = curs.fetchall()
end = default_timer()
print(f'DB to retrieve {segment} took : {end - start:.5f}')
the_conn_pool.putconn(conn)
return data
def get_sales(the_conn_pool) -> pd.DataFrame:
tasks : Dict = {}
start = default_timer()
with futures.ThreadPoolExecutor(max_workers=config.TASKS) as executor:
for segment in range(0, config.SEGMENTS_NO):
task = executor.submit(pandas_per_segment,
the_conn_pool = the_conn_pool,
segment=segment)
tasks[task] = segment
end = default_timer()
print(f'Consumed : {end-start:.5f}')
start = default_timer()
master_list = [task.result() or task in tasks]
result = pd.DataFrame(itertools.chain(*master_list), columns=['item_id', 'brand_name', 'is_exclusive', 'units', 'revenue', 'abs_price', 'segment', 'matches_filter'])
end = default_timer()
print(f'Chained : {end - start:.5f}')
return result
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
connection_pool = ThreadedConnectionPool(
minconn=config.TASKS,
maxconn=config.TASKS,
host=config.HOST,
port=config.PORT,
dbname=config.DBNAME,
user=config.USER,
password=config.PASSWORD,
connect_timeout=config.CONNECT_TIMEOUT
)
get_sales(connection_pool)
# See PyCharm help at https://www.jetbrains.com/help/pycharm/发布于 2022-07-14 07:57:59
在对psycopg3和asyncpg进行了一些测试之后,我确定了连接器-x库。有一个包含三列(时间:时间戳、变量:text和value:double)和300万行的PostgreSQL表,下面的代码将在2秒内以pandas格式返回数据:
from time import time
import connectorx as cx
print("Fetch start...")
start = time()
df = cx.read_sql(
"postgresql://postgres:password@localhost:5432/test",
"SELECT * FROM variables")
end = time()
print(f"Fetched {len(df.index)} records in {end - start} ")在1.2秒内将分区/并行化为4个线程:
df = cx.read_sql(
"postgresql://postgres:password@localhost:5432/test",
[
"SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
"SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
"SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
"SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'"
])相同的代码以arrow2格式以子秒格式返回数据:
df = cx.read_sql(
"postgresql://postgres:password@localhost:5432/test",
[
"SELECT * FROM variables WHERE time <= '2021-03-30 23:59:59'",
"SELECT * FROM variables WHERE time > '2021-03-30 23:59:59' AND time <= '2021-06-30 23:59:59'",
"SELECT * FROM variables WHERE time > '2021-06-30 23:59:59' AND time <= '2021-09-30 23:59:59'",
"SELECT * FROM variables WHERE time > '2021-09-30 23:59:59'"
],
return_type="arrow2")https://stackoverflow.com/questions/68300268
复制相似问题