我在建造一条完善的管道时遇到了困难。假设我有一个文件,称之为streamA.py和streamB.py。这两个文件的目的是连续24/7地流数据,并每500条记录流一次就将数据推入红流。
我创建了另一个名为redis_to_postgres.py的文件,它异步地获取redis流中的所有数据,并将数据推送到postgresql,并从我最近推送ids的redis流中清除内存。这是通过异步完成的。我希望在前一条管道启动后每15分钟计时一次。
这样做最实际的方法是什么?在这种情况下,我会创建3条独立的管道吗?一个用于streamA,一个用于streamB,第三个用于从redis读取并推送到postgresql并最终清理数据?或者,我会创建一个管道来以并行的方式流数据,而另一个管道只是读取并推送到postgres?谢谢
发布于 2022-02-23 04:22:38
有趣的用例!你是问≤1.0长官还是猎户座?对于Orion,有一个博客帖子更详细地讨论了这个问题,并显示了示例流程。
但我假设您要求的是Prefect≤1.0。
为了读取Redis中的数据并将其加载到Postgres,例如每10秒,您可以在Prefect任务中使用一个循环:
for iteration in range(1, 7):
logger.info("iteration nr %s", iteration)
read_from_redis_and_load_to_postgres() # your logic here
if iteration < 6:
logger.info("Sleeping for 10 seconds...")
time.sleep(10)
这个流程可以每分钟运行一次。这将为您提供重试、可观察性和所有Prefect功能,并且每10秒将数据加载到Postgres不应该淹没您的数据库。
但是对于获取实时数据并将其连续加载到Redis流的部分,您可以将其作为单独的服务运行,而不是作为Prefect流运行,因为Prefect 1.0流更适合批处理,并且希望在某个时候结束,以便判断流运行是否成功。如果你想让它成为一个永无止境的完美流,它可能会失去心跳,并被僵尸杀手程序杀死。因此,运行此部分可能更容易,例如,作为一个单独的容器化服务运行24/7,您可以将其部署为单独的Kubernetes部署或ECS服务。
这也取决于许多因素,包括。这段代码所做的是什么,这个API有多可靠(从它提取数据的源系统有一些速率限制吗?为什么是500张唱片?这500张唱片被填写的频率是多少?你写到Redis的频率有多高?)。
尽管如此,我还是想看看您是否可以在猎户座中实现它,类似于blog post示例所做的工作。我们目前正在收集关于Orion的流用例的反馈,所以如果您在Orion中实现这一点,我们将有兴趣听取您对此的反馈。
https://stackoverflow.com/questions/71229984
复制相似问题