首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >从Postgres表轮询数据的可靠方法

从Postgres表轮询数据的可靠方法
EN

Stack Overflow用户
提问于 2022-07-19 18:55:38
回答 1查看 172关注 0票数 2

我想使用Postgres数据库中的一个表作为输入文档的存储(将有数十亿)。文档正在不断添加(使用"UPSERT“逻辑以避免重复),很少从表中删除。

将会有多个辅助应用程序,这些应用程序应该不断地从这个表读取数据,从第一个插入的行到最新的,然后在插入时轮询新的行,每一行准确读取一次。此外,当工人的处理算法改变时,所有数据都应该从第一行重新读取。每个应用程序都应该能够独立于其他应用程序来维护自己的行处理过程。

我正在寻找一种跟踪最后处理过的行的方法,以便能够随时暂停并继续轮询。

我可以想到这些选择:

使用自动增量字段

然后将上一个处理过的行的自动增量字段值存储在某个地方,以便在下一个查询中使用它,如下所示:

代码语言:javascript
运行
复制
SELECT * FROM document WHERE id > :last_processed_id LIMIT 100;

但经过一些研究后,我发现在并发环境中,具有较低的自动增量值的行可能会比具有较高的值的行晚于客户端可见,因此可以跳过一些行。

使用时间戳字段

此选项的问题是时间戳不是唯一的,而且在高插入率期间可能会重叠,这再次导致行跳过。此外,调整系统时间(手动或通过NTP)可能导致无法预测的结果。

向每一行添加一个进程完成标志

这是我能想到的唯一真正可靠的方法,但也有缺点,包括需要在处理后更新每一行,以及为每个应用程序存储完成标志字段所需的额外存储空间,以及运行新应用程序可能需要更改DB模式。这对我来说是最后的手段,如果有更优雅的方法来做,我想避免它。

我知道,任务定义要求我使用Kafka来完成这个任务,但问题是它不允许从主题中删除单个消息,我需要这个功能。保存一个卡夫卡记录的外部列表,在处理过程中应该跳过,对我来说,感觉非常笨拙和低效。此外,与卡夫卡的实时去重复也需要一些外部存储。

我想知道是否有其他更有效的方法来解决这个问题,使用Postgres。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-22 17:12:28

最后,我保存了每一行的事务id,然后选择txid值低于最小id的记录,如下所示:

代码语言:javascript
运行
复制
SELECT * FROM document
WHERE ((txid = :last_processed_txid AND id > :last_processed_id) OR txid > :last_processed_txid) 
  AND txid < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY txid, id
LIMIT 100

这样,即使在事务#1之后启动的事务#2比第一个事务完成得更快,它所写入的行在事务#1完成之前也不会被使用者读取。

Postgres文档声明

xid8值严格单调增加,不能在数据库集群的生存期内重用。

所以它应该适合我的情况。

这个解决方案并不是那么节省空间,因为必须在每一行中保存一个额外的8字节txid字段,并且应该为txid字段创建一个索引,但是与这里的其他方法相比,这里的主要好处是:

  • DB模式在添加新使用者时保持不变。
  • 不需要更新将行标记为已处理行,使用者只应保留最后处理行的id和txid值。
  • 系统时钟漂移或调整不会导致行被跳过
  • 在多个生产者使用id插入行并使用预先分配的池生成时,为每行设置txid有助于按插入顺序查询数据(例如,生产者1立即插入id为1..100的行、生产者2-101.200等)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73042180

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档