我正在尝试写一个BigTableAvroFn函数,它读取bigtable行,并使用apache光束将其转换为Avro通用记录.How我是否可以将行数据转换为一般记录?
pipeline.apply("Read from Bigtable", read)
.apply("Transform to Avro", ParDo(new BigtableToAvroFn()));
return pipeline.run();
}
静态类BigtableToAvroFn扩展了DoFn {
@ProcessElement
public void proces
正在运行java作业以读取Avro文件,但一直收到错误。我在寻求帮助- 这是代码- // Get Avro Schema
String schemaJson = getSchema(options.getAvroSchema());
Schema schema = new Schema.Parser().parse(schemaJson);
// Check schema field types before starting the Dataflow job
checkFieldTypes(schema);
// Create the Pipeline object with the
因为我不允许在同一个线程中问我的问题,而另一个人有同样的问题(但不使用模板),所以我正在创建这个新线程。
问题是:我创建了一个数据流作业,从gcp中的一个模板到把酒吧/潜艇中的数据摄取到BQ中。在作业执行之前,这一切都很好。这份工作被“卡住”了,没有写任何关于烧烤的东西。
我不能做这么多,因为我不能在模板中选择光束版本。这是一个错误:
Processing stuck in step WriteSuccessfulRecords/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 01h00m00s without
当试图编写avro时,我得到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 35.0 failed 1 times, most recent failure: Lost task 7.0 in stage 35.0 (TID 110, localhost): java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.avro.mapred.AvroWrapper
我在一个
我尝试在Google Cloud Dataflow中运行Apache光束管道(Python),这是由Google Cloud Coomposer中的DAG触发的。 我的dags文件夹在各自的GCS存储桶中的结构如下: /dags/
dataflow.py <- DAG
dataflow/
pipeline.py <- pipeline
setup.py
my_modules/
__init__.py
commons.py <- the module I want to import in the pipeline se
Apache Avro能否在序列化期间处理参数化类型?
当我尝试序列化一个使用泛型的实例时,我发现Avro框架抛出了这个异常。
org.apache.avro.AvroTypeException: Unknown type: T
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514)
at org.apache.avro.ref
我一直在编写一个数据流管道,并且正在使用flex模板。 我的代码从avro读取并处理它没有问题。但是当涉及到WriteToAvro或WriteToText时,数据流作业会失败,而且看起来像是在模板验证时失败。我完全没有理由这样做。 我试过很多方法。删除输出文件的参数并将其硬编码到中。为WriteToText切换WriteToAvro,但它还是失败了。 with beam.Pipeline(options=options) as p:
read_from_avro = p \
| 'ReadFromAvro
下面的代码简单地将数据写入avro格式,并从写入的avro文件中读取和显示相同的数据。我只是在尝试Hadoop权威指南中的示例。这是我第一次能够执行。然后我得到了下面的错误。它第一次确实起作用了。所以我不确定我犯了什么错误。
这是一个例外:
Exception in thread "main" java.io.EOFException: No content to map to Object due to end of input
at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.
尝试在GCP云函数中创建GCP数据流。我已经部署了一个简单的apache函数,它工作得很好,但是当我试图读取文件时会出现路径错误。当我使用参数-runner从本地运行时,与Dataflowrunner一样,相同的脚本运行,有人建议我必须执行pip安装apache-beamgcp。我已经在当地做过了,而且效果很好。如果我试图在GCP中安装它,它会在一段时间后进行会话超时。下面是我的密码。
#import print library
# This script will read all avro files on a path and print them
import logging
imp
我的项目运行的是Python2.7(是的,我知道...)Google Dataflow上的Apache Beam 2.19。我们连接到BigQuery的方式与Apache光束教程中指定的方式相同:
p | 'Get data from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
query=get_query(limit),
use_standard_sql=True)))
然而,此管道的读取步骤非常慢-很可能是由于读取.avro文件所致。不过,看起来fastavro似乎并没有真正被使用。AFA