
Airflow DAG run stops mid-run

Stack Overflow user
Asked on 2019-11-17 14:45:17
1 answer · 302 views · 0 followers · 0 votes

I installed airflow v1.10.6 on Ubuntu with Python 3.6.8, and wrote a simple practice DAG that runs a set of 30 parallel tasks. I'm triggering the DAG through the UI, since this will run irregularly and I'm still learning. I currently have LocalExecutor with parallelism=4 configured in airflow.cfg (full cfg file below). The DAG run starts and is viewable in the UI; it completes exactly 4 task instances, then the run appears stalled in the UI and all remaining downstream tasks stay in scheduled or no_status forever. If I reset everything, raise parallelism=10 in airflow.cfg, and re-trigger the run, 10 task instances run and then the DAG run stalls again in the same way. While the run appears stalled in the UI, I see no errors in the scheduler logs or the web server logs, and the CPUs do look like they are running tasks. I get exactly the same behavior running this DAG under CeleryExecutor with an airflow worker running. It seems to happen whether I use schedule_interval=None or timedelta(days=1).

Not to state the obvious, but as I understand it, "parallelism" means that when the current 4 task instances finish, other task instances should be able to start. Any idea why I'm seeing this behavior?

Possibly relevant, possibly not: I also installed rabbitmq-server for Celery, and the database runs on an AWS RDS instance in the same VPC.
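My mental model of the slot accounting is this (just an illustrative sketch, not Airflow's real scheduler code; the function name is made up):

```python
def free_slots(parallelism, dag_concurrency, running):
    """Illustrative: new task instances should start whenever the count
    of running instances drops below both the global cap (parallelism)
    and the per-DAG cap (dag_concurrency)."""
    return max(0, min(parallelism, dag_concurrency) - running)

# With my current settings (parallelism=4, dag_concurrency=4):
print(free_slots(4, 4, 4))  # all slots busy -> 0 new tasks can start
print(free_slots(4, 4, 1))  # three tasks finished -> 3 slots reopen
```

That is why I expect the run to keep making progress as the first batch of tasks completes, instead of stalling.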

The DAG is as follows:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import time
default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2019, 6, 1),
    'email': ['airflow@example.com','evanburgess@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('example', default_args=default_args, schedule_interval=timedelta(days=1))

def do_a_process(flname, i, j):
    time.sleep(5)

mkarr = PythonOperator(
    task_id='make_random_array',
    default_args=default_args,
    python_callable=do_a_process,
    op_args=('filepath',10000,10000),
    dag=dag)

for i in range(30):

    smo = PythonOperator(
        task_id='gaussian_%i_sigma' % i,
        default_args=default_args,
        python_callable=do_a_process,
        op_args=('filepath', i, i),
        dag=dag
    )

    stat = PythonOperator(
        task_id='stats_%i_sigma' % i,
        default_args=default_args,
        python_callable=do_a_process,
        op_args=('filepath', i, i),
        dag=dag
    )
    mkarr >> smo
    smo >> stat

My full config file (comments removed) is also below:

[core]
dags_folder = /home/ubuntu/airflow/dags
base_log_folder = /home/ubuntu/airflow/logs
remote_logging = False
remote_log_conn_id =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = WARN
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/ubuntu/airflow/logs/dag_processor_manager/dag_processor_manager.log
hostname_callable = socket:getfqdn
default_timezone = utc
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:mypassword@my_url_rds.amazonaws.com:5432/airflow
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
parallelism = 4
dag_concurrency = 4
dags_are_paused_at_creation = True
max_active_runs_per_dag = 1
load_examples = False
plugins_folder = /home/ubuntu/airflow/plugins
fernet_key = bighexadecimal
donot_pickle = False
dagbag_import_timeout = 30
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
secure_mode = False
unit_test_mode = False
task_log_reader = task
enable_xcom_pickling = True
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = False
worker_precheck = False
dag_discovery_safe_mode = True
default_task_retries = 0
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[api]
auth_backend = airflow.api.auth.backend.default
[lineage]
backend =
[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =
[operators]
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
[hive]
default_hive_mapred_queue =
[webserver]
base_url = http://localhost:8080
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 30
secret_key = temporary_key
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
expose_config = False
authenticate = False
filter_by_owner = False
owner_mode = user
dag_default_view = tree
dag_orientation = LR
demo_mode = False
log_fetch_timeout_sec = 5
hide_paused_dags_by_default = False
page_size = 100
rbac = False
navbar_color = #007A87
default_dag_run_display_number = 25
enable_proxy_fix = False
cookie_secure = False
cookie_samesite =
default_wrap = False
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = airflow@example.com
[sentry]
sentry_dsn =
[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_log_server_port = 8793
broker_url = pyamqp://test:test@localhost:5672/
result_backend = db+postgresql://airflow:snowSNOWsnowSNOW@processing-1.cdpofxkpfulr.us-west-2.rds.amazonaws.com:5432/airflow
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
default_queue = default
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
[celery_broker_transport_options]
[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
run_duration = -1
num_runs = -1
processor_poll_interval = 1
min_file_process_interval = 0
dag_dir_list_interval = 300
print_stats_interval = 30
scheduler_health_check_threshold = 30
child_process_log_directory = /home/ubuntu/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
max_threads = 2
authenticate = False
use_job_schedule = True
[ldap]
uri =
user_filter = objectClass=*
user_name_attr = uid
group_member_attr = memberOf
superuser_filter =
data_profiler_filter =
bind_user = cn=Manager,dc=example,dc=com
bind_password = insecure
basedn = dc=example,dc=com
cacert = /etc/ca/ldap_ca.crt
search_scope = LEVEL
ignore_malformed_schema = False
[mesos]
master = localhost:5050
framework_name = Airflow
task_cpu = 1
task_memory = 256
checkpoint = False
authenticate = False
[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
[github_enterprise]
api_rev = v3
[admin]
hide_sensitive_variable_fields = True
[elasticsearch]
host =
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
[elasticsearch_configs]
use_ssl = False
verify_certs = True
[kubernetes]
worker_container_repository =
worker_container_tag =
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = True
worker_pods_creation_batch_size = 1
namespace = default
airflow_configmap =
dags_in_image = False
dags_volume_subpath =
dags_volume_claim =
logs_volume_subpath =
logs_volume_claim =
dags_volume_host =
logs_volume_host =
env_from_configmap_ref =
env_from_secret_ref =
git_repo =
git_branch =
git_subpath =
git_user =
git_password =
git_sync_root = /git
git_sync_dest = repo
git_dags_folder_mount_point =
git_ssh_key_secret_name =
git_ssh_known_hosts_configmap_name =
git_sync_credentials_secret =
git_sync_container_repository = k8s.gcr.io/git-sync
git_sync_container_tag = v3.1.1
git_sync_init_container_name = git-sync-clone
git_sync_run_as_user = 65533
worker_service_account_name =
image_pull_secrets =
gcp_service_account_keys =
in_cluster = True
affinity =
tolerations =
kube_client_request_args = {"_request_timeout" : [60,60] }
run_as_user =
fs_group =
[kubernetes_node_selectors]
[kubernetes_annotations]
[kubernetes_environment_variables]
[kubernetes_secrets]
[kubernetes_labels]
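For quick reference, these are the concurrency-related knobs from the file above, plus a hypothetical experiment (the raised values are guesses I have not tested):

```ini
[core]
# as configured now: both caps are 4
parallelism = 4
dag_concurrency = 4

# hypothetical experiment: raise both together, since dag_concurrency
# independently caps running task instances per DAG regardless of parallelism
# parallelism = 32
# dag_concurrency = 16
```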

And here are my requirements (pip freeze):

alembic==1.3.0
amqp==2.5.2
apache-airflow==1.10.6
apispec==3.1.0
argcomplete==1.10.0
asn1crypto==0.24.0
atomicwrites==1.3.0
attrs==19.3.0
Automat==0.6.0
Babel==2.7.0
bcrypt==3.1.7
billiard==3.6.1.0
blinker==1.4
boto3==1.7.84
botocore==1.10.84
cached-property==1.5.1
cachetools==3.1.1
celery==4.3.0
certifi==2019.9.11
cffi==1.13.2
chardet==3.0.4
Click==7.0
cloud-init==19.2
colorama==0.4.1
colorlog==4.0.2
command-not-found==0.3
configobj==5.0.6
configparser==3.5.3
constantly==15.1.0
croniter==0.3.30
cryptography==2.8
defusedxml==0.6.0
dill==0.3.1.1
distro-info===0.18ubuntu0.18.04.1
dnspython==1.16.0
docutils==0.15.2
ec2-hibinit-agent==1.0.0
eventlet==0.25.1
Flask==1.1.1
Flask-Admin==1.5.3
Flask-AppBuilder==1.13.1
Flask-Babel==0.12.2
Flask-Bcrypt==0.7.1
Flask-Caching==1.3.3
Flask-JWT-Extended==3.24.1
Flask-Login==0.4.1
Flask-OpenID==1.2.5
Flask-SQLAlchemy==2.4.1
flask-swagger==0.2.13
Flask-WTF==0.14.2
flower==0.9.3
funcsigs==1.0.0
future==0.16.0
gevent==1.4.0
google-api-core==1.14.3
google-api-python-client==1.7.11
google-auth==1.7.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-cloud-bigquery==1.21.0
google-cloud-bigtable==0.33.0
google-cloud-container==0.3.0
google-cloud-core==1.0.3
google-cloud-dlp==0.12.1
google-cloud-language==1.3.0
google-cloud-spanner==1.9.0
google-cloud-speech==1.2.0
google-cloud-storage==1.22.0
google-cloud-texttospeech==0.5.0
google-cloud-translate==2.0.0
google-cloud-videointelligence==1.11.0
google-cloud-vision==0.39.0
google-resumable-media==0.4.1
googleapis-common-protos==1.6.0
graphviz==0.13.2
greenlet==0.4.15
grpc-google-iam-v1==0.11.4
grpcio==1.25.0
grpcio-gcp==0.2.2
gunicorn==19.9.0
hibagent==1.0.1
httplib2==0.14.0
hyperlink==17.3.1
idna==2.8
importlib-metadata==0.23
incremental==16.10.1
iso8601==0.1.12
itsdangerous==1.1.0
Jinja2==2.10.3
jmespath==0.9.4
json-merge-patch==0.2
jsonpatch==1.16
jsonpointer==1.10
jsonschema==3.1.1
keyring==10.6.0
keyrings.alt==3.0
kombu==4.6.6
language-selector==0.1
lazy-object-proxy==1.4.3
librabbitmq==2.0.0
lockfile==0.12.2
Mako==1.1.0
Markdown==2.6.11
MarkupSafe==1.1.1
marshmallow==2.19.5
marshmallow-enum==1.5.1
marshmallow-sqlalchemy==0.18.0
monotonic==1.5
more-itertools==7.2.0
netifaces==0.10.4
numpy==1.17.4
oauthlib==3.1.0
ordereddict==1.1
packaging==19.2
PAM==0.4.2
pandas==0.25.3
pandas-gbq==0.11.0
paramiko==2.6.0
pendulum==1.4.4
pika==0.13.0
pluggy==0.13.0
prison==0.1.0
protobuf==3.10.0
psutil==5.6.5
psycopg2==2.7.7
py==1.8.0
PyAMQP==0.0.8.5
pyasn1==0.4.7
pyasn1-modules==0.2.7
pycparser==2.19
pycrypto==2.6.1
pydata-google-auth==0.1.3
Pygments==2.4.2
pygobject==3.26.1
PyJWT==1.7.1
PyNaCl==1.3.0
pyOpenSSL==19.0.0
pyparsing==2.4.5
pyrsistent==0.15.5
pyserial==3.4
pysftp==0.2.9
pytest==5.2.2
python-apt==1.6.4
python-daemon==2.1.2
python-dateutil==2.8.1
python-debian==0.1.32
python-editor==1.0.4
python3-openid==3.1.0
pytz==2019.3
pytzdata==2019.3
pyxdg==0.25
PyYAML==5.1.2
requests==2.22.0
requests-oauthlib==1.3.0
requests-unixsocket==0.1.5
rsa==4.0
s3transfer==0.1.13
scipy==1.3.2
SecretStorage==2.3.1
service-identity==16.0.0
setproctitle==1.1.10
six==1.13.0
SQLAlchemy==1.3.10
ssh-import-id==5.7
sshtunnel==0.1.5
systemd-python==234
tabulate==0.8.5
tenacity==4.12.0
termcolor==1.1.0
text-unidecode==1.2
thrift==0.11.0
tornado==5.1.1
Twisted==17.9.0
tzlocal==1.5.1
ufw==0.36
unattended-upgrades==0.1
unicodecsv==0.14.1
uritemplate==3.0.0
urllib3==1.25.6
vine==1.3.0
wcwidth==0.1.7
Werkzeug==0.16.0
WTForms==2.2.1
zipp==0.6.0
zope.deprecation==4.4.0
zope.interface==4.3.2

1 Answer

Stack Overflow user

Answered on 2019-11-20 14:02:34

All I can say is: if you run into this, reinstall from scratch. I installed everything with pip3, except rabbitmq-server, which I installed with apt on Ubuntu. Anaconda gave me a lot of trouble.
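One way to confirm Anaconda isn't leaking into the environment is to check which interpreter and pip the shell actually resolves to (an illustrative snippet, not part of the original answer):

```python
import shutil
import sys

# Which Python is running this process: an Anaconda path here
# (e.g. .../anaconda3/bin/python) would suggest a mixed environment.
print(sys.executable)

# Which pip3 the shell would invoke; it should live under the same
# prefix as the interpreter above. Returns None if pip3 isn't on PATH.
print(shutil.which("pip3"))
```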

Votes: 0
Original content provided by Stack Overflow; translation supported by Tencent Cloud.
Original link: https://stackoverflow.com/questions/58898177