首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >BigQuery /气流-不能创建分区表

BigQuery /气流-不能创建分区表
EN

Stack Overflow用户
提问于 2022-07-05 11:28:18
回答 1查看 178关注 0票数 0

我正在使用气流中的BigQueryCreateEmptyTableOperator在BigQuery项目中创建表。一切都很好,除了分区表。

当我试图将所需的字典(在此指定)传递给time_partitioning参数时,我会得到以下完全无意义的错误消息:

代码语言:javascript
运行
复制
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/MY_PROJECT/datasets/MY_DATASET/tables?prettyPrint=false: Invalid JSON payload received. Unknown name "timePartitioning" at 'table': Proto field is not repeating, cannot start list

我根本不知道从哪里开始调试这个。这是精确的运算符代码。如果没有通过time_partitioning,它就能正常工作。

代码语言:javascript
运行
复制
create = BigQueryCreateEmptyTableOperator(
   task_id="create",
   bigquery_conn_id='google_cloud',
   project_id="MY_PROJECT",
   dataset_id="MY_DATASET",
   table_id=table_name,
   schema_fields=self.get_schema(),
   time_partitioning={'field': 'ds', 'type' : 'DAY'},
)
EN

回答 1

Stack Overflow用户

发布于 2022-07-08 12:32:39

我使用下面的示例代码运行dag,它使用BigQueryCreateEmptyDatasetOperatorBigQueryCreateEmptyTableOperator创建数据集,并在BigQuery中使用time_partitioning参数创建分区表。

代码语言:javascript
运行
复制
import os
import time
import datetime
from datetime import datetime, date, time, timedelta

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
)

from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "testdataset")

TABLE_NAME = "partitioned_table"

SCHEMA = [
    {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
]

dag_id = "example_bigquery"


with models.DAG(
    dag_id,
    schedule_interval="@hourly",  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
    default_args={"project_id": PROJECT_ID},
) as dag_with_locations:
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create-dataset", dataset_id=DATASET_NAME, project_id=PROJECT_ID
    )
    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        schema_fields=SCHEMA,
        time_partitioning={
            "type": "DAY",
            "field": "ds",
        },
    )
    
    create_dataset >> create_table

输出:

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72868680

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档