首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用批处理的不完全Server更改表提取

使用批处理的不完全Server更改表提取
EN

Stack Overflow用户
提问于 2022-03-25 15:00:23
回答 1查看 307关注 0票数 0

基本问题:我有一个从CDC表格中提取记录的过程,这个表是“缺失”记录。

我正在从MS 2019 (数据中心Ed) DB中提取,在67个表上启用CDC。其中一个表容纳3.23亿行,宽约125列。在夜间处理期间,这些行中大约有1200万行被更新,因此在_CT表中生成了大约2000万行。在这个夜间过程中,CDC捕获仍然使用默认设置运行。它可以“躲在后面”,但我们检查一下。

在夜间过程完成后,我有一个Python3.6提取器,它使用ODBC连接到SQL服务器。我有一个循环,遍历67个源表中的每一个。在循环开始之前,我确保CDC捕捉到的信息被“捕获”了。

对于每个表,提取器首先从目标数据库读取最后一个成功加载的LSN,该数据库位于雪花中。

Python -表名、上次加载的LSN和表PKEY --用于获取表的当前MAX_LSN:

代码语言:javascript
运行
复制
def get_incr_count(self, table_name, pk, last_loaded_lsn):
    try:
        cdc_table_name = self.get_cdc_table(table_name)
        max_lsn = self.get_max_lsn(table_name)

        incr_count_query = """with incr as
        (
            select
                row_number() over
                    (
                        partition by """ + pk + """
                        order by
                            __$start_lsn desc,
                            __$seqval desc
                    ) as __$rn,
                *
            from """ + cdc_table_name + """
            where
                __$operation <> 3 and
                __$start_lsn > """ + last_loaded_lsn + """ and
                __$start_lsn <= """ + max_lsn + """
        )
        select COUNT(1) as count from incr where __$rn = 1 ;
        """

        lsn_df = pd.read_sql_query(incr_count_query, self.cnxn)
        incr_count = lsn_df['count'][0]

        return incr_count

    except Exception as e:
        raise Exception('Could not get the count of the incremental load for ' + table_name + ': ' + str(e))

如果该查询找到要处理的记录,则将运行此函数。每次提取50万条记录的限制是运行此代码的虚拟机上的内存限制。超过这个数量将使可用内存最大化。

代码语言:javascript
运行
复制
def get_cdc_data(self, table_name, pk, last_loaded_lsn, offset_iterator=0, fetch_count=500000):

    try: 
        cdc_table_name = self.get_cdc_table(table_name)
        max_lsn = self.get_max_lsn(table_name)

        #Get the lasst LSN loaded from the ODS.LOG_CDC table for the current table
        last_lsn = last_loaded_lsn

        incremental_pull_query = """with incr as
        (
            select
                row_number() over
                    (
                        partition by """ + pk + """
                        order by
                            __$start_lsn desc,
                            __$seqval desc
                    ) as __$rn,
                *
            from """ + cdc_table_name + """
            where
                __$operation <> 3 and
                __$start_lsn > """ + last_lsn + """ and
                __$start_lsn <= """ + max_lsn + """
        )
        select CONVERT(VARCHAR(max), __$start_lsn, 1) as __$conv_lsn, * 
        from incr where __$rn = 1 
        order by __$conv_lsn
        offset """ + str(offset_iterator) + """ rows
        fetch first """ + str(fetch_count) + """ rows only;
        """
        # Load the incremental data into a dataframe, df, using the SQL Server connection and the incremental query
        full_df = pd.read_sql_query(incremental_pull_query, self.cnxn)

        # Trim all cdc columns except __$operation
        df = full_df.drop(['__$conv_lsn', '__$rn', '__$start_lsn', '__$end_lsn', '__$seqval', '__$update_mask', '__$command_id'], axis=1)

        return df


    except Exception as e:
        raise Exception('Could not get the incremental load dataframe for ' + table_name + ': ' + str(e))

然后将该文件移动到雪花中,并合并到一个表中。如果每个导入循环都成功,则更新目标db中的最大LSN以设置下一个起点。如果有任何失败,我们留下最大值,然后重试下一关。在下面的场景中,没有识别的错误。

我们正在发现证据表明,第二个查询在开始和最大LSN之间循环时并没有提取所有有效的记录。没有丢失记录的可分辨模式,除非遗漏了一个LSN,否则丢失了内部的所有更改。

我认为这可能与我们如何订购记录有关:由__$conv_lsn订购。此值被转换为VARCHAR(最大)...so,我想知道是否应该尝试使用更可靠的密钥进行排序。我想不出一种不为这个过程增加额外工作的审计方法,因为这个过程对时间非常敏感。这确实使故障排除变得更加困难。

EN

回答 1

Stack Overflow用户

发布于 2022-03-25 17:23:26

我怀疑你的问题就在这里。

代码语言:javascript
运行
复制
row_number() over
(
    partition by """ + pk + """
    order by
        __$start_lsn desc,
        __$seqval desc
) as __$rn,
...
from incr where __$rn = 1

如果一个给定的事务影响到多个行,它们将被枚举为1-N。即使这是一个小手摇;我不知道如果一个行在一个事务中不止一次受到影响(我需要设置一个测试和.好吧..。我很懒)。

但所有这些都让我觉得这个工作流很奇怪。我过去曾与疾控中心合作过,虽然我承认我并没有针对雪花,但提取部分应该是相似的,而且相当简单。

  1. 使用sys.fn_cdc_get_max_lsn();获取最大LSN (即不需要查询CDC数据本身以获得此值):使用LSN端点从cdc.fn_cdc_get_all_changes_«capture_instance»()cdc.fn_cdc_get_net_changes_«capture_ instance»()选择(从该表的上一次运行开始,或者从第一次运行sys.fn_cdc_get_min_lsn(«capture_ instance»);从上面的最大值)

H 110将结果流到任何地方(也就是说,您不应该同时在内存中保存大量更改记录)。H 211G 212

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71619058

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档