我使用AvroIO.<MyCustomType>writeCustomTypeToGenericRecords()将通用记录写入流数据流作业中的GCS。在前几分钟,一切似乎都正常,但是,大约10分钟后,作业开始抛出以下错误:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
com.google.cloud.dataflow.worker.GroupAlsoByWindowsPar
我有一个很小的流水线,我正在尝试执行:
文件放入GCS存储桶> 2.云函数在文件放入GCS存储桶(不工作)时触发数据流作业> 3.写入大查询表(此部分工作)
我已经通过Dataprep创建了一个数据流作业,因为它有很好的UI,可以在写入GCS表之前执行所有转换(写入BigQuery很好),并且云函数会在文件上传到BigQuery存储桶时触发。然而,云函数不会触发数据流作业(我在Dataprep中编写的)。
请看一下我的云函数的下面的示例代码,如果我能得到任何关于为什么数据流作业没有触发的指针。
/**
* Triggered from a message on a Cloud
我是数据流新手,如果我犯了什么错误,请原谅,
最近,我使用dataflow/beam来处理来自pubsub的几个数据,我以云数据流-纽约出租车大亨为起点,但我将其升级到SDK2.2.0以使其与Big Table一起工作。我使用http云函数来模拟它,该函数将单个数据发送到pubsub,这样数据流就可以使用下面的代码来吸收它。
.apply("session windows on rides with early firings",
Window.<KV<String, TableRow>>into(
new GlobalWindow
我正在编写一个mapReduce作业来读取和处理Avrofile。输入文件为Avro,输出格式为Avro
当我执行Mapreduce作业时,我在reducer阶段得到以下异常。当reducer抛出IOException时,我无法在reducer中捕获和消除它。色调中的错误堆栈跟踪看起来
java.io.IOException: Invalid int encoding
at org.apache.avro.io.DirectBinaryDecoder.readInt(DirectBinaryDecoder.java:113)
at org.apache.avro.io.ValidatingDe
目前,这些是用来安排执行我所知道的Dataflow的任务的:
使用App服务或云函数。
- This [example](https://cloud.google.com/blog/products/gcp/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions) is with Java, There are any official example with Python as simple?
- This [example](https://zablo.net/blog/po
我已经创建了一个自定义模板,它使用BigQuery I/O连接器从ReadFromBigQuery读取。我就是这样用的:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_be
我正在运行一个作业的谷歌数据流编写的阿帕奇光束,从BigQuery表和文件读取。转换数据并将其写入其他BigQuery表。作业“通常”会成功,但有时当我从大型查询表中读取数据时,会随机得到空指针异常,并且作业会失败:
(288abb7678892196): java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.split(BigQuerySourceBase.java:98)
at com.google.cloud.dataflow.worker.runners.work
我们有一个作业,它可以写入Bigtable (通过HBase API)。不幸的是,由于以下原因,它失败了:
java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the
我创建了一个Dataflow模板,它允许我将数据从Cloud中的CSV文件导入到BigQuery中。我每天都使用云函数Firebase从这个模板创建工作。这是函数中的代码(删除了一些不相关的部分)。
const filePath = object.name?.replace(".csv", "");
// Exit function if file changes are in temporary or staging folder
if (
filePath?.includes("staging") |