首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Airflow -2警报已发送on_failure

在 Apache Airflow 中,您可以配置任务在失败时发送警报。通常,您可以使用 Airflow 的 on_failure_callback 参数来实现这一点。以下是一个详细的示例,展示如何在任务失败时发送电子邮件警报。

1. 配置 SMTP 服务器

首先,确保您的 Airflow 配置文件 (airflow.cfg) 中已正确配置 SMTP 服务器。以下是一个示例配置:

代码语言:javascript
复制
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your_email@example.com
smtp_password = your_password
smtp_port = 587
smtp_mail_from = your_email@example.com

2. 创建自定义失败回调函数

接下来,创建一个自定义的失败回调函数,该函数将在任务失败时发送电子邮件警报。

代码语言:javascript
复制
from airflow.utils.email import send_email
from airflow.hooks.base_hook import BaseHook

def task_failure_alert(context):
    dag_id = context.get('dag').dag_id
    task_id = context.get('task').task_id
    execution_date = context.get('execution_date')
    log_url = context.get('task_instance').log_url

    subject = f"Airflow alert: {dag_id}.{task_id} Failed"
    html_content = f"""
    <h3>Task Failed</h3>
    <p><strong>Dag:</strong> {dag_id}</p>
    <p><strong>Task:</strong> {task_id}</p>
    <p><strong>Execution Time:</strong> {execution_date}</p>
    <p><strong>Log URL:</strong> <a href="{log_url}">{log_url}</a></p>
    """

    send_email('alert@example.com', subject, html_content)

3. 将失败回调函数添加到任务中

在您的 DAG 文件中,将自定义的失败回调函数添加到任务中。

代码语言:javascript
复制
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': task_failure_alert
}

dag = DAG(
    'example_failure_alert',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

def failing_task():
    raise ValueError("This task is supposed to fail.")

start = DummyOperator(
    task_id='start',
    dag=dag,
)

fail = PythonOperator(
    task_id='fail',
    python_callable=failing_task,
    dag=dag,
)

start >> fail

4. 验证配置

确保您的 Airflow 实例正在运行,并且 DAG 已正确加载。您可以通过 Airflow Web 界面手动触发 DAG 以验证电子邮件警报是否在任务失败时发送。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

从0到1搭建大数据平台之调度系统

记得第一次参与大数据平台从无到有的搭建,最开始任务调度就是用的Crontab,分时日月周,各种任务脚本配置在一台主机上。crontab 使用非常方便,配置也很简单。刚开始任务很少,用着还可以,每天起床巡检一下日志。随着任务越来越多,出现了任务不能在原来计划的时间完成,出现了上级任务跑完前,后面依赖的任务已经起来了,这时候没有数据,任务就会报错,或者两个任务并行跑了,出现了错误的结果。排查任务错误原因越来麻烦,各种任务的依赖关系越来越负责,最后排查任务问题就行从一团乱麻中,一根一根梳理出每天麻绳。crontab虽然简单,稳定,但是随着任务的增加和依赖关系越来越复杂,已经完全不能满足我们的需求了,这时候就需要建设自己的调度系统了。

02

印尼医疗龙头企业Halodoc的数据平台转型之路:基于Apache Hudi的数据平台V2.0

数据平台已经彻底改变了公司存储、分析和使用数据的方式——但为了更有效地使用它们,它们需要可靠、高性能和透明。数据在制定业务决策和评估产品或 Halodoc 功能的性能方面发挥着重要作用。作为印度尼西亚最大的在线医疗保健公司的数据工程师,我们面临的主要挑战之一是在整个组织内实现数据民主化。Halodoc 的数据工程 (DE) 团队自成立以来一直使用现有的工具和服务来维护和处理大量且多样的数据,但随着业务的增长,我们的数据量也呈指数级增长,需要更多的处理资源。由于现代数据平台从不同的、多样化的系统中收集数据,很容易出现重复记录、错过更新等数据收集问题。为了解决这些问题,我们对数据平台进行了重新评估,并意识到架构债务随着时间的推移积累会导致大多数数据问题。我们数据平台的所有主要功能——提取、转换和存储都存在问题,导致整个数据平台存在质量问题。 现有数据平台 印尼医疗龙头企业Halodoc的数据平台转型之路:数据平台V1.0 在过去几年中为我们提供了很好的服务,但它的扩展性满足不了不断增长的业务需求。

02
领券