Full force refresh of continuous aggregate view concurrently in TimescaleDB

Stack Overflow user
Asked on 2022-08-17 12:42:37
1 answer, 40 views, 0 followers, score 1

Does TimescaleDB support a concurrent full refresh of a continuous aggregate view? It is not explicitly mentioned in the documentation. This would be similar to how PostgreSQL implements REFRESH MATERIALIZED VIEW CONCURRENTLY for ordinary materialized views.
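For reference, this is roughly what that PostgreSQL feature looks like; a minimal sketch, assuming a hypothetical materialized view named my_view that already has the unique index CONCURRENTLY requires:

Code language: python
from sqlalchemy import create_engine, text

# Hypothetical connection string and view name, for illustration only.
engine = create_engine("postgresql://localhost/mydb")

with engine.connect() as connection:
    # CONCURRENTLY rebuilds the view without locking out concurrent reads,
    # but it requires a unique index on the materialized view.
    connection.execution_options(isolation_level="AUTOCOMMIT").execute(
        text("REFRESH MATERIALIZED VIEW CONCURRENTLY my_view;")
    )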

I am asking because I need to refresh some stale data, but a plain refresh_continuous_aggregate() call fails with canceling statement due to statement timeout.

As a workaround I increased the statement timeout, but that is only a stopgap fix.
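The stopgap mentioned above might look roughly like the following with SQLAlchemy; this is only a sketch, and the DSN, aggregate name and timeout value are made-up examples:

Code language: python
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/mydb")  # hypothetical DSN

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    # Raise the per-session statement timeout so one long refresh is not cancelled.
    connection.execute(text("SET statement_timeout = '180min';"))
    # Refresh the whole aggregate in one go; NULL window bounds mean "open-ended".
    connection.execute(text("CALL refresh_continuous_aggregate('my_aggregate', NULL, NULL);"))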

If there is a better way to accomplish this kind of "rebuild" of the continuous aggregate data (e.g. with a psql script or a simple iterative loop), I would be happy to hear about it.


Stack Overflow user

Accepted answer

Posted on 2022-08-20 19:06:15

In the end I wrote a refresh loop that:

  • Uses Python and SQLAlchemy
  • Works out the start and end dates of the hypertable
  • Calls refresh_continuous_aggregate() from start to end in fixed-size slices
  • Gives a nice TQDM progress bar that tells you when the task will be done
  • Keeps each individual refresh_continuous_aggregate() call under the SQL statement timeout

The refresh is not concurrent, although that would be easy to add (assuming TimescaleDB can execute two non-overlapping refresh_continuous_aggregate() calls at the same time); see the sketch after the code listing below.

Here is the Python example code:

Code language: python
import datetime
import logging
from typing import Tuple

from sqlalchemy.engine import Connection, Engine
from tqdm import tqdm

logger = logging.getLogger(__name__)


def get_hypertable_range(
        connection: Connection,
        hypertable_name: str,
        timestamp_col="timestamp",
) -> Tuple[datetime.datetime, datetime.datetime]:
    """Get absolute first and last timestamps in a hypertable.

    Tested on prod. Takes ~1 min per query.

    :param timestamp_col:
        Timestamp column name. Assumes a naive UNIX timestamp.

    :return:
        Tuple (first timestamp, last timestamp)
    """

    sql = f"""SELECT * from "{hypertable_name}" order by {timestamp_col} limit 1;"""
    first = connection.execute(sql).scalar()

    sql = f"""SELECT * from "{hypertable_name}" order by {timestamp_col} DESC limit 1;"""
    last = connection.execute(sql).scalar()

    return first, last


def force_refresh_aggregate_iterative(
        connection: Connection,
        aggregate_name: str,
        start: datetime.datetime,
        end: datetime.datetime,
        slice=datetime.timedelta(days=90),
):
    """Iteratively force refresh continuous aggregate view.

    Assume the automated policy has missed data in the past, for one reason or another,
    and we need to rebuild the whole continuous aggregate.

    Call TimescaleDB `refresh_continuous_aggregate` in slices that
    are small enough not to hit any PSQL statement timeout issues.

    Display a progress bar using TQDM about the progress of the refresh.

    :param slice:
        How many days we force refresh at once
    """

    assert start
    assert end

    cursor = start
    last_duration = None
    longest_duration = datetime.timedelta(0)

    days = (end - start).total_seconds() // (24*3600)

    with tqdm(total=days) as progress_bar:

        progress_bar.set_description(f"force_refresh_aggregate_iterative({aggregate_name})")

        while cursor <= end:
            # Add extra day to make sure we don't get odd missing hours somewhere
            # We don't mind overflowing at the end, because refresh_continuous_aggregate()
            # will just index empty days.
            cursor_end = cursor + slice + datetime.timedelta(days=1)
            # Convert for PSQL strings
            slice_start = f"{cursor.year}-{cursor.month}-{cursor.day}"
            slice_end = f"{cursor_end.year}-{cursor_end.month}-{cursor_end.day}"
            logger.debug("force_refresh_aggregate_iterative() for aggregate %s: %s - %s, last refresh took %s",
                         aggregate_name,
                         slice_start,
                         slice_end,
                         last_duration or "-"
                         )
            # Execute refresh_continuous_aggregate() and time it
            refresh_started_at = datetime.datetime.utcnow()
            connection.execution_options(isolation_level="AUTOCOMMIT")\
                .execute(f"CALL refresh_continuous_aggregate('{aggregate_name}', '{slice_start}', '{slice_end}');")

            last_duration = datetime.datetime.utcnow() - refresh_started_at
            longest_duration = max(last_duration, longest_duration)

            formatted_time = cursor.strftime("%d-%m-%Y")
            progress_bar.set_postfix({
                "Currently at": formatted_time,
                "Last refresh": last_duration or "-",
                "Longest refresh": longest_duration,
            })

            # Advance the progress bar by the number of days covered by this slice
            progress_bar.update(slice / datetime.timedelta(days=1))

            # Move to the next slice
            cursor += slice


def force_refresh_aggregate_smart(
    session_manager: SessionManager,
    buckets: Tuple = (TimeBucket.m5, TimeBucket.m15, TimeBucket.h1, TimeBucket.h4, TimeBucket.d1, TimeBucket.d7, TimeBucket.d30,),
    statement_timeout="180min",
):
    """Do refresh_continuous_aggregate() in smaller slices.

    https://stackoverflow.com/questions/73388687/full-force-refresh-of-continous-aggregate-view-concurrently-in-timescaledb?noredirect=1#comment129605333_73388687
    """

    with session_manager.connect() as connection:

        engine: Engine = connection.engine
        database_name = engine.url.database

        logger.info(f"force_refresh_aggregate_smart() for %s, statement timeout is %s", database_name, statement_timeout)

        start, end = get_hypertable_range(connection, "candle_ohlcvx")

        connection.execution_options(isolation_level="AUTOCOMMIT").execute(
            f"SET statement_timeout = '{statement_timeout}';")

        for bucket in buckets:
            for name in long_list_of_aggregate_names:
                force_refresh_aggregate_iterative(
                    connection,
                    name,
                    start,
                    end
                )
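As noted above, the loop refreshes one slice at a time. If TimescaleDB can indeed run non-overlapping refresh_continuous_aggregate() calls at the same time, the loop could be parallelised with a thread pool along these lines; this is an untested sketch that reuses the slicing idea from force_refresh_aggregate_iterative, and each worker must open its own connection:

Code language: python
import datetime
from concurrent.futures import ThreadPoolExecutor

from sqlalchemy import text
from sqlalchemy.engine import Engine


def refresh_slice(engine: Engine, aggregate_name: str, slice_start: str, slice_end: str):
    # Each worker uses its own connection; refresh_continuous_aggregate()
    # cannot run inside a transaction block, hence AUTOCOMMIT.
    with engine.connect() as connection:
        connection.execution_options(isolation_level="AUTOCOMMIT").execute(
            text(f"CALL refresh_continuous_aggregate('{aggregate_name}', '{slice_start}', '{slice_end}');")
        )


def force_refresh_aggregate_parallel(
        engine: Engine,
        aggregate_name: str,
        start: datetime.datetime,
        end: datetime.datetime,
        slice=datetime.timedelta(days=90),
        max_workers=4,
):
    # Build strictly non-overlapping slices first, then hand them to a thread pool,
    # so concurrent refreshes never touch the same region of the aggregate.
    slices = []
    cursor = start
    while cursor <= end:
        cursor_end = cursor + slice
        slices.append((cursor.date().isoformat(), cursor_end.date().isoformat()))
        cursor += slice

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(refresh_slice, engine, aggregate_name, slice_start, slice_end)
            for slice_start, slice_end in slices
        ]
        # Surface any exception raised inside a worker.
        for future in futures:
            future.result()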
Score 1
The original page content is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73388687
