我对气流很陌生。我已经阅读了几次文档,在网上讨论了无数的S/O问题和许多随意的文章,但还没有解决这个问题。我有种感觉我做错了一件非常简单的事。我有Docker,我提取了puckel/docker-airflow映像并运行了一个端口公开的容器,这样我就可以从我的主机上点击UI了。我有另一个运行mcr.microsoft.com/mssql/server的容器,我在它上还原了WideWorldImporters示例db。从气流UI中,我能够成功地创建到这个db的连接,甚至可以从数据分析部分查询它。查看下面的图片:连接创建 成功查询连接
因此,当这起作用时,我的进程在第二个任务sqlData上失败了。以下是代码:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime
copyData = DAG(
dag_id='copyData',
schedule_interval='@once',
start_date=datetime(2019,1,1)
)
printHelloBash = BashOperator(
task_id = "print_hello_Bash",
bash_command = 'echo "Lets copy some data"',
dag = copyData
)
mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
task_id="select_some_data",
mssql_conn_id=mssqlConnection,
database="WideWorldImporters",
dag = copyData,
depends_on_past=True
)
queryDataSuccess = BashOperator(
task_id = "confirm_data_queried",
bash_command = 'echo "We queried data!"',
dag = copyData
)
printHelloBash >> sqlData >> queryDataSuccess最初的错误是:
*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
_fernet = Fernet(fernet_key.encode('utf-8'))
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
key = base64.urlsafe_b64decode(key)
File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
return b64decode(s)
File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*我注意到这与密码学有关,于是我开始运行pip install cryptography和pip install airflow[crytpo],它们都返回了完全相同的结果,告诉我需求已经满足了。最后,我发现了一些东西,上面写着我只需要生成一个fernet_key。我的airflow.cfg文件中的默认键是fernet_key = $FERNET_KEY。因此,从容器中的cli,我运行了:
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"得到了一个我用$FERNET_KEY替换的代码。我重新启动了容器并重新运行了这个守护进程,现在我的错误是:
[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
h.verify(data[-32:])
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
ctx.verify(signature)
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.从最初的密码文档扫描中,哪一个与兼容性有关?
我现在迷路了,我决定问这个问题,看看我在解决这个问题时是否走错了路。任何帮助都是非常感谢的,因为气流看起来很棒。
发布于 2019-02-25 05:32:41
多亏了@Tomasz的一些交流,我的DAG终于开始工作了。他建议我尝试使用对接组合,这也是在puckel/对接气流github回购中列出的。最后,我使用了docker-复合-LocalExecutor.yml文件,而不是芹菜执行器。有一些小的疑难解答和更多的配置,我也必须通过。首先,我使用了包含示例db的现有MSSQL容器,并使用docker commit mssql_container_name将其转换为图像。我这样做的唯一原因是为了节省恢复备份样例dbs的时间;如果需要,始终可以将备份复制到容器中,并在以后恢复备份。然后,我将我的新映像添加到现有的docker-Compost-LocalExecutor.yml文件中,如下所示:
version: '2.1'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
mssql:
image: dw:latest
ports:
- "1433:1433"
webserver:
image: puckel/docker-airflow:1.10.2
restart: always
depends_on:
- postgres
- mssql
environment:
- LOAD_EX=n
- EXECUTOR=Local
#volumes:
#- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3请注意,我将dw命名为基于mssql容器的新映像。接下来,我将文件重命名为docker-compose.yml,以便能够轻松地运行docker-compose up (不确定是否有命令直接指向不同的YAML文件)。一旦一切都启动和运行,我导航到气流UI和配置我的连接。注意:由于您使用的是坞-组合,您不需要知道其他容器的IP地址,因为它们使用的是DNS服务发现,我在这里中发现了这一点。然后,为了测试连接,我进行了数据分析,以执行一个即席查询,但连接并不存在。这是因为puckel/docker气流映像没有安装pymssql。因此,只需将其插入容器docker exec -it airflow_webserver_container bash并安装pip install pymssql --user即可。退出容器并使用docker-compose restart重新启动所有服务。过了一分钟,一切都开始运转了。我的连接出现在Ad查询中,我可以成功地选择数据。最后,我打开了我的DAG,调度器拿起它,一切都成功了!经过数周的谷歌搜索后,超级松了一口气。感谢@y2k-shubham的帮助,以及对@Tomasz的高度赞赏,他在r/datascience subreddit上发表了一篇关于气流的精彩而详尽的文章,我一开始接触过他。
https://stackoverflow.com/questions/54831314
复制相似问题