目前没有S3ToBigQuery算子。
我的选择是:
GoogleCloudStorageToBigQueryOperator
这不是我渴望做的事。这意味着要支付双倍的存储费用。即使从任何一个仍然涉及支付的存储中删除该文件。S3下载到本地文件系统,并从文件系统加载到BigQuery -但是没有S3DownloadOperator,这意味着从头编写整个过程而不涉及气流。这就忽略了使用气流的意义。还有别的选择吗?你建议怎么做?
发布于 2018-09-03 14:26:12
如果第一个选项具有成本限制,您只需通过S3Hook 下载该文件使用PythonOperator:
from airflow.hooks.S3_hook import S3Hook
from datetime import timedelta, datetime
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 0
}
def download_from_s3(**kwargs):
hook = S3Hook(aws_conn_id='s3_conn')
hook.read_key(bucket_name='workflows-dev',
key='test_data.csv')
dag = DAG('s3_download',
schedule_interval='@daily',
default_args=default_args,
catchup=False)
with dag:
download_data = PythonOperator(
task_id='download_data',
python_callable=download_from_s3,
provide_context=True
)https://stackoverflow.com/questions/52149840
复制相似问题