How do I enable the dead-letter pattern in Apache Beam / Dataflow's WriteToBigQuery transform with Method.FILE_LOADS and the Avro temp_file_format?

Stack Overflow user
Asked on 2022-09-03 04:40:37
2 answers · 351 views · 0 followers · 0 votes

In its documentation, Apache Beam suggests using the dead-letter pattern when writing to BigQuery. This pattern lets you retrieve the rows that failed to be written from the transform's output, tagged with 'FailedRows'.

However, when I try to use it:

WriteToBigQuery(
    table=self.bigquery_table_name,
    schema={"fields": self.bigquery_table_schema},
    method=WriteToBigQuery.Method.FILE_LOADS,
    temp_file_format=FileFormat.AVRO,
)

a schema mismatch in one of my elements causes the following exception:

Error message from worker: Traceback (most recent call last):
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630, in write
    self._avro_writer.write(row)
  File "fastavro/_write.pyx", line 647, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 376, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 320, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 374, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 283, in fastavro._write.write_union
ValueError: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1334, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/my_code/apache_beam/io/gcp/bigquery_file_loads.py", line 258, in process
    writer.write(row)
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1635, in write
    ex, self._avro_writer.schema, row)).with_traceback(tb)
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630, in write
    self._avro_writer.write(row)
  File "fastavro/_write.pyx", line 647, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 376, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 320, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 374, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 283, in fastavro._write.write_union
ValueError: Error writing row to Avro: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy Schema: ...
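The ValueError means the value of safety_proxy, an empty list, matches neither branch of the Avro union ['null', 'double']. A toy re-implementation of the union check (not fastavro's actual code; the branch-to-type mapping is a simplification) makes the failure mode concrete:

```python
def matches_union(value, union_branches):
    """Crude stand-in for Avro union matching: map branch names to Python types."""
    py_types = {"null": type(None), "double": float, "long": int, "string": str}
    return any(
        isinstance(value, py_types[branch])
        for branch in union_branches
        if branch in py_types
    )

# An empty list matches neither 'null' nor 'double', so the writer raises.
print(matches_union([], ["null", "double"]))    # False
print(matches_union(None, ["null", "double"]))  # True
print(matches_union(1.5, ["null", "double"]))   # True
```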

As I understand it, the schema mismatch causes fastavro._write.Writer.write to fail and raise an exception. Instead, I would expect WriteToBigQuery to apply the dead-letter behavior and return the malformed rows as output tagged FailedRows. Is there a way to achieve this?

Thanks

Edit: adding a more detailed example of what I am trying to do:

from apache_beam import Create
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import FileFormat
from apache_beam.io.textio import WriteToText

...

valid_rows = [{"some_field_name": i} for i in range(1000000)]
invalid_rows = [{"wrong_field_name": i}]

pcoll = Create(valid_rows + invalid_rows)

# This fails because of the 1 invalid row
write_result = (
    pcoll 
    |  WriteToBigQuery(
        table=self.bigquery_table_name,
        schema={
            "fields": [
                {'name': 'some_field_name', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ]
        },
        method=WriteToBigQuery.Method.FILE_LOADS,
        temp_file_format=FileFormat.AVRO,
    )
)

# What I want is for WriteToBigQuery to partially succeed and output the failed rows.
# This is because I have pipelines that run for multiple hours and fail because of 
# a small amount of malformed rows
(
    write_result[BigQueryWriteFn.FAILED_ROWS] 
    | WriteToText('gs://my_failed_rows/')
)
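Until the FILE_LOADS/Avro path supports dead-lettering, one workaround is to split rows against the schema yourself upstream of the sink, e.g. in a ParDo with tagged outputs. A Beam-free sketch of that split (the helper name and type mapping are hypothetical, and only a few BigQuery types are handled):

```python
# Hypothetical helper: partition rows into (valid, invalid) against a
# BigQuery-style schema before handing them to WriteToBigQuery.
TYPE_MAP = {"INTEGER": int, "FLOAT": float, "STRING": str, "BOOLEAN": bool}

def split_rows(rows, fields):
    allowed = {f["name"]: TYPE_MAP[f["type"]] for f in fields}
    valid, invalid = [], []
    for row in rows:
        ok = all(
            key in allowed and isinstance(value, (allowed[key], type(None)))
            for key, value in row.items()
        )
        (valid if ok else invalid).append(row)
    return valid, invalid

fields = [{"name": "some_field_name", "type": "INTEGER", "mode": "NULLABLE"}]
valid, invalid = split_rows(
    [{"some_field_name": 1}, {"wrong_field_name": 2}], fields)
# valid rows go to WriteToBigQuery; invalid rows go to a dead-letter sink
print(valid)    # [{'some_field_name': 1}]
print(invalid)  # [{'wrong_field_name': 2}]
```

In a pipeline, the same check inside a DoFn emitting to a tagged side output would give the long-running job the partial-success behavior described above.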
The original page content is from Stack Overflow.
Original link: https://stackoverflow.com/questions/73589662