首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Dataflow (Python)的Pub/Sub到BigQuery (批处理)

使用Dataflow (Python)的Pub/Sub到BigQuery (批处理)
EN

Stack Overflow用户
提问于 2021-07-28 07:37:20
回答 1查看 1K关注 0票数 0

我用Python创建了一个流数据流管道,只想澄清我下面的代码是否达到了我的预期。我打算这样做:

从Pub/Sub continuously

  • Batch负载到BigQuery的
  1. 每1分钟消耗一次,而不是流,以降低

的成本。

这是Python中的代码片段

代码语言:javascript
运行
复制
options = PipelineOptions(
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)

p = beam.Pipeline(DataflowRunner(), options=options)


pubsub = (
        p
        | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
        | "To Dict" >> Map(json.loads)
        | "Write To BigQuery" >> WriteToBigQuery(table=TABLE, schema=schema, method='FILE_LOADS',
                                                 triggering_frequency=60, max_files_per_bundle=1,
                                                 create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                                 write_disposition=BigQueryDisposition.WRITE_APPEND))

我可以知道上面的代码是否正在做我想做的事情吗?从Pub/Sub流,每60秒,它将批处理插入到BigQuery。我故意将max_files_per_bundle设置为1,以防止创建多个碎片,以便每分钟只加载一个文件,但不确定是否正确。Java有withNumFileShards选项,但我在Python中找不到相应的选项。我参考下面的文档:https://beam.apache.org/releases/pydoc/2.31.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery

https://cloud.google.com/blog/products/data-analytics/how-to-efficiently-process-both-real-time-and-aggregate-data-with-dataflow

只是好奇我是否应该用窗口来实现我想做的事情?

代码语言:javascript
运行
复制
options = PipelineOptions(
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)

p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (
        p
        | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
        | "To Dict" >> Map(json.loads)
        | 'Window' >> beam.WindowInto(window.FixedWindows(60), trigger=AfterProcessingTime(60),
                                      accumulation_mode=AccumulationMode.DISCARDING)
        | "Write To BigQuery" >> WriteToBigQuery(table=TABLE, schema=schema, method='FILE_LOADS',
                                                 triggering_frequency=60, max_files_per_bundle=1,
                                                 create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                                 write_disposition=BigQueryDisposition.WRITE_APPEND))

第一种方法在没有第二种方法的窗口的情况下是足够好的?我现在使用的是第一种方法,但我不确定是否每分钟都有从多个文件执行多个加载,或者它实际上将所有发布/子消息合并为1并执行单个大容量加载?

谢谢!

EN

回答 1

Stack Overflow用户

发布于 2021-11-02 10:18:36

不是python解决方案,但我最终求助于Java版本。

代码语言:javascript
运行
复制
public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
    return new JsonToTableRow();
}

private static class JsonToTableRow
        extends PTransform<PCollection<String>, PCollection<TableRow>> {

    @Override
    public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
        return stringPCollection.apply("JsonToTableRow", MapElements.via(
                new SimpleFunction<String, TableRow>() {
                    @Override
                    public TableRow apply(String json) {
                        try {
                            InputStream inputStream =
                                    new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
                            return TableRowJsonCoder.of().decode(inputStream, Context.OUTER);
                        } catch (IOException e) {
                            throw new RuntimeException("Unable to parse input", e);
                        }
                    }
                }));
    }
}


public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);
    options.setDiskSizeGb(10);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("Read from PubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
            .apply(jsonToTableRow())
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows().to(options.getOutputTable())
                    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                    .withTriggeringFrequency(Duration.standardMinutes(1))
                    .withNumFileShards(1)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    pipeline.run();
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68556242

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档