前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Centos7安装Airflow2.x redis

Centos7安装Airflow2.x redis

原创
作者头像
待你如初见
修改2022-01-12 10:09:35
1.8K0
修改2022-01-12 10:09:35
举报
文章被收录于专栏:待你如初见

Centos7下Airflow(2.0.X)+celery+redis 安装

安装环境及版本

centos7

Airflow 2.0.2

Python 3.8.3

Mysql 5.7.29

redis 5.0.8

安装

数据库安装

略(自行百度)

  • 注意开启远程连接(关闭防火墙)
  • 字符集统一修改为UTF8(utf8mb4也可以)防止乱码
  • 高版本的mysql 或者Maria DB 会出现VARCHAR(5000)的报错 建议低版本
    • 原因是高版本的数据库为了效率限制了VARCHER的最大长度
    • postgresql还没有试以后补充
  • python安装略(自行百度)
  • 请将python加入环境变量(方便)

airflow安装

代码语言:txt
复制
vim ~/.bashrc
# 添加一行环境变量
export AIRFLOW_HOME=/opt/airflow
source ~/.bashrc
  • 安装airflow及相关组件此环境变量仅需要设置成临时变量即可用来临时启动worker测试 并不需要配置成永久变量
代码语言:txt
复制
export SLUGIFY_USES_TEXT_UNIDECODE=yes

安装airflow

代码语言:txt
复制
# 可能会有一些报错请忽略,如果生成了配置文件,保证AIRFLOW_HOME目录下生成了.cfg及相关文件即证明本次执行成功
# 如果配置了pytho的环境变量直接执行`airflow`命令
# 没配置在${PYTHON_HOME}/lib/python3.6/sit-packages/airflow/bin目录下执行`./airflow`

pip install apache-airflow

安装airflow 相关依赖

代码语言:txt
复制
pip install 'apache-airflow[mysql]'
pip install 'apache-airflow[celery]'
pip install 'apache-airflow[redis]'
pip install pymysql

配置

修改配置文件

  • 修改${AIRFLOW_HOME}/airflow.cfg
代码语言:txt
复制
# sqlalchemy链接
sql_alchemy_conn = mysql+pymysql://root:root@10.1.49.71:3306/airflow?charset=utf8
# 配置执行器
executor=CeleryExecutor
# 配置celery的broker_url
broker_url = redis://lochost:5379/0
# 配置元数据信息管理
result_backend = db+mysql://username:password@localhost:3306/airflow

创建Linux用户(worker 不允许在root用户下执行)

代码语言:txt
复制
# 创建用户组和用户
groupadd airflow 
useradd airflow -g airflow
# 将 {AIRFLOW_HOME}目录修用户组
cd /opt/
chgrp -R airflow airflow

初始化数据库

初始化前请先创建airflow数据库以免报错

代码语言:txt
复制
airflow db init

创建airflow 用户

代码语言:txt
复制
# 用于登录airflow
airflow create_user --lastname user --firstname admin --username admin --email admin_user@mail.com --role Admin --password admin

启动

代码语言:txt
复制
# 前台启动web服务
airflow webserver 

# 后台启动web服务
airflow webserver -D

# 前台启动scheduler 
airflow schedule

# 后台启动scheduler
airflow scheduler -D

启动worker

  • 方法一
代码语言:txt
复制
# worker主机只需用普通用户打开airflow worker

# 创建用户airflow

useradd airflow

# 对用户test设置密码

passwd airflow

# 在root用户下,改变airflow文件夹的权限,设为全开放

chmod -R 777 /opt/airflow

# 切换为普通用户,执行airflow worker命令就行
# 启动时发现普通用户读取的~/.bashrc文件 不一致 重新加入AIRFLOW_HOME 就可以了
# 如果在新建普通用户前配置好环境变量可能没有这个问题了  本人是在创建用户后修改了环境变量
# 使用celery执行worker
airflow celery worker 
  • 启动成功显示如下
worker.png
worker.png
  • 方法二
代码语言:txt
复制
# 执行worker之前运行临时变量(临时的不能永久使用)
export C_FORCE_ROOT="true"

# 不需要切换用户
cd /usr/local/python3/bin/

# 前台启动worker服务
airflow celery worker

# 后台启动work服务
airflow celery  worker -D

修改时区

  • 修改airflow.cfg文件
代码语言:txt
复制
default_timezone = Asia/Shanghai

配置email报警在airflow配置文件airflow.cfg中修改

  • 参考aiflow官方文档 email_backend = airflow.utils.email.send_email_smtp
  • smtp在你要设置的邮箱服务器地址在邮箱设置中查看(此处为163
    • smtp_host = smtp.163.com
  • 邮箱通讯协议
    • smtp_starttls = False
    • smtp_ssl = True
  • 你的邮箱地址
    • smtp_user = demo@163.com
  • 你的邮箱授权码在邮箱设置中查看或百度
    • smtp_password = 16位授权码
  • 邮箱服务端口
    • smtp_port = 端口
  • 你的邮箱地址smtp_mail_from = demo@163.com
  • 在dag中default_args添加参数
代码语言:txt
复制
default_args = {
    # 接受邮箱
    'email': ['demo@qq.com''],
    # task失败是否发送邮件
    'email_on_failure': True,
    # task重试是否发送邮件
    'email_on_retry': False,

}

——————————————————————————————————————————————

补充

在跑任务时发现部分任务在并行时会出现数据的异常解决方案:

airflow的全局变量中设置

  • parallelism :这是用来控制每个airflow worker 可以同时运行多少个task实例。这是airflow集群的全局变量。在airflow.cfg里面配置
  • concurrency :每个dag运行过程中最大可同时运行的task实例数。如果你没有设置这个值的话,scheduler 会从airflow.cfg里面读取默认值 dag_concurrency

在DAG中加入参数用于控制整个dag

  • max_active_runs : 来控制在同一时间可以运行的最多的dag runs 数量。 假如我们一个DAG同一时间只能被运行一次,那么一定要指明 max_active_runs = 1 如果我们DAG中有10个Task,我们如果希望10个Task可以在触发后可以同时执行,那么我们的concurrency需要不小于10才行,若小于10,那么会有任务需要等待之前的任务执行完成才会开始执行。
代码语言:txt
复制
dag = DAG(f"dag_name",
          default_args=default_args,
          schedule_interval="0 12 * * *",
          max_active_runs = 1
          )

在每个task中的Operator中设置参数

  • task_concurrency:来控制在同一时间可以运行的最多的task数量 假如task_concurrency=1一个task同一时间只能被运行一次其他task不受影响
代码语言:txt
复制
t3 = PythonOperator(
    task_id='demo_task',
    provide_context=True,
    python_callable=demo_task,
    task_concurrency=1,
    dag=dag)

补充

在使用airflow scheduler -D命令时发现无法启动会报错

报错如下:

代码语言:txt
复制
Traceback (most recent call last):
  File "/opt/anaconda3/bin/airflow", line 37, in <module>
    args.func(args)
  File "/opt/anaconda3/lib/python3.8/site-packages/airflow/utils/cli.py", line 76, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda3/lib/python3.8/site-packages/airflow/bin/cli.py", line 1213, in scheduler
    job.run()
  File "/opt/anaconda3/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 212, in run
    session.commit()
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1036, in commit
    self.transaction.commit()
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 503, in commit
    self._prepare_impl()
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
    self.session.flush()
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2496, in flush
    self._flush(objects)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2637, in _flush
    transaction.rollback(_capture_exception=True)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
    flush_context.execute()
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
    persistence.save_obj(
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 205, in save_obj
    for (
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 373, in _organize_states_for_save
    for state, dict_, mapper, connection in _connections_for_states(
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1602, in _connections_for_states
    connection = uowtransaction.transaction.connection(base_mapper)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 313, in connection
    return self._connection_for_bind(bind, execution_options)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 420, in _connection_for_bind
    conn = self._parent._connection_for_bind(bind, execution_options)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 432, in _connection_for_bind
    conn = bind._contextual_connect()
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2251, in _contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2285, in _wrap_pool_connect
    return fn()
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 363, in connect
    return _ConnectionFairy._checkout(self)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 804, in _checkout
    result = pool._dialect.do_ping(fairy.connection)
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 138, in do_ping
    dbapi_connection.ping(False)
  File "/opt/anaconda3/lib/python3.8/site-packages/pymysql/connections.py", line 534, in ping
    self._execute_command(COMMAND.COM_PING, "")
  File "/opt/anaconda3/lib/python3.8/site-packages/pymysql/connections.py", line 763, in _execute_command
    self._write_bytes(packet)
  File "/opt/anaconda3/lib/python3.8/site-packages/pymysql/connections.py", line 703, in _write_bytes
    self._sock.settimeout(self._write_timeout)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f99dae28940>
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 503, in <lambda>
  File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 701, in _finalize_fairy
  File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1463, in error
  File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1577, in _log
  File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1587, in handle
  File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1649, in callHandlers
  File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 950, in handle
  File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1182, in emit
  File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1172, in _open
NameError: name 'open' is not defined
代码语言:txt
复制
# 这个问题,它似乎源于“池”行为。可以通过禁用连接池来绕过它: sql alchemy pool enabled = False
sql_alchemy_pool_enabled = False

如有错误欢迎指正

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Centos7下Airflow(2.0.X)+celery+redis 安装
    • 安装环境及版本
      • 安装
        • 数据库安装
        • airflow安装
    • 安装airflow
    • 安装airflow 相关依赖
      • 配置
        • 修改配置文件
        • 创建Linux用户(worker 不允许在root用户下执行)
        • 初始化数据库
    • 创建airflow 用户
      • 启动
        • 启动worker
      • 修改时区
        • 配置email报警在airflow配置文件airflow.cfg中修改
          • 补充
            • 在跑任务时发现部分任务在并行时会出现数据的异常解决方案:
          • 补充
          • 如有错误欢迎指正
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档