前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[源码分析] 消息队列 Kombu 之 Hub

[源码分析] 消息队列 Kombu 之 Hub

原创
作者头像
Java团长
修改2021-03-16 10:07:44
1.6K0
修改2021-03-16 10:07:44
举报

0x00 摘要

本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Hub 概念。

0x01 示例代码

下面使用如下代码来进行说明。

本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。

代码语言:javascript
复制
def main(arguments):
    hub = Hub()
    exchange = Exchange('asynt_exchange')
    queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
        print('message sent')

    def on_message(message):
        print('received: {0!r}'.format(message.body))
        message.ack()
        # hub.stop()  # <-- exit after one message

    conn = Connection('redis://localhost:6379')
    conn.register_with_event_loop(hub)

    def p_message():
        print(' kombu ')

    with Consumer(conn, [queue], on_message=on_message):
        send_message(conn)
        hub.timer.call_repeatedly(3, p_message)
        hub.run_forever()

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

0x02 来由

前文中,Consumer部分有一句代码没有分析:

代码语言:javascript
复制
hub.run_forever()

此时,hub与Connection已经联系起来,具体如下:

具体如下图:

代码语言:javascript
复制
+----------------------+               +-------------------+
| Consumer             |               | Channel           |
|                      |               |                   |        +-----------------------------------------------------------+
|                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
|      channel  +--------------------> |                   |        +-----------------------------------------------------------+
|                      |               |    pool           |
|                      |   +---------> |                   | <------------------------------------------------------------+
|      queues          |   |           |                   |                                                              |
|                      |   |    +----> |    connection +---------------+                                                  |
|        |             |   |    |      |                   |           |                                                  |
+----------------------+   |    |      +-------------------+           |                                                  |
         |                 |    |                                      v                                                  |
         |                 |    |      +-------------------+       +---+-----------------+       +--------------------+   |
         |                 |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
         |                 |    |      |                   |       |                     |       |                    |   |
         |                 |    |      |                   |       |                     |       |     _channels +--------+
         |                 |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
         |                 |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
         |       +-------->+    |      |                   |       |                     |    +------+ poller         |
         |       |              |      +-------------------+       +---------------------+    |  |     after_read     |
         |       |              |                                                             |  |                    |
         |       |              |                                                             |  +--------------------+
         |       |              |      +------------------+                   +---------------+
         |       |              |      | Hub              |                   |
         |       |              |      |                  |                   v
         |       |              |      |                  |            +------+------+
         |       |              |      |      poller +---------------> | _poll       |
         |       |              |      |                  |            |             |         +-------+
         |       |              |      |                  |            |    _poller+---------> |  poll |
         v       |              |      +------------------+            |             |         +-------+
                 |              |                                      +-------------+
    +-------------------+       |      +----------------+
    | Queue      |      |       |      | Exchange       |
    |      _chann+l     |       +----+ |                |
    |                   |              |                |
    |      exchange +----------------> |     channel    |
    |                   |              |                |
    |                   |              |                |
    +-------------------+              +----------------+

手机如下:

现在我们知道:

  • Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。
  • Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
  • Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
  • Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接,是操作的抽象;

但是,我们只是大致知道 poll 是用来做什么的,但是不知道consumer,poll 究竟如何与Hub交互。我们本文就接着分析。

0x03 Poll一般步骤

在linux系统中,使用Poll的一般步骤如下:

  1. Create an epoll object——创建1个epoll对象;
  2. Tell the epoll object to monitor specific events on specific sockets——告诉epoll对象,在指定的socket上监听指定的事件;
  3. Ask the epoll object which sockets may have had the specified event since the last query——询问epoll对象,从上次查询以来,哪些socket发生了哪些指定的事件;
  4. Perform some action on those sockets——在这些socket上执行一些操作;
  5. Tell the epoll object to modify the list of sockets and/or events to monitor——告诉epoll对象,修改socket列表和(或)事件,并监控;
  6. Repeat steps 3 through 5 until finished——重复步骤3-5,直到完成;
  7. Destroy the epoll object——销毁epoll对象;

所以我们就需要在 Hub 代码中看看 kombu 如何使用 Poll。

0x04 建立 Hub

在建立 Hub 这里会建立 Hub 内部的 Poller。

代码语言:javascript
复制
_get_poller, eventio.py:312
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, hub_receive.py:23
<module>, hub_receive.py:46

具体代码是:

代码语言:javascript
复制
def _get_poller():
    if detect_environment() != 'default':
        # greenlet
        return _select
    elif epoll:
        # Py2.6+ Linux
        return _epoll
    elif kqueue and 'netbsd' in sys.platform:
        return _kqueue
    elif xpoll:
        return _poll
    else:
        return _select

这样,在 Hub内部就建立了 poller。

代码语言:javascript
复制
class Hub:
    """Event loop object.

    Arguments:
        timer (kombu.asynchronous.Timer): Specify custom timer instance.
    """
    def __init__(self, timer=None):
        self.timer = timer if timer is not None else Timer()

        self.readers = {}
        self.writers = {}
        self.on_tick = set()
        self.on_close = set()
        self._ready = set()

        self._running = False
        self._loop = None

        self._create_poller()

    @property
    def poller(self):
        if not self._poller:
            self._create_poller()
        return self._poller

    @poller.setter
    def poller(self, value):
        self._poller = value

    def _create_poller(self):
        self._poller = poll()
        self._register_fd = self._poller.register
        self._unregister_fd = self._poller.unregister

这里需要注意的是:

在 MultiChannelPoller 之中,也会生成一个 poller,但是在注册时候,Transport 会使用 hub 的 poller,而非 MultiChannelPoller 内部的 poller

代码语言:javascript
复制
on_poll_init, redis.py:333
register_with_event_loop, redis.py:1061
register_with_event_loop, connection.py:266
main, hub_receive.py:38
<module>, hub_receive.py:46

在 kombu.transport.redis.Transport 代码如下:

代码语言:javascript
复制
def register_with_event_loop(self, connection, loop):
    cycle = self.cycle
    cycle.on_poll_init(loop.poller) # 这里赋值。
    cycle_poll_start = cycle.on_poll_start
    add_reader = loop.add_reader
    on_readable = self.on_readable   

继续深入,看到进一步赋值:

代码语言:javascript
复制
def on_poll_init(self, poller):
    self.poller = poller # 这里赋值
    for channel in self._channels:
        return channel.qos.restore_visible(
            num=channel.unacked_restore_limit,
        )

0x05 Forever in Hub

hub.run_forever() 主要作用是:

  • 建立loop
  • 因为Hub里面有Channel,有poll,所以现在就把Channel与poll联系起来,包括socket,socket的file等待。
  • 进行poll,有消息就相应处理;

比如维护如下变量:

代码语言:javascript
复制
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)

具体 run_forever 如下:

代码语言:javascript
复制
def run_forever(self):
    self._running = True
    try:
        while 1:
            try:
                self.run_once()
            except Stop:
                break
    finally:
        self._running = False

于是又有调用如下,这里就进入了loop:

代码语言:javascript
复制
def run_once(self):
    try:
        next(self.loop)
    except StopIteration:
        self._loop = None

5.1 建立loop

next(self.loop) 继续调用,建立loop。这就是Hub的作用

调用stack如下:

代码语言:javascript
复制
create_loop, hub.py:279
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

简化版代码如下:

代码语言:javascript
复制
def create_loop(self, ...):

    while 1:
        todo = self._ready
        self._ready = set()

        for tick_callback in on_tick:
            tick_callback() # 这里回调用户方法

        for item in todo:
            if item:
                item()

        poll_timeout = fire_timers(propagate=propagate) if scheduled else 1

        if readers or writers:
            to_consolidate = []
            events = poll(poll_timeout) # 等待消息

            for fd, event in events or ():
                if fd in consolidate and \
                        writers.get(fd) is None:
                    to_consolidate.append(fd)
                    continue
                cb = cbargs = None

                if event & READ:
                    cb, cbargs = readers[fd] # 读取redis
                elif event & WRITE:
                    cb, cbargs = writers[fd] # 处理redis

                if isinstance(cb, generator):
                    try:
                        next(cb) 
                else:
                    cb(*cbargs) # 调用用户代码
            if to_consolidate:
                consolidate_callback(to_consolidate)
        else:
            # no sockets yet, startup is probably not done.
            sleep(min(poll_timeout, 0.1))
        yield

下面我们逐步分析。

0x06 启动Poll

循环最开始将启动 Poll。 tick_callback 的作用就是启动 Poll。就是建立一个机制,当 redis 有消息时候,得到通知

代码语言:javascript
复制
while 1:
    todo = self._ready
    self._ready = set()

    for tick_callback in on_tick:
        tick_callback()

此时:tick_callback的数值为:<function Transport.register_with_event_loop.<locals>.on_poll_start >,所以 tick_callback就调用到 Transport.register_with_event_loop.<locals>.on_poll_start

6.1 回顾如何注册回调

Transport方法如何注册,我们需要回顾,在前面代码这里会注册回调方法。

代码语言:javascript
复制
conn.register_with_event_loop(hub)

具体注册如下:

代码语言:javascript
复制
def register_with_event_loop(self, connection, loop):

    cycle_poll_start = cycle.on_poll_start
    add_reader = loop.add_reader
    on_readable = self.on_readable

    def _on_disconnect(connection):
        if connection._sock:
            loop.remove(connection._sock)
    cycle._on_connection_disconnect = _on_disconnect

    def on_poll_start():
        cycle_poll_start()
        [add_reader(fd, on_readable, fd) for fd in cycle.fds]
        
    loop.on_tick.add(on_poll_start)

on_poll_start就是在这里注册的,就是把 on_poll_start 注册到 hub 的 on_tick 回调之中

代码语言:javascript
复制
loop.on_tick.add(on_poll_start)

所以前面的如下代码就调用到了 on_poll_start。

代码语言:javascript
复制
for tick_callback in on_tick:
    tick_callback()

6.2 Transport启动

所以,我们回到on_poll_start。

代码语言:javascript
复制
def on_poll_start():
    cycle_poll_start()
    [add_reader(fd, on_readable, fd) for fd in cycle.fds]

可以看到,有两部分代码:

  • poll_start : 这部分是 把 Channel 对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作;
  • add_reader :这部分是 把 poll 对应的 fd 添加到 MultiChannelPoller 这里,这样 MultiChannelPoller 就可以 打通 redis queue ----> Channel ---> socket ---> poll ---> fd ---> 读取 redis 这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取;

让我们逐一看看。

6.3 poll_start in MultiChannelPoller

这里就是把Channel对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作

此时代码进入到MultiChannelPoller,数据如下:

代码语言:javascript
复制
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f84e7928940>
 after_read = {set: 0} set()
 eventflags = {int} 25
 fds = {dict: 0} {}
 poller = {_poll} <kombu.utils.eventio._poll object at 0x7f84e75f4d68>

可以看出来,此处就是针对channel来进行注册,把所有的channel注册到 poll上。

代码语言:javascript
复制
def on_poll_start(self):
    for channel in self._channels:
        if channel.active_queues:           # BRPOP mode?
            if channel.qos.can_consume():
                self._register_BRPOP(channel)
        if channel.active_fanout_queues:    # LISTEN mode?
            self._register_LISTEN(channel)

对于 redis 的使用,有两种方法:BRPOP mode 和 LISTEN mode。分别对应 list 和 subscribe。

6.3.1 _register_BRPOP

我们先来看看 _register_BRPOP,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么就放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了,但是,这个 connection 还没有对 epoll 其效果,所以发送一个 _brpop_start

代码语言:javascript
复制
def _register_BRPOP(self, channel):
    """Enable BRPOP mode for channel."""
    ident = channel, channel.client, 'BRPOP'
    if not self._client_registered(channel, channel.client, 'BRPOP'):
        channel._in_poll = False
        self._register(*ident)
    if not channel._in_poll:  # send BRPOP
        channel._brpop_start()
6.3.1.1 注册到MultiChannelPoller

一个 Connection 对应一个 Hub它们之间的枢纽是 MultiChannelPoller,它负责找出哪个 Channel 是可用的,这些 Channel 都是来自同一个 Connection。具体注册代码如下:

代码语言:javascript
复制
def _register(self, channel, client, type):
    if (channel, client, type) in self._chan_to_sock:
        self._unregister(channel, client, type)
    if client.connection._sock is None:   # not connected yet.
        client.connection.connect()
        
    sock = client.connection._sock
    self._fd_to_chan[sock.fileno()] = (channel, type)
    self._chan_to_sock[(channel, client, type)] = sock
    self.poller.register(sock, self.eventflags)

这里的client是Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

注意到这里client.connection._sock的数值是socket。

代码语言:javascript
复制
client.connection._sock = {socket} <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 52353), raddr=('127.0.0.1', 6379)>
 family = {AddressFamily} AddressFamily.AF_INET
 proto = {int} 6
 timeout = {NoneType} None
 type = {SocketKind} SocketKind.SOCK_STREAM

经过此阶段之后。

_fd_to_chan有意义,具体fd是 chanel 对应的 redis socket的fd

代码语言:javascript
复制
def _register(self, channel, client, type):
    if (channel, client, type) in self._chan_to_sock:
        self._unregister(channel, client, type)
    if client.connection._sock is None:   # not connected yet.
        client.connection.connect()
    sock = client.connection._sock
    self._fd_to_chan[sock.fileno()] = (channel, type)
    self._chan_to_sock[(channel, client, type)] = sock
    self.poller.register(sock, self.eventflags)

这里就是把channel与自己对应的socket联系起来,也把channel与socket的file联系起来

变量如下:

代码语言:javascript
复制
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
 after_read = {set: 0} set()
 eventflags = {int} 25
 fds = {dict: 1} 
  8 = {tuple: 2} (<kombu.transport.redis.Channel object at 0x7f9056a57278>, 'BRPOP')
  __len__ = {int} 1
 poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>

这样,从 socket fd 可以找到 对应的 channel,也能从 channel 找到 对应的 socket fd 。

如下图:

代码语言:javascript
复制
+----------------------------------------------------------------------------------+
|                                                                                  |
|   MultiChannelPoller                                                             |
|                                                                                  |
|                                       +---------------------------------------+  |
|                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
|           fds   +------------------>  |                                       |  |
|                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
|                                       |                                       |  |
|                                       |             ......                    |  |
|                                       |                                       |  |
|                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
|                                       +---------------------------------------+  |
|                                                                                  |
|                                                                                  |
|                                                                                  |
+----------------------------------------------------------------------------------+
6.3.1.2 注册到Poll

继续处理register,就是把socket注册到poll

代码语言:javascript
复制
class _poll:

    def __init__(self):
        self._poller = xpoll()
        self._quick_poll = self._poller.poll
        self._quick_register = self._poller.register
        self._quick_unregister = self._poller.unregister

    def register(self, fd, events):
        fd = fileno(fd)
        poll_flags = 0
        if events & ERR:
            poll_flags |= POLLERR
        if events & WRITE:
            poll_flags |= POLLOUT
        if events & READ:
            poll_flags |= POLLIN
        self._quick_register(fd, poll_flags)
        return fd

此时如下,我们仅仅以 fd 3 为例:

下面就是 Channel ---> socket ---> poll ---> fd 这条通路。

代码语言:javascript
复制
+----------------------------------------------------------------------------------+
|                                                                                  |
|   MultiChannelPoller                                                             |
|                                                                                  |
|                                       +---------------------------------------+  |
|                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
|           fds   +------------------>  |                                       |  |
|                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
|                                       |                                       |  |
|                                       |             ......                    |  |
|                                       |                                       |  |
|                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
|                                       |      +                                |  |
|                                       |      |                                |  |
|                                       +---------------------------------------+  |
|                                              |                                   |
+----------------------------------------------------------------------------------+
                                               |
                                               |
                                               v

                                            poll with OS
6.3.1.3 _brpop_start

若这个 connection 还没有对 epoll 其效果,就发送一个 _brpop_start作用为选择下一次读取的queue

_brpop_start如下:

代码语言:javascript
复制
def _brpop_start(self, timeout=1):
    queues = self._queue_cycle.consume(len(self.active_queues))
    if not queues:
        return
    keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
            for queue in queues] + [timeout or 0]
    self._in_poll = self.client.connection
    self.client.connection.send_command('BRPOP', *keys)

此时stack如下:

代码语言:javascript
复制
_register, redis.py:296
_register_BRPOP, redis.py:312
on_poll_start, redis.py:328
on_poll_start, redis.py:1072
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

此时如下,现在我们有两条通路:

  • Channel ---> socket ---> poll ---> fd 这条通路;
  • MultiChannelPoller ---> 读取 redis 这条通路;
  • 因为这个时候 下一次 读取的 queue 已经确定了,所以已经 打通 Redis queue ----> Channel ---> socket ---> poll ---> fd 这条通路了。
代码语言:javascript
复制
+----------------------------------------------------------------------------------+
|                                                                                  |
|   MultiChannelPoller                                                             |
|                                                                                  |
|                                       +---------------------------------------+  |
|                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
|           fds   +------------------>  |                                       |  |
|                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
|                                       |                                       |  |
|                                       |             ......                    |  |
|                                       |                                       |  |
|                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
|          connection                   |      +                                |  |
|              +                        |      |                                |  |
|              |                        +---------------------------------------+  |
|              |                               |                                   |
+----------------------------------------------------------------------------------+
               |                               |
               |                               |
               v                               v

        Redis Queue   +----------------->   poll with OS
6.3.2 _register_LISTEN

本文没有相关部分,如果有topic 相关则会调用这里。Celery event 就利用了这种方法

代码语言:javascript
复制
def _register_LISTEN(self, channel):
    """Enable LISTEN mode for channel."""
    if not self._client_registered(channel, channel.subclient, 'LISTEN'):
        channel._in_listen = False
        self._register(channel, channel.subclient, 'LISTEN')
    if not channel._in_listen:
        channel._subscribe()  # send SUBSCRIBE

注册如下:

代码语言:javascript
复制
_subscribe, redis.py:656
_register_LISTEN, redis.py:322
on_poll_start, redis.py:330
on_poll_start, redis.py:1072
create_loop, hub.py:294
asynloop, loops.py:81
start, consumer.py:592
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327

此时变量如下:

代码语言:javascript
复制
c = {PubSub} <redis.client.PubSub object at 0x7fb09e750400>
keys = {list: 1}
 0 = {str} '/0.celery.pidbox'
     
self = {Channel} <kombu.transport.redis.Channel object at 0x7fb09e6c8c88>

6.4 注册 reader in MultiChannelPoller

上面可以看到,把所有的 channel 注册到 poll上,对所有的 queue 都发起了监听请求,也就是说任一个队列有消息过来,那么都会被响应到,那么响应给谁呢?需要看看 add_reader 这个函数做了啥:

就是说,前面那些注册到 poll,其实没有注册响应方法,现在需要注册

复习下,add_reader 在 on_poll_start 这里。

代码语言:javascript
复制
def on_poll_start():
    cycle_poll_start()
    [add_reader(fd, on_readable, fd) for fd in cycle.fds]

cycle.fds 具体是得到了所有fd。

代码语言:javascript
复制
@property
def fds(self):
    return self._fd_to_chan

具体添加是在 Hub 类中。

  • 这里会再次尝试添加。
  • 然后会把 fd 与 callback 联系起来。
代码语言:javascript
复制
class Hub:
    def add_reader(self, fds, callback, *args):
        return self.add(fds, callback, READ | ERR, args)

    def add(self, fd, callback, flags, args=(), consolidate=False):
        fd = fileno(fd)
        try:
            self.poller.register(fd, flags)
        except ValueError:
            self._remove_from_loop(fd)
            raise
        else:
            dest = self.readers if flags & READ else self.writers
            if consolidate:
                self.consolidate.add(fd)
                dest[fd] = None
            else:
                dest[fd] = callback, args

注意,这里设置的是:hub 的成员变量,self.readers ,其在后续 poll 消息产生的就用到了,就调用这些callback,就是 Transport.on_readable。

代码语言:javascript
复制
readers = {dict: 1} 
 8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>, (8,))
  0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>
  1 = {tuple: 1} 8

stack为:

代码语言:javascript
复制
register, eventio.py:187
add, hub.py:164
add_reader, hub.py:213
<listcomp>, redis.py:1073
on_poll_start, redis.py:1073
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55

所以此时为如下,依然不知道响应给谁

代码语言:javascript
复制
+----------------------------------------------------------------------------------+
|                                                                                  |
|   MultiChannelPoller                                                             |
|                                                                                  |
|                                       +---------------------------------------+  |
|                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
|           fds   +------------------>  |                                       |  |
|                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
|                                       |                                       |  |
|                                       |             ......                    |  |
|                                       |                                       |  |
|                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
|          connection                   |      +                                |  |
|              +                        |      |                                |  |
|              |                        +---------------------------------------+  |
|              |                               |                                   |
+----------------------------------------------------------------------------------+
               |                               |
               |                               |
               v                               v

        Redis Queue   +------------------>  poll with OS



 +---------------------------------------------------------------------------------+
 |                                                                                 |
 |    Hub                                                                          |
 |                                     +--------------------------------------+    |
 |                                     |fd 3 : [ Transport.on_readable, fd 3] |    |
 |                                     |                                      |    |
 |       readers  +------------------> |       ......                         |    |
 |                                     |                                      |    |
 |                                     |fd 1 : [ Transport.on_readable, fd 1] |    |
 |                                     +--------------------------------------+    |
 |                                                                                 |
 +---------------------------------------------------------------------------------+

因为这个流程十分复杂,为了简化,我们这里提前剧透,在 消费函数时候,Transport 会设置 自己的 _callbacks[queue] 为一个回调函数,所以 MultiChannelPoller 读取 queue 这部分也可以联系起来

代码语言:javascript
复制
    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """Consume from `queue`."""
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)

        def _callback(raw_message):
            message = self.Message(raw_message, channel=self)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)

        self.connection._callbacks[queue] = _callback # 这里设置
        
        self._consumers.add(consumer_tag)

        self._reset_cycle()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0x00 摘要
  • 0x01 示例代码
  • 0x02 来由
  • 0x03 Poll一般步骤
  • 0x04 建立 Hub
  • 0x05 Forever in Hub
    • 5.1 建立loop
    • 0x06 启动Poll
      • 6.1 回顾如何注册回调
        • 6.2 Transport启动
          • 6.3 poll_start in MultiChannelPoller
            • 6.3.1 _register_BRPOP
            • 6.3.2 _register_LISTEN
          • 6.4 注册 reader in MultiChannelPoller
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档