TimescaleDB是否支持连续聚合视图的并发完全刷新?因为在文档中没有明确提到这一点。这类似于PostgreSQL实现了REFRESH MATERIALIZED VIEW CONCURRENTLY的观点。
我之所以问这个问题,是因为我需要刷新一些陈旧的数据,但是普通的refresh_continuous_aggregate()给了canceling statement due to statement timeout。
作为一种解决办法,我增加了语句超时,但这是一个停止间隙修复。
如果有更好的方法在连续聚合数据上完成“重建”(例如使用PSQL脚本或简单的迭代循环),我很高兴听到这些。
发布于 2022-08-20 19:06:15
最后我写了一个复习循环
refresh_continuous_aggregate()refresh_continuous_aggregate()都停留在SQL语句超时下。刷新不是并发的,尽管这很容易实现(假设TimescaleDB可以同时执行两个不重叠的refresh_continuous_aggregate() )。
下面是Python代码示例:
import datetime
from sqlalchemy import func
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.orm import Session
from sqlalchemy.testing.schema import Table
from tqdm import tqdm
def get_hypertable_range(
connection: Connection,
hypertable_name: str,
timestamp_col="timestamp",
) -> Tuple[datetime.datetime, datetime.datetime]:
"""Get absolute first and last timestamps in a hypertable.
Tested on prod. Takes ~1 min per query.
:param timestamp_col:
Timescamp column name. Assume naive UNIX timestamp.
:return:
Tuple (first timestamp, last timestamp)
"""
sql = f"""SELECT * from "{hypertable_name}" order by {timestamp_col} limit 1;"""
first = connection.execute(sql).scalar()
sql = f"""SELECT * from "{hypertable_name}" order by {timestamp_col} DESC limit 1;"""
last = connection.execute(sql).scalar()
return first, last
def force_refresh_aggregate_iterative(
connection: Connection,
aggregate_name: str,
start: datetime.datetime,
end: datetime.datetime,
slice=datetime.timedelta(days=90),
):
"""Iteratively force refresh continuous aggregate view.
Assume the automated policy has missed data in the past, for reason or another
and we need to rebuild the whole continuous aggregate.
Call TimescaleDB `refresh_continuous_aggregate` in slices that
are small enough not to hit any PSQL statement timeout issues.
Display a progress bar using TQDM about the progress of the refresh.
:param slice:
How many days we force refresh once
"""
assert start
assert end
cursor = start
last_duration = None
longest_duration = datetime.timedelta(0)
days = (end - start).total_seconds() // (24*3600)
with tqdm(total=days) as progress_bar:
progress_bar.set_description(f"force_refresh_aggregate_iterative({aggregate_name})")
while cursor <= end:
# Add extra day to make sure we don't get odd missing hours somewhere
# We don't mind overflowing at the end, because refresh_continuous_aggregate()
# will just index empty days.
cursor_end = cursor + slice + datetime.timedelta(days=1)
# Convert for PSQL strings
slice_start = f"{cursor.year}-{cursor.month}-{cursor.day}"
slice_end = f"{cursor_end.year}-{cursor_end.month}-{cursor_end.day}"
logger.debug("force_refresh_aggregate_iterative() for aggregate %s: %s - %s, last refresh took %s",
aggregate_name,
slice_start,
slice_end,
last_duration or "-"
)
# Execute refresh_continuous_aggregate() and time it
start = datetime.datetime.utcnow()
connection.execution_options(isolation_level="AUTOCOMMIT")\
.execute(f"CALL refresh_continuous_aggregate('{aggregate_name}', '{slice_start}', '{slice_end}');")
last_duration = datetime.datetime.utcnow() - start
longest_duration = max(last_duration, longest_duration)
formatted_time = cursor.strftime("%d-%m-%Y")
progress_bar.set_postfix({
"Currently at": formatted_time,
"Last refresh": last_duration or "-",
"Longest refresh": longest_duration,
})
progress_bar.update(slice // (24*3600))
# Move to the next slice
cursor += slice
def force_refresh_aggregate_smart(
session_manager: SessionManager,
buckets: Tuple = (TimeBucket.m5, TimeBucket.m15, TimeBucket.h1, TimeBucket.h4, TimeBucket.d1, TimeBucket.d7, TimeBucket.d30,),
statement_timeout="180min",
):
"""Do refresh_continuous_aggregate() in smaller slices.
https://stackoverflow.com/questions/73388687/full-force-refresh-of-continous-aggregate-view-concurrently-in-timescaledb?noredirect=1#comment129605333_73388687
"""
with session_manager.connect() as connection:
engine: Engine = connection.engine
database_name = engine.url.database
logger.info(f"force_refresh_aggregate_smart() for %s, statement timeout is %s", database_name, statement_timeout)
start, end = get_hypertable_range(connection, "candle_ohlcvx")
connection.execution_options(isolation_level="AUTOCOMMIT").execute(
f"SET statement_timeout = '{statement_timeout}';")
for bucket in buckets:
for name in long_list_of_aggregate_names:
force_refresh_aggregate_iterative(
connection,
name,
start,
end
)https://stackoverflow.com/questions/73388687
复制相似问题