我们在GCP项目中使用有管理的气流。
当我使用BigQueryInsertJobOperator在查询文件中执行查询时,它使用设置值自动替换这些文件中的user_defined_macros。
from airflow import DAG
from datetime import datetime
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
'test',
schedule_interval = None,
在开发新功能的过程中,我正在尝试使用命令行界面测试我的DAG,但我无法做到这一点。我的DAGs,DAG_ID=sample_dag,文件: sample_dag.py驻留在~/airflow/dags文件夹(Ubuntu)中,可以通过网页界面(点击播放图标)执行。在DAG中有一些BASH操作符调用,并且每个脚本都被正确执行,并产生记录的输出。 但是,我无法通过命令行访问从相同文件夹运行的相同DAG的功能,例如: airflow render sample_dag all 2019-01-14T06:04:05 上面命令的输出是: Test Dag Begin Test Dag End ***
我试图在PythonOperator、_etl_lasic之间将数据传递给另一个运行良好的PythonOperator _download_s3_data,但是当传递的值是None时,我想抛出一个异常,它应该将任务标记为失败。
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowFailException
def _etl_lasic(**context):
path_s3 =
在airflow.cfg中,我设置了电子邮件的外观,但在发送电子邮件时,未应用我指定的设置/布局。
subject_template = 'Airflow alert: {{ti}}'
# File that will be used as the template for Email content (which will be rendered using Jinja2).
# If not set, Airflow uses a base template.
# Example: html_content_template = /path/to/my_html_cont
我正在使用apache airflow和BashOperator。每个BashOperator执行一个python脚本。例如:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
with DAG(dag_id='dag_example', default_args=None,
schedule_interval='0 2 */2 * *',
catchup=False) as dag:
需要帮助在On_failure_callback中呈现jinja模板电子邮件ID。
我知道呈现模板在SQL文件中工作得很好,或者对于具有template_fields .How的操作符,我在下面得到呈现jinja模板变量的代码。
它适用于Variable.get('email_edw_alert'),但我不想使用变量方法来避免碰到DB
下面是Dag文件
import datetime
import os
from functools import partial
from datetime import timedelta
from airflow.models import
我有一个长期运行的Composer气流任务,它使用KubernetesPodOperator启动一个任务。有时,它在大约两个小时后成功完成,但更多的情况下,它被标记为失败,并在气流工作日志中出现以下错误:
[2019-06-24 18:49:34,718] {jobs.py:2685} WARNING - The recorded hostname airflow-worker-xxxxxxxxxx-aaaaa does not match this instance's hostname airflow-worker-xxxxxxxxxx-bbbbb
Traceback (most
我正在尝试使用Jinja模板变量,而不是使用Variable.get('sql_path'),以避免每次扫描数据文件时命中DB
原码
import datetime
import os
from functools import partial
from datetime import timedelta
from airflow.models import DAG,Variable
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from alerts.email_opera
谁能告诉我如何在本地气流部署中配置kuberenetes执行器。我创建了一个名为airflow-cluster的集群,并创建了pod_template.yaml,并在airflow.cfg中进行了以下更改。
[kubernetes]
# Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
pod_template_file = /home/caxe/airflow/logs/yamls/pod_template.yaml
worker_container_repository = apac
我正在尝试运行一个简单的select查询(从biqquery),并使用Composer将结果集加载到另一个bq表中。然而,我在代码的最后一行得到了一个错误。
损坏的DAG: /home/airflow/gcs/dags/es_tc_etl_wkf_mtly.py无效语法(es_tc_etl_wkf_mtly.py,第47行)
代码:
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators impor
我知道这个问题以前有人问过,但没有一个答案得到回答。我开始有点疯狂了!,我很困惑,所以我真的很想得到帮助。
我有一个带有python操作符的DAG,它运行一个SQL查询并输出到.csv。第二个操作符只返回true,以便生成DAG。我似乎无法访问函数中的ds变量。我想这样做是为了传递给查询。
from airflow.models import Variable, DAG
from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from datetime impor
我在Red Hat Linux上运行airflow 1.10.3。我使用的是LocalExecutor,via服务器和调度程序都是通过systemd启动的。
调度程序生成的日志文件是完全可读的(即“-rw-”模式)。创建的日志目录为drwxrwxrwx。
这无法通过我的组织进行的安全扫描。我需要能够限制这些文件的权限。
/etc/profile中的umask为077。我还将UMask=0007添加到服务的systemd单元文件中。但是,尽管这似乎适用于dags/ logs /scheduler/目录中的日志,但它不会影响DAG运行日志。
[root@server logs]# ls -la s
我动态地创建了一个dags集合(对所有对象使用相同的.py )。有一个build-DAG是我不能运行的:
airflow.exceptions.AirflowException: dag_id could not be found: `build-DAG`. Either the dag did not exist or it failed to parse.
at get_dag (/usr/local/lib/python2.7/site- packages/airflow/bin/cli.py:130)
at run (/usr/local/lib/python2.7/site-p
我使用的是头盔图,使用安装了kubernetes的v1.10.8 puckle/docker-airflow映像,并在舵图中使用该映像,但我一直在使用
File "/usr/local/bin/airflow", line 37, in <module>
args.func(args)
File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 1140, in initdb
db.initdb(settings.RBAC)
File
我有一个DAG,它创建一个集群并向它提交一个作业。
我希望能够通过dag_run.conf参数定制集群(工人数量)和作业(传递给它的参数)。
集群创建
对于集群创建,我编写了一个逻辑,如下所示:
DataprocCreateClusterOperator(...
cluster_config = {...
num_workers = "{% if 'cluster' is in dag_run.conf and 'secondary_worker_config' is in dag_run.conf['c
我正在研究阿帕奇气流码头。默认情况下,默认用户airflow是组root和airflow的成员。groups airflow命令的输出如下: + groups airflow
airflow : root airflow 但是,当我尝试从组root中删除airflow用户时,它失败了。 + gpasswd -d airflow root
gpasswd: user 'airflow' is not a member of 'root'
Removing user airflow from group root 这背后的原因可能是什么? PS:我在这里试图解决