~$ airflow版本2.1.2 python 3.8 我正在尝试使用dag在我的redshift集群上执行一些基本查询,但任务失败并出现异常(日志中未显示) import datetime
import logging
from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOp
由于某些原因,宏'{{ ds }}‘不递增,执行日期递增,但它不适用于宏出于某种原因,我尝试了一个干净的新dag id,但仍然没有帮助。你知道吗?
from airflow import DAG
from datetime import datetime,timedelta
from airflow.operators.postgres_operator import PostgresOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import sys
sys.path.inser
我使用的是PostgresOperator,我想把表名后缀传递给我的SQL查询,这样当它查询数据时,它就会动态地从for循环迭代中读取数据。 for country in countries:
matchTimeStamp = ShortCircuitOperator(task_id='Match_Updated_dates_{}'.format(country), provide_context=True,
python_callable=match_dates,op_kwargs={'k
我已经创建了一个红移连接在气流中,附在屏幕截图中。在此之后,我将RedshiftToS3Operator导入到DAG中以运行红移查询,并将csv存储在s3中。
from datetime import timedelta, datetime
import pytz
import airflow
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import Redshift
我在一些ETL中使用气流,在某些阶段,我希望使用临时表(主要是为了使代码和数据对象保持独立,并避免使用大量元数据表)。
使用气流中的Postgres连接和" PostgresOperator“,我发现的行为是:对于,每次执行PostgresOperator,我们在数据库中有一个新的连接(或会话)。换句话说:我们丢失了DAG上一个组件的所有临时对象。
为了模拟一个简单的示例,我使用了以下代码(不要运行,只需查看对象):
import os
from airflow import DAG
from airflow.operators.postgres_operator import Pos
我有一个sql文件,我想使用PostGresOperator将参数传递给这个sql文件。 """select * from table_{} where id > ID """.format(mytable,myID) 我的postGresOperator mport_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id
我希望将查询“从<table_name>中选择计数(*)”的结果从PostgresOperator中推送到PythonOperator,后者是pythonCallable,该值将用于python函数。我如何在其中使用XCom?我尝试过不同的方法,但每次我在python函数中得到一个未定义的变量时都会出错。
def <python_function_name>():
"""variable to be used in this code."""
python_airflow_step = PythonOperator(
我正在尝试创建一个气流管道,用于从API下载数据,处理数据,将其保存为CSV,然后将数据加载到Postgres数据库(所有数据都在码头容器中)。代码看起来像这样
from datetime import datetime, timedelta
import pandas as pd
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
defaul
我正在尝试编写一个管道,postgres db应该使用csv的内容进行更新,当它被带到文件夹时。我已经编写了一个dag,它创建了表,并在从web中触发csv内容时推送它。下面是代码:
from datetime import datetime
from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator
我正在尝试运行一个气流管道,其中我有一个主要Postgres任务,然后是一个Python循环任务,然后是一个最终postgres任务,所以这个dag类似于如下所示
PythonOperator
|Task B.1|
Postgres |Task B. | Postgres
Task A ------|Task B. |-----> Task C
|Task B.n|
所以,我写了这篇文章:
import glob
import logging
import os
from datetim
我们可以在气流后置参数中传递数组列表吗?
我在尝试像这样的东西
Select hotel_name from hoteldetails where id in ({{ params.hotel_ids }});
在“s3_to_redshift.sql”中。
下面是我的运算符:
s3_to_redshift = PostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
para
因此,我试图在partial()中使用PostgresOperator方法,但是我得到这个错误显然是因为我无意中将schedule_interval传递给它。我查找了,BasicOperator的分部()方法没有这样的参数,我假设它是所有操作符类的父类。
所以我很困惑,我必须把这个参数传递给DAG,但是.partial()没有这样的参数,那么我应该如何创建这个进程和任务呢?我还没有找到任何关于如何实现它的信息。
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.
几个小时前,这段代码运行良好,但突然之间,我的伙伴们开始陷入“排队”状态。
下面是我试图运行的内容(我只是手动触发):
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime as dt
def test_function():
print('Hello there')
default_args = {
'owner': 'airflow',
我是airflow的新手。我根据官方文件设置了所有的东西。我使用pet example DAG,但是当我查看DAG日志时,它显示了以下错误:
第一错误从populate_pet_table开始
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type integer: "Maxy"
LINE 2: INSERT INTO pet VALUES ('Maxy', 'Dog', '2018-07-...
第二错误从get_birth
我正在使用气流2.0。我试图连接到红移,使用码头集装箱上的mac。
这是我的dag.py:
from airflow import DAG
from datetime import datetime,timedelta
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator,BranchPythonOperator
from airflow.providers.jdbc.hooks.jdbc import JdbcHook
#Creating
我需要使用copy命令将s3文件复制到红移。我对气流有点陌生,遇到了一些问题。有人能改正下面的代码吗?我可以这样调用rs.execute()吗? Error:
op.execute()
TypeError: execute() missing 1 required positional argument: 'context' 代码: import os
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator impor
我发现xcom实际上将数据写入数据库并从其他任务中提取数据。我的数据集很大,对它进行分类和写入数据库会造成一些不必要的延迟。是否有一种不使用xcom在同一气流Dag中的任务之间通信数据的方法?
下面是我尝试过的代码,上下文实际上没有被传递。我知道使用task_instance.xcom_push()可以工作,但它也会对数据进行筛选,并将其写入我不需要的数据库中。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, t
我想在get_birth_date中使用来自get_all_pets的结果。我如何在get_birth_date中访问它?此外,我还想打印get_all_pets的结果,我在哪里可以看到这样的print()?
在我的代码中哪里可以这样做?
import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
# create_pet_table, populate_pet_table, get_all_pets, and get_b