首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >合并文件并将其插入BigQuery表中

合并文件并将其插入BigQuery表中
EN

Stack Overflow用户
提问于 2022-10-11 04:23:53
回答 2查看 69关注 0票数 0

我有一个文件夹,可以获得大量的JSON文件,但是每个JSON只有1条记录。JSON文件记录示例:-

代码语言:javascript
运行
复制
{"ID":"3193559","Title":"Una Familia de Diez - El secreto","Description":"Martina escucha que la Nena tiene novio y la amenaza con decirles a todos si no hace todo lo que le pida, pero despu\u00e9s la familia descubre su gran secreto.","Program":"Una Familia de Diez","Season":"1","Episode":"16","Source":"Televisa","Category":"Comedy","Syndicator":"CSv2","[CSv2] external_id":"UFDD100023004","[CSv2] pub_win_US_begin":"1657166400","[CSv2] pub_win_US_end":"1924923600","[CSv2] language":"es","[CSv2] title":"Una Familia de Diez - El secreto","[CSv2] descriptive_title":"El secreto","[CSv2] description":"Martina escucha que la Nena tiene novio y la amenaza con decirles a todos si no hace todo lo que le pida, pero despu\u00e9s la familia descubre su gran secreto.","[CSv2] supplier":"Televisa","[CSv2] categories":"Comedy","[CSv2] rating":"TV-14","[CSv2] subratings":"D,L","[CSv2] program_type":"SERIES","[CSv2] entity":"","[CSv2] exception_countries":"US ,\tUM ,PR ,\tMX ,\tAR ,\tCL ,\tCO ,\tPE ,\tEC ,\tCR ,\tSV ,\tHN ,\tBO ,\tPA ,\tDO ,\tNI ,\tPY ,\tVE ,\tUY ,\tGT","[CSv2] episode_type":"","TMS ID":"EP009112420015","external_id":"UFDD100023004","Content Type":"Entertainment","Release Year":"2007","sports_event_ID":""}

我是Python和GCP的新手。在这个问题上需要帮助:-如何将Python中的所有JSON文件合并,然后将其数据插入到我需要在DAG中创建的BigQuery表中,然后将这些文件移动到另一个文件夹中,一旦插入到BQ表中。还需要根据id将错位表数据与最终表合并,一旦插入,需要删除错位表吗?这样,每当新文件出现时,它就会重复整个过程?

我试过用Python来读取JSON文件,但它不起作用:-

代码语言:javascript
运行
复制
def map_keys(
    bucket_name, file_path, list_of_files
):  # pass the folder as an argument
    logging.info(f"bucket_name: {bucket_name}")
    logging.info(f"file_path: {file_path}")
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    logging.info(f"list_of_files from the DAG: {list_of_files}")
    blobs = storage_client.list_blobs(
        bucket_or_name=bucket_name, prefix=mcp_source_folder
    )
    blobs = [blob for blob in blobs if "json" in blob.name]
    logging.info(f"The process found {len(blobs)} files to insert")
    if not os.path.exists("unprocessed"):
        os.makedirs("unprocessed")
    if blobs:
        for blob in blobs:
            json_content = blob.download_as_string()
            mcp_data = json.loads(json_content)
            file_name = blob.name.split("/")[-1]
            logging.info(
                f"file to store: {file_name} with {len(mcp_data)} rows"
            )
            path_unprocessed_file = f"unprocessed/{file_name}"
            unprocessed_blob = bucket.blob(path_unprocessed_file)
            with open(path_unprocessed_file, "w") as unprocessed_file:
                for datum in mcp_data:
                    model_datum = McpModel.parse_obj(datum)
                    unprocessed_file.write(json.dumps(model_datum.dict()))
                    unprocessed_file.write("\n")
            unprocessed_blob.upload_from_filename(path_unprocessed_file)
EN

回答 2

Stack Overflow用户

发布于 2022-10-11 09:40:22

我可以给你一个解决方案,希望能有所帮助。

您可以使用以下步骤:

  • 截断您的暂存表
  • 通过Json操作符将所有输入的GCS文件加载到Bigquery暂存表中
  • 在您的暂存和基于最终表的merge之间使用Airflow启动一个ID查询
  • 将输入Json处理文件移动到另一个文件夹

例子:

代码语言:javascript
运行
复制
import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


with airflow.DAG(
        "your_dag",
        default_args=args,
        schedule_interval=None) as dag:

   truncate_staging_table = BigQueryInsertJobOperator(
        task_id='truncate_staging_table',
        configuration={
            "query": {
                "query": "TRUNCATE TABLE `project.dataset.staging_table`",
                "useLegacySql": False,
            }
        },
        location='EU'
    )

   load_json_to_staging_table = GCSToBigQueryOperator(
        task_id='load_json_to_staging_table',
        bucket='your_bucket',
        source_objects='your_folder/*.json'],
     destination_project_dataset_table='your_project:your_dataset.your_staging_table',
        source_format='NEWLINE_DELIMITED_JSON',
        compression='NONE',
        create_disposition='CREATE_NEVER',
        skip_leading_rows=1,
        write_disposition='WRITE_APPEND',
        autodetect=True
    )

   # In this example I used autodetect schema for files insertion 
   # on Bigquery, but you can also use a json schema for Bigquery with 
   # schema_fields

   merge_query = """
        MERGE `project.dataset.final_table` T
        USING `project.dataset.staging_table` S
        ON T.ID = S.ID
        WHEN MATCHED THEN
        
        UPDATE
        SET
            episode_type = S.episode_type,
            sports_event_ID = S.sports_event_ID
        
        WHEN NOT MATCHED THEN
        INSERT ROW;
        """

   merge_final_table = BigQueryInsertJobOperator(
        task_id='merge_staging_to_final_table',
        configuration={
            "query": {
                "query": merge_query,
                "useLegacySql": False,
            }
        },
        location='EU'
    )

   move_treaten_files = GCSToGCSOperator(
            task_id='move_treaten_files',
            source_bucket=your_source_bucket,
            source_object='source_folder/*',
            destination_bucket=your_dest_bucket,
            destination_object='dest_folder/',
            move_object=True
        )

   truncate_staging_table >> load_json_to_staging_table >> merge_final_table >> move_treaten_files 

我写了一篇关于Medium的文章,给出了一个关于Bigquery批处理管道和Airflow DAG的数据去重复的完整用例:

用气流和数据流消除Bigquery批处理管道中的重复

本文展示了使用Bigquery merge查询的许多方法。

票数 1
EN

Stack Overflow用户

发布于 2022-10-11 07:15:34

如果你已经有1条线,你是幸运的。所以,你有两个解决方案

  • 或者使用带有通配符的BigQuery加载作业功能来选择所有文件(例如,云存储中有一个公共前缀(也称为“目录”,但目录不存在)。
  • 或者从其中一个我的文章中获得灵感,查询Cloud,循环对象并使用compose特性将所有文件合并到一个文件中。然后,在这个单一作业上调用BigQuery加载作业特性。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74022961

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档