主要步骤
1) Django项目的settings模块配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | INSTALLED_APPS = [ 'suit', # 添加suit支持 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'djcelery', # 添加djcelery 'frame', 'rest_framework', 'monitor_agent', 'monitor_master', 'webpage', ] |
---|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | # celery import djcelery djcelery.setup_loader() CELERY_IMPORTS = ('monitor_agent.my_celery.tasks', ) # 任务定义所在的模块 CELERY_TIMEZONE = TIME_ZONE BROKER_URL = 'amqp://guest:guest@10.10.83.162:5672/' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_TASK_RESULT_EXPIRES = 1200 # celery任务执行结果的超时时间,我的任务都不需要返回结果,只需要正确执行就行 CELERYD_CONCURRENCY = 10 # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以 CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去rabbitmq取任务的数量,我这里预取了4个慢慢执行,因为任务有长有短没有预取太多 CELERYD_MAX_TASKS_PER_CHILD = 200 # 每个worker执行了多少任务就会死掉 CELERY_DEFAULT_QUEUE = "default_wj" # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面 CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_QUEUES = { "default_wj": { # 这是上面指定的默认队列 "exchange": "default_wj", "exchange_type": "direct", "routing_key": "default_wj" }, "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列 "routing_key": "topictest.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "test2": { # test和test2是2个fanout队列,注意他们的exchange相同 "exchange": "broadcast_tasks", "exchange_type": "fanout", "binding_key": "broadcast_tasks", }, "test": { "exchange": "broadcast_tasks", "exchange_type": "fanout", "binding_key": "broadcast_tasks2", }, } |
---|
CELERY_IMPORTS这个引用自己定义的任务,本文是设置在app名为monitor_agent的my_celery下的tasks.py
2)编写tasks.py文件中的函数,如下例
1 2 3 4 5 6 7 | from __future__ import absolute_import from celery import shared_task,task @shared_task() def ping_net_task(): print “ping_net_task” |
---|
3)同步数据库
python manage.py makemigrations
python manage.py migrate
在数据库中生成几张表
可以通过django的admin页面进行数据创建
启动 /usr/bin/python manage.py celerycam --loglevel=INFO
admin后台查看 woker是不是在线
当worker可以监控后,在admin后台tasks表中可以查看每次任务的执行状态
或者自己导入from djcelery import models as celery_models,通过它提供的Model Query API来操作,同平常的数据库查询一样。
4)启动
启动 python manage.py celery worker -l info
如果有定时任务的话,还需要启动心跳 python manage.py celery beat
celery worker --help 启动参数帮助命令
celery beat --help 启动参数帮助命令