首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何利用气流实现S3到BigQuery?

如何利用气流实现S3到BigQuery?
EN

Stack Overflow用户
提问于 2018-09-03 12:29:57
回答 3查看 2.7K关注 0票数 1

目前没有S3ToBigQuery算子

我的选择是:

  1. 使用S3ToGoogleCloudStorageOperator,然后使用GoogleCloudStorageToBigQueryOperator 这不是我渴望做的事。这意味着要支付双倍的存储费用。即使从任何一个仍然涉及支付的存储中删除该文件。
  2. 将文件从S3下载到本地文件系统,并从文件系统加载到BigQuery -但是没有S3DownloadOperator,这意味着从头编写整个过程而不涉及气流。这就忽略了使用气流的意义。

还有别的选择吗?你建议怎么做?

EN

回答 3

Stack Overflow用户

发布于 2018-09-24 20:48:01

这就是我最后的下场。应该将其转换为S3toLocalFile运算符。

代码语言:javascript
复制
def download_from_s3(**kwargs):
    hook = S3Hook(aws_conn_id='project-s3')    

    result = hook.read_key(bucket_name='stage-project-metrics',
                           key='{}.csv'.format(kwargs['ds']))

    if not result:
        logging.info('no data found')
    else:
        outfile = '{}project{}.csv'.format(Variable.get("data_directory"),kwargs['ds'])

        f=open(outfile,'w+')
        f.write(result)
        f.close()

    return result
票数 1
EN

Stack Overflow用户

发布于 2018-09-03 14:26:12

如果第一个选项具有成本限制,您只需通过S3Hook 下载该文件使用PythonOperator:

代码语言:javascript
复制
from airflow.hooks.S3_hook import S3Hook
from datetime import timedelta, datetime
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}
def download_from_s3(**kwargs):


    hook = S3Hook(aws_conn_id='s3_conn')

    hook.read_key(bucket_name='workflows-dev',
                   key='test_data.csv')

dag = DAG('s3_download',
          schedule_interval='@daily',
          default_args=default_args,
          catchup=False)

with dag:
download_data = PythonOperator(
        task_id='download_data',
        python_callable=download_from_s3,
        provide_context=True
    )
票数 0
EN

Stack Overflow用户

发布于 2018-09-04 08:37:57

相反,您可以使用S3ToGoogleCloudStorageOperator,然后使用带有external_table表标志的GoogleCloudStorageToBigQueryOperator,即传递external_table =True

这将创建一个指向GCS位置的外部数据,并且不会将数据存储在BigQuery中,但您仍然可以查询它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52149840

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档