首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在流管道中使用WriteToBigQuery FILE_LOADS只会创建大量临时表(python SDK)

在流管道中使用WriteToBigQuery FILE_LOADS只会创建大量临时表(python SDK)
EN

Stack Overflow用户
提问于 2020-10-26 01:07:27
回答 1查看 467关注 0票数 3

我有一个流管道,它从发布/订阅中获取消息,解析它们,并将它们写入BigQuery。挑战在于,每条消息都会根据消息中的event属性转到不同的事件表,并且它们是没有排序的。

这意味着(我相信) WriteToBigQuery方法不能有效地批量写入,我看到它基本上一次写入一条消息,因此它运行得太慢了。我还尝试添加了一个60秒的窗口,并添加了一个GroupByKey/FlatMap来尝试对它们进行重新排序,但在加快速度方面只取得了很小的成功。

使用WriteToBigQuery中的FILE_LOADS方法和60+秒触发频率,它似乎可以工作,发送加载作业,然后(至少有时)成功,我看到数据进入正确的表中。但是,创建的临时表永远不会被删除,所以我创建了成百上千个表(名称类似于beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_NAME_STEP_756_37417blahblahblah)...which显然是不可持续的。

通过STREAMING_INSERTS编写代码运行良好,只是速度较慢,这是一种使其更高效的尝试。

如果有人能帮我弄清楚为什么这些表没有被删除,我想这会给我一个有效、高效的管道。我尝试了更长的触发频率(最多1小时),但同样的行为发生了。

这是我的主要渠道-同样,我没有任何问题,其余的,只是提供上下文。

代码语言:javascript
复制
    events, non_events = (p 
        | 'ReadData' >> beam.io.ReadFromPubSub(subscription = known_args.input_subscription).with_output_types(bytes)
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'Parse JSON to Dict' >> beam.Map(lambda line: json.loads(line))
        | 'FilterOutNonEvents' >> beam.ParDo(FilterOutNonEvents()).with_outputs('MAIN_OUT', 'non_events')
    )
    
    parsed, missing_tables, _ = (events
        | 'ParseDict' >> beam.ParDo(ParseDict()).with_outputs('MAIN_OUT', 'missing_tables', 'ignore')
    )
    
    results, conversion_errors = (parsed
        | 'ConvertDataTypes' >> beam.ParDo(ConvertDataTypes()).with_outputs('MAIN_OUT', 'error_data')
    )
    
    final = (results
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                table = lambda record: '{project}:{dataset}.{table}'.format(project = known_args.project, dataset = known_args.dataset, table = parse_event_to_dataset_name(patterns, record["event"])),
                schema = lambda tbl: {'fields':[{'name':c.split(':')[0], 'type':c.split(':')[1]} for c in schema_json[tbl.split('.')[-1]].split(',')]},
                create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
                method = 'FILE_LOADS',
                triggering_frequency = 60
        )
    )

GCS arg是由消息的event属性确定的,而schema arg只是一个全局变量的重新格式化的片段(最初是从table读取的,同样,使用streaming_inserts也没有问题)。

感谢任何能帮上忙的人!我对此感到非常头疼(我对beam/dataflow还很陌生)。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-10-27 03:16:51

在使用具有多个分区和/或动态目标的LOAD_FILES时,行为应为follows

代码语言:javascript
复制
'''
2. Multiple partitions and/or Dynamic Destinations:

    When there are multiple partitions of files destined for a single
    destination or when Dynamic Destinations are used, multiple load jobs
    need to be triggered for each partition/destination. Load Jobs are
    triggered to temporary tables, and those are later copied to the actual
    appropriate destination table. This ensures atomicity when only some
    of the load jobs would fail but not other. If any of them fails, then
    copy jobs are not triggered.
'''

code中,似乎在加载作业之后,beam应该等待它们完成,然后从临时表中复制数据并删除它们;但是,当与流管道一起使用时,它似乎不会完成这些步骤。在我使用DirectRunner复制的时候,它甚至没有传到CopyJob上。我建议报告给apache beam团队here

尽管如此,对于您的用例,我会重新考虑使用加载作业方法,因为您可能很快就会达到loadcopy作业的配额;而streaming inserts可能更适合此场景,并且可能提供比每60+秒加载作业更好的插入性能

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64526500

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档