我有一个信息流进来,想要将它们记录在BigQuery中,但问题是用户可以在启动apache_beam作业之前定义一个新的表名。此外,流中的架构可以更改超时。
我想知道是否有人使用Python实现了流apache_beam作业的自动表创建和模式更改。
发布于 2022-09-27 16:27:51
我更熟悉Beam,但听起来您需要的是使用动态目的地定义BigQuery接收器,这是一个存在于https://beam.apache.org/documentation/io/built-in/google-bigquery/#using-dynamic-destinations和Python中的概念。
动态目的地将允许您从消息本身中提取目标表的名称。您希望create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED允许创建表。BigQuery还有一个SchemaUpdateOptions的概念(至少对于加载作业),您可以通过additional_bq_parameters关键字参数来控制这个概念。
发布于 2022-09-28 08:59:52
我希望它能帮上忙
如果使用Avro作为来自Pub Sub的输入数据,则可以从Avro架构推断Bigquery架构。
如果要从Bigquery作业创建Beam表,则必须在BigqueryIO:create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED中设置此选项
Dataflow Java的一个例子:
https://cloud.google.com/architecture/streaming-avro-records-into-bigquery-using-dataflow
Python :Dict可以直接接收到Bigquery,而不必将当前元素转换为TableRow。对于TableSchema中的Python,我没有找到与useBeamSchema方法等价的
BigQueryIO.<GenericRecord>write()
.to(bigQueryTable)
.useBeamSchema()
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.optimizedWrites());但我认为可以从Avro模式自动推断Beam Bigquery模式。
如果不是这样,也许您必须实现您自己的从Avro模式到Beam Bigquery模式的转换。
Beam提出了Java:https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L206中模式转换的util方法
对于Python,util是从Bigquery到Avro,而不是相反的
我还看到了这个主题:Converting from Avro schema to Google BigQuery schema in python?
https://stackoverflow.com/questions/73869680
复制相似问题