本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Hub 概念。
下面使用如下代码来进行说明。
本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。
def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print(' kombu ')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
前文中,Consumer部分有一句代码没有分析:
hub.run_forever()
此时,hub与Connection已经联系起来,具体如下:
具体如下图:
+----------------------+ +-------------------+
| Consumer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +--------------------> | | +-----------------------------------------------------------+
| | | pool |
| | +---------> | | <------------------------------------------------------------+
| queues | | | | |
| | | +----> | connection +---------------+ |
| | | | | | | | |
+----------------------+ | | +-------------------+ | |
| | | v |
| | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | Connection | | redis.Transport | | MultiChannelPoller | |
| | | | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| | | | | | | +-------+
| | | | | | _poller+---------> | poll |
v | | +------------------+ | | +-------+
| | +-------------+
+-------------------+ | +----------------+
| Queue | | | | Exchange |
| _chann+l | +----+ | |
| | | |
| exchange +----------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手机如下:
现在我们知道:
但是,我们只是大致知道 poll 是用来做什么的,但是不知道consumer,poll 究竟如何与Hub交互。我们本文就接着分析。
在linux系统中,使用Poll的一般步骤如下:
所以我们就需要在 Hub 代码中看看 kombu 如何使用 Poll。
在建立 Hub 这里会建立 Hub 内部的 Poller。
_get_poller, eventio.py:312
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, hub_receive.py:23
<module>, hub_receive.py:46
具体代码是:
def _get_poller():
if detect_environment() != 'default':
# greenlet
return _select
elif epoll:
# Py2.6+ Linux
return _epoll
elif kqueue and 'netbsd' in sys.platform:
return _kqueue
elif xpoll:
return _poll
else:
return _select
这样,在 Hub内部就建立了 poller。
class Hub:
"""Event loop object.
Arguments:
timer (kombu.asynchronous.Timer): Specify custom timer instance.
"""
def __init__(self, timer=None):
self.timer = timer if timer is not None else Timer()
self.readers = {}
self.writers = {}
self.on_tick = set()
self.on_close = set()
self._ready = set()
self._running = False
self._loop = None
self._create_poller()
@property
def poller(self):
if not self._poller:
self._create_poller()
return self._poller
@poller.setter
def poller(self, value):
self._poller = value
def _create_poller(self):
self._poller = poll()
self._register_fd = self._poller.register
self._unregister_fd = self._poller.unregister
这里需要注意的是:
在 MultiChannelPoller 之中,也会生成一个 poller,但是在注册时候,Transport 会使用 hub 的 poller,而非 MultiChannelPoller 内部的 poller。
on_poll_init, redis.py:333
register_with_event_loop, redis.py:1061
register_with_event_loop, connection.py:266
main, hub_receive.py:38
<module>, hub_receive.py:46
在 kombu.transport.redis.Transport 代码如下:
def register_with_event_loop(self, connection, loop):
cycle = self.cycle
cycle.on_poll_init(loop.poller) # 这里赋值。
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable
继续深入,看到进一步赋值:
def on_poll_init(self, poller):
self.poller = poller # 这里赋值
for channel in self._channels:
return channel.qos.restore_visible(
num=channel.unacked_restore_limit,
)
hub.run_forever() 主要作用是:
比如维护如下变量:
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)
具体 run_forever 如下:
def run_forever(self):
self._running = True
try:
while 1:
try:
self.run_once()
except Stop:
break
finally:
self._running = False
于是又有调用如下,这里就进入了loop:
def run_once(self):
try:
next(self.loop)
except StopIteration:
self._loop = None
next(self.loop)
继续调用,建立loop。这就是Hub的作用。
调用stack如下:
create_loop, hub.py:279
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
简化版代码如下:
def create_loop(self, ...):
while 1:
todo = self._ready
self._ready = set()
for tick_callback in on_tick:
tick_callback() # 这里回调用户方法
for item in todo:
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
if readers or writers:
to_consolidate = []
events = poll(poll_timeout) # 等待消息
for fd, event in events or ():
if fd in consolidate and \
writers.get(fd) is None:
to_consolidate.append(fd)
continue
cb = cbargs = None
if event & READ:
cb, cbargs = readers[fd] # 读取redis
elif event & WRITE:
cb, cbargs = writers[fd] # 处理redis
if isinstance(cb, generator):
try:
next(cb)
else:
cb(*cbargs) # 调用用户代码
if to_consolidate:
consolidate_callback(to_consolidate)
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
下面我们逐步分析。
循环最开始将启动 Poll。 tick_callback 的作用就是启动 Poll。就是建立一个机制,当 redis 有消息时候,得到通知。
while 1:
todo = self._ready
self._ready = set()
for tick_callback in on_tick:
tick_callback()
此时:tick_callback的数值为:<function Transport.register_with_event_loop.<locals>.on_poll_start >
,所以 tick_callback就调用到 Transport.register_with_event_loop.<locals>.on_poll_start
。
Transport方法如何注册,我们需要回顾,在前面代码这里会注册回调方法。
conn.register_with_event_loop(hub)
具体注册如下:
def register_with_event_loop(self, connection, loop):
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable
def _on_disconnect(connection):
if connection._sock:
loop.remove(connection._sock)
cycle._on_connection_disconnect = _on_disconnect
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
on_poll_start就是在这里注册的,就是把 on_poll_start 注册到 hub 的 on_tick 回调之中。
loop.on_tick.add(on_poll_start)
所以前面的如下代码就调用到了 on_poll_start。
for tick_callback in on_tick:
tick_callback()
所以,我们回到on_poll_start。
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
可以看到,有两部分代码:
redis queue ----> Channel ---> socket ---> poll ---> fd ---> 读取 redis
这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取;让我们逐一看看。
这里就是把Channel对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作。
此时代码进入到MultiChannelPoller,数据如下:
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f84e7928940>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 0} {}
poller = {_poll} <kombu.utils.eventio._poll object at 0x7f84e75f4d68>
可以看出来,此处就是针对channel来进行注册,把所有的channel注册到 poll上。
def on_poll_start(self):
for channel in self._channels:
if channel.active_queues: # BRPOP mode?
if channel.qos.can_consume():
self._register_BRPOP(channel)
if channel.active_fanout_queues: # LISTEN mode?
self._register_LISTEN(channel)
对于 redis 的使用,有两种方法:BRPOP mode 和 LISTEN mode。分别对应 list 和 subscribe。
我们先来看看 _register_BRPOP
,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么就放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了,但是,这个 connection 还没有对 epoll 其效果,所以发送一个 _brpop_start
。
def _register_BRPOP(self, channel):
"""Enable BRPOP mode for channel."""
ident = channel, channel.client, 'BRPOP'
if not self._client_registered(channel, channel.client, 'BRPOP'):
channel._in_poll = False
self._register(*ident)
if not channel._in_poll: # send BRPOP
channel._brpop_start()
一个 Connection 对应一个 Hub,它们之间的枢纽是 MultiChannelPoller
,它负责找出哪个 Channel 是可用的,这些 Channel 都是来自同一个 Connection。具体注册代码如下:
def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None: # not connected yet.
client.connection.connect()
sock = client.connection._sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)
这里的client是Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
。
注意到这里client.connection._sock的数值是socket。
client.connection._sock = {socket} <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 52353), raddr=('127.0.0.1', 6379)>
family = {AddressFamily} AddressFamily.AF_INET
proto = {int} 6
timeout = {NoneType} None
type = {SocketKind} SocketKind.SOCK_STREAM
经过此阶段之后。
_fd_to_chan有意义,具体fd是 chanel 对应的 redis socket的fd。
def _register(self, channel, client, type):
if (channel, client, type) in self._chan_to_sock:
self._unregister(channel, client, type)
if client.connection._sock is None: # not connected yet.
client.connection.connect()
sock = client.connection._sock
self._fd_to_chan[sock.fileno()] = (channel, type)
self._chan_to_sock[(channel, client, type)] = sock
self.poller.register(sock, self.eventflags)
这里就是把channel与自己对应的socket联系起来,也把channel与socket的file联系起来。
变量如下:
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 1}
8 = {tuple: 2} (<kombu.transport.redis.Channel object at 0x7f9056a57278>, 'BRPOP')
__len__ = {int} 1
poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
这样,从 socket fd 可以找到 对应的 channel,也能从 channel 找到 对应的 socket fd 。
如下图:
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| +---------------------------------------+ |
| |
| |
| |
+----------------------------------------------------------------------------------+
继续处理register,就是把socket注册到poll
class _poll:
def __init__(self):
self._poller = xpoll()
self._quick_poll = self._poller.poll
self._quick_register = self._poller.register
self._quick_unregister = self._poller.unregister
def register(self, fd, events):
fd = fileno(fd)
poll_flags = 0
if events & ERR:
poll_flags |= POLLERR
if events & WRITE:
poll_flags |= POLLOUT
if events & READ:
poll_flags |= POLLIN
self._quick_register(fd, poll_flags)
return fd
此时如下,我们仅仅以 fd 3 为例:
下面就是 Channel ---> socket ---> poll ---> fd
这条通路。
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| | + | |
| | | | |
| +---------------------------------------+ |
| | |
+----------------------------------------------------------------------------------+
|
|
v
poll with OS
若这个 connection 还没有对 epoll 其效果,就发送一个 _brpop_start
。作用为选择下一次读取的queue。
_brpop_start如下:
def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
此时stack如下:
_register, redis.py:296
_register_BRPOP, redis.py:312
on_poll_start, redis.py:328
on_poll_start, redis.py:1072
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
此时如下,现在我们有两条通路:
Channel ---> socket ---> poll ---> fd
这条通路;MultiChannelPoller ---> 读取 redis
这条通路;Redis queue ----> Channel ---> socket ---> poll ---> fd
这条通路了。+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| connection | + | |
| + | | | |
| | +---------------------------------------+ |
| | | |
+----------------------------------------------------------------------------------+
| |
| |
v v
Redis Queue +-----------------> poll with OS
本文没有相关部分,如果有topic 相关则会调用这里。Celery event 就利用了这种方法。
def _register_LISTEN(self, channel):
"""Enable LISTEN mode for channel."""
if not self._client_registered(channel, channel.subclient, 'LISTEN'):
channel._in_listen = False
self._register(channel, channel.subclient, 'LISTEN')
if not channel._in_listen:
channel._subscribe() # send SUBSCRIBE
注册如下:
_subscribe, redis.py:656
_register_LISTEN, redis.py:322
on_poll_start, redis.py:330
on_poll_start, redis.py:1072
create_loop, hub.py:294
asynloop, loops.py:81
start, consumer.py:592
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
此时变量如下:
c = {PubSub} <redis.client.PubSub object at 0x7fb09e750400>
keys = {list: 1}
0 = {str} '/0.celery.pidbox'
self = {Channel} <kombu.transport.redis.Channel object at 0x7fb09e6c8c88>
上面可以看到,把所有的 channel 注册到 poll上,对所有的 queue 都发起了监听请求,也就是说任一个队列有消息过来,那么都会被响应到,那么响应给谁呢?需要看看 add_reader
这个函数做了啥:
就是说,前面那些注册到 poll,其实没有注册响应方法,现在需要注册。
复习下,add_reader 在 on_poll_start 这里。
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
cycle.fds 具体是得到了所有fd。
@property
def fds(self):
return self._fd_to_chan
具体添加是在 Hub 类中。
class Hub:
def add_reader(self, fds, callback, *args):
return self.add(fds, callback, READ | ERR, args)
def add(self, fd, callback, flags, args=(), consolidate=False):
fd = fileno(fd)
try:
self.poller.register(fd, flags)
except ValueError:
self._remove_from_loop(fd)
raise
else:
dest = self.readers if flags & READ else self.writers
if consolidate:
self.consolidate.add(fd)
dest[fd] = None
else:
dest[fd] = callback, args
注意,这里设置的是:hub 的成员变量,self.readers ,其在后续 poll 消息产生的就用到了,就调用这些callback,就是 Transport.on_readable。
readers = {dict: 1}
8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>, (8,))
0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>
1 = {tuple: 1} 8
stack为:
register, eventio.py:187
add, hub.py:164
add_reader, hub.py:213
<listcomp>, redis.py:1073
on_poll_start, redis.py:1073
create_loop, hub.py:294
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:51
<module>, testUb.py:55
所以此时为如下,依然不知道响应给谁:
+----------------------------------------------------------------------------------+
| |
| MultiChannelPoller |
| |
| +---------------------------------------+ |
| | socket fd 1 : [ Channel 1, 'BRPOP'] | |
| fds +------------------> | | |
| | socket fd 2 : [ Channel 2, 'BRPOP'] | |
| | | |
| | ...... | |
| | | |
| | socket fd 3 : [ Channel 3, 'BRPOP'] | |
| connection | + | |
| + | | | |
| | +---------------------------------------+ |
| | | |
+----------------------------------------------------------------------------------+
| |
| |
v v
Redis Queue +------------------> poll with OS
+---------------------------------------------------------------------------------+
| |
| Hub |
| +--------------------------------------+ |
| |fd 3 : [ Transport.on_readable, fd 3] | |
| | | |
| readers +------------------> | ...... | |
| | | |
| |fd 1 : [ Transport.on_readable, fd 1] | |
| +--------------------------------------+ |
| |
+---------------------------------------------------------------------------------+
因为这个流程十分复杂,为了简化,我们这里提前剧透,在 消费函数时候,Transport 会设置 自己的 _callbacks[queue] 为一个回调函数,所以 MultiChannelPoller 读取 queue 这部分也可以联系起来:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Consume from `queue`."""
self._tag_to_queue[consumer_tag] = queue
self._active_queues.append(queue)
def _callback(raw_message):
message = self.Message(raw_message, channel=self)
if not no_ack:
self.qos.append(message, message.delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback # 这里设置
self._consumers.add(consumer_tag)
self._reset_cycle()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。