前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery消息队列路由设置

Celery消息队列路由设置

作者头像
用户8644430
发布2022-04-02 16:29:10
1.4K0
发布2022-04-02 16:29:10
举报
文章被收录于专栏:celerycelery

celery路由配置后,启动服务之后执行当前配置路由任务

最简单的路由方式是使用 task_create_missing_queues 设置 (默认是开启的)。

这个设置开启后, 一个在task_queues中还未定义的命名队列会被自动创建。这让简单的路由任务变得很容易。 假如你有两台服务器x 和 y,来处理常规(regular)任务,一个服务器z只处理feed相关的任务。你可以使用这样的配置:

代码语言:shell
复制
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

这个路由启用( enabled)后import_feed任务会被路由给“feeds”队列, 并且所有其他任务会被路由到默认队列(由于历史原因这个队列叫“celery”)。

除此以外,你可以使用全局模式匹配,或者正则表达式,匹配所有在feed.tasks 命名空间中的任务:

代码语言:shell
复制
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

如果匹配模式的顺序很重要,你应该在条目列表中指定路由器:

代码语言:javascript
复制
task_routes = ([
(‘feed.tasks.*’, {‘queue’: ‘feeds’}),
(‘web.tasks.*’, {‘queue’: ‘web’}),
(re.compile(r’(video|image).tasks..*’), {‘queue’: ‘media’}),
],)
代码语言:text
复制
注意
task_routes设置既可以是一个字典,也可是路由器对象的列表,所以在这种情况下我们需要指定设置为一个包含列表的元组(tuple)。

配置好路由器后,你可以开启服务器z来只处理feeds队列:

代码语言:ruby
复制
user@z:/$ celery -A proj worker -Q feeds

你可以指定很多队列,所以你也可以让这个服务器处理默认队列:

代码语言:ruby
复制
user@z:/$ celery -A proj worker -Q feeds,celery
改变默认队列的名字

你可以通过下面的配置改变默认队列的名字:

代码语言:text
复制
app.conf.task_default_queue = 'default'
队列是怎么被定义的

这样处理是对用户隐藏了AMQP协议的复杂性,而只提供基本的需求。然而你可能仍然对于队列的声明感兴趣。 使用下面的设置,一个名为“video” 的队列会被创建:

代码语言:shell
复制
{'exchange': 'video',
 'exchange_type': 'direct',
 'routing_key': 'video'}

像Redis 或 SQS这样的非AMQP的后端,不支持交易所(exchange),所以他们需要exchange同队列有相同的名字。使用这个设计可以确保对他们也有效。

手动路由

假如你有两台服务器x 和 y,来处理常规(regular)任务,一个服务器z只处理feed相关的任务,你可以使用这样的配置:

代码语言:text
复制
from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
task_default_exchange = 'tasks'
task_default_exchange_type = 'topic'
task_default_routing_key = 'task.default'

task_queues 是Queue 实例的列表.。如果你没有给一个键设置exchange或者exchange类型,这些会从task_default_exchange和task_default_exchange_type设置中取。 为了路由任务到feed_tasks 队列,你可以在task_routes 设置中添加一个容器:

代码语言:javascript
复制
task_routes = {
‘feeds.tasks.import_feed’: {
‘queue’: ‘feed_tasks’,
‘routing_key’: ‘feed.import’,
},
}

你也可以给Task.apply_async()或者send_task()使用routing_key参数覆盖这个配置:

代码语言:text
复制
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

为了让服务器z从feed队列消费,你可以使用celery worker -Q 选项启动它:

代码语言:ruby
复制
user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

服务器 x 和 y 必须被配置成从默认队列消费:

代码语言:ruby
复制
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

如果你想,甚至可以让你的处理feed的工人(worker)也处理正规(regular)任务:

代码语言:ruby
复制
user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

如果你有另一个队列但想要绑定在另一个交易所,你可以指定一个自定义交易所和交易类型:

代码语言:text
复制
from kombu import Exchange, Queue
app.conf.task_queues = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

路由任务

定义队列

在Celery中可用的队列通过task_queues设置被定义。 这有一个队列配置的例子,它配置了三个队列;一个给video,一个给images还有一个给别的东西的默认队列:

代码语言:text
复制
default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

这里面, task_default_queue 会被用来路由那些没有明确路由的任务。 默认的交易所,交易类型和路由键会作为任务的默认路由值,并且作为task_queues中登记注册(entries)的默认值。

也支持多重绑定到单一队列。下面是两个路由键都绑定到相同队列的例子:

代码语言:text
复制
from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

CELERY_QUEUES = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)
指定任务终点Specifying task destination

一个任务的终点由下面来决定(按次序): 在task_routes中定义的Routers。 The Routers defined in task_routes. 给Task.apply_async()的路由参数。 The routing arguments to Task.apply_async(). 定义在Task自身中的路由相关的属性。 Routing related attributes defined on the Task itself. 一般最好不要硬编码这些设置,而是通过使用Routers把那个作为配置选项。这是最灵活的途径,但明确合理的默认值仍然可以被设置为任务属性。

路由器Routers

路由器是一个为任务决定路由选项的的函数。A router is a function that decides the routing options for a task.

定义一个新路由器时,你所要做的就是定义一个函数,带有签名 (name, args, kwargs, options, task=None, **kw):

代码语言:python
复制
def route_task(name, args, kwargs, options, task=None, **kw):
        if name == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}

如果你返回了队列的键,它会用task_queues中的那个队列已定义的设置进行扩展:

代码语言:shell
复制
{'queue': 'video', 'routing_key': 'video.compress'}

变成 –>

代码语言:shell
复制
{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

通过把他们添加到task_routes设置中来配置路由器类:

代码语言:text
复制
task_routes = (route_task,)

Router 函数也可以通过名字添加:

代码语言:shell
复制
task_routes = ('myapp.routers.route_task',)

对于简单的任务名For simple task name -> 路由映射就像上面的路由器例子,你可以简单的把一个字典放到task_routes中,获取同样的表现:

代码语言:shell
复制
task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}

路由器们(routers)会被按次序探查,在第一个返回真值的路由器处停止,选择它作为任务的最终路由。

你也可以在一个序列中定义多个路由器:

代码语言:javascript
复制

task_routes = [
route_task,
{
‘myapp.tasks.compress_video’: {
‘queue’: ‘video’,
‘routing_key’: ‘video.compress’,
},
]

路由器会被按次序访问,第一个返回值的会被选中。

广播Broadcast

Celery 也支持广播路由。这有一个 broadcast_tasks交易所的例子,它会传递任务的拷贝给所有连接到它的工人(workers)。:

代码语言:python
复制
from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

现在tasks.reload_cache任务会被发送给从这个队列中消费的每一个工人。 这个广播路由的另一个例子,这次有一个celery定时器:

代码语言:python
复制
from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 改变默认队列的名字
  • 队列是怎么被定义的
  • 手动路由
  • 路由任务
    • 定义队列
      • 指定任务终点Specifying task destination
        • 路由器Routers
          • 广播Broadcast
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档