我在gcp中设置了一个composer环境,它正在运行一个DAG,如下所示 with DAG('sample-dataproc-dag',
default_args=DEFAULT_DAG_ARGS,
schedule_interval=None) as dag: # Here we are using dag as context
# Submit the PySpark job.
submit_pyspark = DataProcPySparkOperator(
task_id='run_dataproc_pyspark'
我正在尝试为DAG编写脚本。我已经运行了docker-airflow,并且通过localhost在浏览器上运行良好。但是,每当我尝试在vscode中打开一个dag文件时,它都会显示导入错误。 我已经安装和设置气流使用docker和激活docker使用芹菜执行器。 Docker Container for airflow正在运行。 from airflow import DAG
from airflow.operators.bash_operator import BashOperator ?
我按照说明初始化breeze环境:
镜像似乎已经构建好了,但是在我的环境中start.Something失败了?
docker 20.10.9的好版本。Python版本: 3.8
后台: mysql
不需要重新构建镜像:没有更改任何重要文件
Use CI image.
Branch name: main
Docker image: ghcr.io/apache/airflow/main/ci
我从他的官方网站下载了气流的docker-compose.yaml,并将其放入我的文件夹中,运行了运行良好的sudo docker compose up airflow-init。当我运行sudo docker compose up来运行服务器并试图访问localhost:8080时,我做不到。这是我运行sudo docker compose up时的日志
WARN[0000] The "AIRFLOW_UID" variable is not set. Defaulting to a blank string.
WARN[0000] The "AIRFLOW_UID
我在码头使用气流和PostgreSQL。
因此,我在端口5433上建立了一个PostgreSQL数据库。集装箱(384 Ea7b6efb)。这是我有我的数据,我想拿我的dag在气流中。
码头工人ps
CONTAINER ID IMAGE COMMAND CREATED STATUS
PORTS NAMES
384eaa7b6efb postgres "docker-entrypoint.s…
我有一个码头-合成管道与容器,用于气流和火花。我想安排一个SparkSubmitOperator作业,但是它在错误java.lang.IllegalArgumentException: Too large frame: 5211883372140375593中失败了。星火应用程序只包括创建一个火花会话(我已经注释掉了所有其他内容)。当我手动运行星火应用程序(通过进入星火容器的bash并执行星火提交)时,一切都很好!此外,当我不创建一个火花会话,而只是一个SparkContext,它工作!
这是我的船坞-复合。Here:
version: '3'
x-airflow-com
从官方网站下载了对接者-撰写:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.0/docker-compose.yaml'
执行命令:
docker-compse up airflow-init
然后
docker-compose up -d
所有码头集装箱都是健康的:
Name Command State
Ports
问题
在使用DockerOperator、xcom_all=True和auto_remove=True运行STDOUT时,任务会引发一个错误,就好像容器在读取STDOUT之前被删除了一样。
示例
以下列DAG为例:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import PythonOperator
#
我有一个简单的示例python脚本,我希望Airflow在计划的基础上运行。我可以为Airflow创建容器,并在该容器中成功安装Docker服务器。我可以手动访问CLI并"docker run“示例的容器,它就可以工作了。当我从本地主机管理员尝试它时,它只显示“运行”永远。在CLI中检查"docker ps“显示它还没有启动容器。我确信我错过了一些简单的东西,因为这里没有太多复杂的东西。请帮帮忙!?
以下是python脚本:
import time as ti
def main():
print('TEST')
ti.sleep(120)
我使用CLI创建了一个自定义包(使用Click构建)。这个包可以做两件事:运行预处理和运行机器学习模型。我创建了此客户包的Docker镜像,并将其推送到AWS (ECR)上的私有注册表。 现在我想用Airflow运行这个容器,我想在一个EC2实例上运行它。我正在用docker-compose运行它。 在本例中,我将只关注一项任务:运行容器进行预处理。 然而,现在我得到了‘上游失败’的t2。 from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_oper
我有一个pyspark脚本,它现在工作得很好,我想做的是,我想为每一分钟安排该作业,为此,我使用了Apache Airflow,我为airflow创建了一个.py文件,如下所示: from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
from builtins import range
import airflow
from airflow.models import DAG
from
我正在尝试使用docker和airflow提供的docker-compose文件在我的机器上设置airflow:https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
我已经用docker-compose设置了气流,如下所述。https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html 还有一个必须执行docker命令的气流任务,比如 BashOperator(
task_id='my',
bash_command="""
docker run ..............
""",
dag=dag,
) 这意味着Docker包需要in the airflow do
我试图通过Airflow运行一个码头容器,但得到了Permission Denied错误。我看过一些相关的帖子,一些人似乎已经通过sudo chmod 777 /var/run/docker.sock解决了这个问题,这充其量是一个有问题的解决方案,但它仍然不适用于我(即使在重启docker之后。如果有人设法解决了这个问题,请让我知道!
这是我的DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.docker_operator import DockerOpera
我的程序无法在运行apache airflow的docker容器中创建SSH隧道。只有在我的本地机器上运行函数才能正常工作。我有一个服务器列表,用于创建隧道、查询数据库和关闭连接。通常,我会这样做: for server in servers:
server_conn = sshtunnel.SSHTunnelForwarder(
server,
ssh_username=ssh_user,
ssh_password=ssh_password,
remote_bind_address=(localhost, db_por