我正在尝试安装一个名为Apache Airflow的程序。 这些说明说明了如何指定主文件夹 # airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb ht
我是新的气流,我试图应用DAG运行一个ETL脚本通过BashOperator。当新的数据出现时,这样的ETL脚本会更新熊猫的数据,并且输出是一个更新的.csv文件。
在Airflow webserver中,任务已经成功完成,但是没有生成.csv文件(看起来BashOperator并没有实际执行python脚本)。
你能帮我弄清楚发生了什么事吗?非常感谢!
下面是我的脚本和日志消息:
from airflow.operators.bash import BashOperator
from airflow.models import DAG
from airflow.operators.bash_
我尝试在Google Cloud Dataflow中运行Apache光束管道(Python),这是由Google Cloud Coomposer中的DAG触发的。 我的dags文件夹在各自的GCS存储桶中的结构如下: /dags/
dataflow.py <- DAG
dataflow/
pipeline.py <- pipeline
setup.py
my_modules/
__init__.py
commons.py <- the module I want to import in the pipeline se
我正在使用Airflow的HdfsSensor来检测hdfs目录。我们有kerberised集群。我的代码一直插入目录,没有检测到,如下所示 [2020-08-25 13:57:19,808] {hdfs_sensor.py:100} INFO - Poking for file /tmp/ayush/hive/sensor/event_date=2020-08-25
[2020-08-25 13:58:19,871] {hdfs_sensor.py:100} INFO - Poking for file /tmp/ayush/hive/sensor/event_date=2020-08-2
我试图在一个只有一个文件的文件夹中获取文件名。
FYI:$FOLDER_TMP中包含一个空格,这就是我使用printf的原因
function nameofkeyfile(){
FOLDER_TMP="${PWD%/*/*}/folder/"
FOLDER=$(printf %q "${FOLDER_TMP}")
FILENAME=ls "$FOLDER" # Error: No such file or directory
# or this: FILENAME=$(ls "$FOLDER") #
我第一次用postgreSQL安装了Airflow。但是,在web上激活DAG后,将同时创建大约16个DAG,如下图所示。enter image description here 作为测试,我制作了一个dag,它每秒向数据库中输入当前日期10秒一次。如果需要,它应该每秒生成一行,但上面的问题每秒创建多行。enter image description here 下面是DAG代码。 import pendulum
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operat
我试着运行airflow standalone,但是我得到了AirflowConfigException( airflow.exceptions.AirflowConfigException: Cannot use relative path: sqlite:///C:\Users\admin/airflow/airflow.db to connect to sqlite. Please use absolute path such as sqlite:////tmp/airflow.db
我试过使用set AIRFLOW_HOME=~/airflow,但它似乎不起作用。如果这样做有任何不同,
我想在我的GCP编写器环境中使用GCSToSFTPOperator,我们在GCP编写器环境中有ariflow版本1.10.3,composer-1.8.3-airflow-1.10.3(我已经从1.10.2升级到1.10.3)。GCSToSFTPOperator出现在最新版本的Airflow中。请参阅下面的参考- https://airflow.readthedocs.io/en/latest/_modules/airflow/operators/gcs_to_sftp.html 我也尝试了插件,我复制了插件文件夹中的GCSToSFTPOperator类源代码,然后导入到我的python D
首先,我对码头,气流和堆叠溢流相当陌生。
我有一个在Ubuntu (20.04.3) VM上的Docker中运行的气流实例。
我正在尝试将Openpyxl安装在build上,以便将它用作pd.read_excel的引擎。
下面是带有install命令的Dockerfile:
FROM apache/airflow:2.2.4
ENV AIRFLOW_HOME=/opt/airflow
USER root
RUN apt-get update -qq && apt-get install vim -qqq
COPY requirements.txt .
RUN pip in
我正在运行空气流量2.1.4使用码头-合成和芹菜执行器。到目前为止,我已经能够从芹菜工作容器启动和运行简单的DockerOperator任务,但是现在当我尝试将一个目录从共享驱动器挂载到任务容器时,我得到了一个错误(下面的日志文件)。如果我不定义挂载参数,Dag工作得很好。因此,我猜有些信息或特权不会传递给由芹菜工人容器创建的容器。有什么建议吗?接下来要尝试什么?
DAG档案:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetim
我试图在气流中运行一个简单的dag来执行python文件,它正在抛出错误,无法打开文件‘/User/.’。
下面是我正在使用的脚本。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime,ti
我正在尝试将我们的气流升级到1.10.0。当我这样做时,我会收到一个错误,抱怨它无法连接到mysql:
worker_1 | sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (2002, 'Can\'t connect to local MySQL server through socket \'/var/run/mysqld/mysqld.sock\' (2 "No such file or directory")') (Backgrou
我正在尝试设置气流,使用LocalExecutor作为后端MySQL。但在airflow initdb期间我犯了错误
FutureWarning
/home/ubuntu/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/configuration.py:631: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use
我试图使用码头操作员自动执行一些脚本使用气流。
气流版本:apache-airflow==1.10.12
我想要做的是“复制”我的项目的所有文件(文件夹和文件)到容器使用这段代码。
以下文件ml-intermediate.py位于此目录~/airflow/dags/ml-intermediate.py中
"""
Template to convert a Ploomber DAG to Airflow
"""
from airflow import DAG
from airflow.operators.bash_operator import B
我使用root帐户在集群上安装了Airflow。我知道这是不好的练习,但它只是测试环境。我创建了一个简单的DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
dag = DAG('create_directory', description='simple create directory workflow', start_date=datetime(2017,
我的project中有这样的文件夹树
dagspython_scriptslibrariesdocker-compose.ymlDockerfiledocker_resources 项目
我在码头集装箱中创建一个气流服务:
dockerfile
#Base image
FROM puckel/docker-airflow:1.10.1
#Impersonate
USER root
#Los automatically thrown to the I/O strem and not buffered.
ENV PYTHONUNBUFFERED 1
ENV AIRFLOW_HOME=/
在DAG中,我使用的是一个DockerOperator,在其中我需要挂载一个临时目录来存储一些数据。容器必须为这个临时目录在主机上使用特定的路径,所以我尝试使用DockerOperator的“DockerOperator”参数,但这是行不通的。
考虑以下DAG示例:
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
with DAG(dag_id="test_v1",
我有一个简单的气流工作流程,由两个任务组成。用户可以下载包含股票数据的csv文件。另一个提取最高股票价格,并将数据写入另一个文件。
如果我运行第一个任务,然后运行第二个任务,则一切正常,相反,如果运行execute: airflow run stocks_d get_max_share,则无法满足依赖项。
import csv
from datetime import datetime
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import Pyth