我正在使用谷歌数据流上的阿帕奇光束。 我的流水线从BigQuery读取数据,但它依赖于执行参数。我应该能够用一个点(经度,纬度)和几个点来运行管道。 只有一点,解决方案很简单:我可以将查询设置为ValueProvider。 select *
from UserProfile
where id_ in ( select distinct userid
from locations
where ST_DWITHIN(ST_GeogPoint(longitude, latitude),
我想读取一个csv文件,并将其写入到BigQuery使用阿帕奇光束数据流。为此,我需要将数据以字典的形式呈现给BigQuery。如何使用apache beam转换数据以实现此目的?
我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列的表。我知道如何在BigQuery中创建数据,这很简单,我不知道的是如何将csv转换成字典。下面的代码是不正确的,但应该给出了我想要做什么的想法。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local
我将数据从卡夫卡流式传输到BigQuery,使用的是带有谷歌数据流运行器的阿帕奇光束。我想使用insertId进行重复数据删除,我在谷歌文档中找到了这一点。但是,即使这些插入在几秒钟内发生,我仍然可以看到许多具有相同insertId的行。现在我在想,也许我没有正确使用API来利用BQ提供的流式插入的重复数据删除机制。
我在beam中编写的代码如下所示:
payments.apply("Write Fx Payments to BQ", BigQueryIO.<FxPayment>write()
.withFormatFunction(ps -
我正在运行一个作业的谷歌数据流编写的阿帕奇光束,从BigQuery表和文件读取。转换数据并将其写入其他BigQuery表。作业“通常”会成功,但有时当我从大型查询表中读取数据时,会随机得到空指针异常,并且作业会失败:
(288abb7678892196): java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.split(BigQuerySourceBase.java:98)
at com.google.cloud.dataflow.worker.runners.work
我想读取一个GZIP压缩的帕奎特文件从GCS到BigQuery使用Python SDK for Apache光束。但是,apache_beam.io.parquetio.ReadFromParquet方法似乎不支持从压缩文件中读取。根据源代码,压缩类型被硬编码为UNCOMPRESSED。
有没有一个技巧来读取压缩的拼图文件,而不需要在GCS中预先解压缩文件?如果这是唯一的方法,有没有办法在GCS中直接解压缩文件?
我有一个DML指令,每小时在Bigquery中运行一次,类似于下面的指令: MERGE dataset.DetailedInventory T
USING dataset.Inventory S
ON T.product = S.product
WHEN NOT MATCHED AND quantity < 20 THEN
INSERT(product, quantity, supply_constrained, comments)
VALUES(product, quantity, true, ARRAY<STRUCT<cr
我的项目运行的是Python2.7(是的,我知道...)Google Dataflow上的Apache Beam 2.19。我们连接到BigQuery的方式与Apache光束教程中指定的方式相同:
p | 'Get data from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
query=get_query(limit),
use_standard_sql=True)))
然而,此管道的读取步骤非常慢-很可能是由于读取.avro文件所致。不过,看起来fastavro似乎并没有真正被使用。AFA
因为我不允许在同一个线程中问我的问题,而另一个人有同样的问题(但不使用模板),所以我正在创建这个新线程。
问题是:我创建了一个数据流作业,从gcp中的一个模板到把酒吧/潜艇中的数据摄取到BQ中。在作业执行之前,这一切都很好。这份工作被“卡住”了,没有写任何关于烧烤的东西。
我不能做这么多,因为我不能在模板中选择光束版本。这是一个错误:
Processing stuck in step WriteSuccessfulRecords/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 01h00m00s without