我想使用Postgres数据库中的一个表作为输入文档的存储(将有数十亿)。文档正在不断添加(使用"UPSERT“逻辑以避免重复),很少从表中删除。
将会有多个辅助应用程序,这些应用程序应该不断地从这个表读取数据,从第一个插入的行到最新的,然后在插入时轮询新的行,每一行准确读取一次。此外,当工人的处理算法改变时,所有数据都应该从第一行重新读取。每个应用程序都应该能够独立于其他应用程序来维护自己的行处理过程。
我正在寻找一种跟踪最后处理过的行的方法,以便能够随时暂停并继续轮询。
我可以想到这些选择:
使用自动增量字段
然后将上一个处理过的行的自动增量字段值存储在某个地方,以便在下一个查询中使用它,如下所示:
SELECT * FROM document WHERE id > :last_processed_id LIMIT 100;
但经过一些研究后,我发现在并发环境中,具有较低的自动增量值的行可能会比具有较高的值的行晚于客户端可见,因此可以跳过一些行。
使用时间戳字段
此选项的问题是时间戳不是唯一的,而且在高插入率期间可能会重叠,这再次导致行跳过。此外,调整系统时间(手动或通过NTP)可能导致无法预测的结果。
向每一行添加一个进程完成标志
这是我能想到的唯一真正可靠的方法,但也有缺点,包括需要在处理后更新每一行,以及为每个应用程序存储完成标志字段所需的额外存储空间,以及运行新应用程序可能需要更改DB模式。这对我来说是最后的手段,如果有更优雅的方法来做,我想避免它。
我知道,任务定义要求我使用Kafka来完成这个任务,但问题是它不允许从主题中删除单个消息,我需要这个功能。保存一个卡夫卡记录的外部列表,在处理过程中应该跳过,对我来说,感觉非常笨拙和低效。此外,与卡夫卡的实时去重复也需要一些外部存储。
我想知道是否有其他更有效的方法来解决这个问题,使用Postgres。
发布于 2022-07-22 17:12:28
最后,我保存了每一行的事务id,然后选择txid值低于最小id的记录,如下所示:
SELECT * FROM document
WHERE ((txid = :last_processed_txid AND id > :last_processed_id) OR txid > :last_processed_txid)
AND txid < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY txid, id
LIMIT 100
这样,即使在事务#1之后启动的事务#2比第一个事务完成得更快,它所写入的行在事务#1完成之前也不会被使用者读取。
xid8值严格单调增加,不能在数据库集群的生存期内重用。
所以它应该适合我的情况。
这个解决方案并不是那么节省空间,因为必须在每一行中保存一个额外的8字节txid字段,并且应该为txid字段创建一个索引,但是与这里的其他方法相比,这里的主要好处是:
https://stackoverflow.com/questions/73042180
复制相似问题