前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kombu连接流程详解

kombu连接流程详解

作者头像
tunsuy
发布2022-10-27 09:40:04
3900
发布2022-10-27 09:40:04
举报
文章被收录于专栏:有文化的技术人

kombu的mq模型

因为 Kombu 是对 AMQP 进行抽象,所以它必定有抽象的模型,事实上,它大体上和 RabbitMQ 差不多,但是,不完全一样,有一些差别,下面就介绍一下 Konbu 的抽象模型。

在 Kombu 中,存在多个概念,其实我们在前边简单的生产/消费者样例中已经看到了了一些,他们分别是:

Message:生产消费的基本单位,其实就是我们所谓的一条条消息 Connection:对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接 Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例 Producers: 发送消息的抽象类 Consumers:接受消息的抽象类 Exchange:MQ 路由,这个和 RabbitMQ 差不多,支持 5 类型 Queue:对应的 queue 抽象,其实就是一个字符串的封装

Connection

使用方式:

代码语言:javascript
复制
self.connection = kombu.connection.Connection(
            self._url, ssl=self._fetch_ssl_params(),
            login_method=self.login_method,
            heartbeat=self.heartbeat_timeout_threshold,
            failover_strategy=self.kombu_failover_strategy,
            transport_options={
                'confirm_publish': True,
                'client_properties': {
                    'capabilities': {
                        'authentication_failure_close': True,
                        'connection.blocked': True,
                        'consumer_cancel_notify': True
                    },
                    'connection_name': self.name},
                'on_blocked': self._on_connection_blocked,
                'on_unblocked': self._on_connection_unblocked,
            },
        )

进入Connection的init方法:

代码语言:javascript
复制
def __init__(self, hostname='localhost', userid=None,
			 password=None, virtual_host=None, port=None, insist=False,
			 ssl=False, transport=None, connect_timeout=5,
			 transport_options=None, login_method=None, uri_prefix=None,
			 heartbeat=0, failover_strategy='round-robin',
			 alternates=None, **kwargs):
	alt = [] if alternates is None else alternates
	# have to spell the args out, just to get nice docstrings :(
	params = self._initial_params = {
		'hostname': hostname, 'userid': userid,
		'password': password, 'virtual_host': virtual_host,
		'port': port, 'insist': insist, 'ssl': ssl,
		'transport': transport, 'connect_timeout': connect_timeout,
		'login_method': login_method, 'heartbeat': heartbeat
	}

	if hostname and not isinstance(hostname, string_t):
		alt.extend(hostname)
		hostname = alt[0]
	if hostname and '://' in hostname:
		if ';' in hostname:
			alt.extend(hostname.split(';'))
			hostname = alt[0]
		if '+' in hostname[:hostname.index('://')]:
			# e.g. sqla+mysql://root:masterkey@localhost/
			params['transport'], params['hostname'] = \
				hostname.split('+', 1)
			self.uri_prefix = params['transport']
		else:
			transport = transport or urlparse(hostname).scheme
			if not get_transport_cls(transport).can_parse_url:
				# we must parse the URL
				url_params = parse_url(hostname)
				params.update(
					dictfilter(url_params),
					hostname=url_params['hostname'],
				)

			params['transport'] = transport

	self._init_params(**params)

注意这里的: self.transport_cls = transport 后续会通过transport名称调用相应的transport方法,比如:如果transport是rabbitmq,则使用librabbitmq.py中的方法

Transport

基类 transport/base.py

代码语言:javascript
复制
class Transport(object):
    """Base class for transports."""

Message

这是 Kombu 中生产/消费的基本单位

代码语言:javascript
复制
    def __init__(self, body=None, delivery_tag=None,
                 content_type=None, content_encoding=None, delivery_info=None,
                 properties=None, headers=None, postencode=None,
                 accept=None, channel=None, **kwargs):
        delivery_info = {} if not delivery_info else delivery_info
        self.errors = [] if self.errors is None else self.errors
        self.channel = channel
        self.delivery_tag = delivery_tag
        self.content_type = content_type
        self.content_encoding = content_encoding
        self.delivery_info = delivery_info
        self.headers = headers or {}
        self.properties = properties or {}
        self._decoded_cache = None
        self._state = 'RECEIVED'
        self.accept = accept

        compression = self.headers.get('compression')
        if not self.errors and compression:
            try:
                body = decompress(body, compression)
            except Exception:
                self.errors.append(sys.exc_info())

        if not self.errors and postencode and isinstance(body, text_t):
            try:
                body = body.encode(postencode)
            except Exception:
                self.errors.append(sys.exc_info())
        self.body = body

流程详解

回顾下oslo_messaging之RPC详解中提到的:如果是调用create()新创建一个连接,就会实例化一个Connection 实例,该Connection类的实现在消息队列驱动类所在的模块,对于Rabbit Driver,Connection 类位于oslo_messaging/_drivers/impl_rabbit.py。Connection 类封装了RabbitMQ 的相关操作,如:direct_send,topic_send,fanout_send, notify_send 等。这几种方法对应了RabbitMQ 中几种消息分发模式。

client调用的时候最终会执行到下面的代码:oslo_messaging/_drivers/impl_rabbit.py

代码语言:javascript
复制
def ensure(self, method, retry=None,
               recoverable_error_callback=None, error_callback=None,
               timeout_is_error=True):

        if retry is None:
            retry = self.max_retries # 虽然retry 没有接收任何传入值,但在此被赋值
        if retry is None or retry < 0:
            retry = None

        def execute_method(channel):
            self._set_current_channel(channel)
            # 正常情况,稍后会走到此,运行调用ensure() 传递进来的参数
            method()

        try:
           # self.connection 即kombu 库下的Connection 类实例
           # autoretry 调用较复杂,将在下文详细说明
            autoretry_method = self.connection.autoretry(
                #self.channel 只会在调用ensure_connection()方法时,被置为None
                execute_method, channel=self.channel,
                max_retries=retry,
                errback=on_error,
                interval_start=self.interval_start or 1,
                interval_step=self.interval_stepping,
                interval_max=self.interval_max,
                on_revive=on_reconnection)
            # autoretry_method 实际是一个_ensured 方法
            ret, channel = autoretry_method()
            self._set_current_channel(channel) #设置self.channel
            return ret
        except rpc_amqp.AMQPDestinationNotFound:
            # NOTE(sileht): we must reraise this without
            # trigger error_callback
            raise
        except Exception as exc:
            error_callback and error_callback(exc)
            self._set_current_channel(None)
            # NOTE(sileht): number of retry exceeded and the connection
            # is still broken
            info = {'err_str': exc, 'retry': retry}
            info.update(self.connection.info())
            msg = _('Unable to connect to AMQP server on '
                    '%(hostname)s:%(port)s after %(retry)s '
                    'tries: %(err_str)s') % info
            LOG.error(msg)
            raise exceptions.MessageDeliveryFailure(msg)

下面看一下self.connection.autoretry 调用流程,这个流程用法稍微用到了Python的高级特性——可调用对象。

首先,明确self.connection 为在上述Connection 类初始化中赋值的,实际为kombu/connection.py 中的Connection实例,在kombu 库中的Connection类中,同样有ensure_connection() 和ensure() 方法。

kombu/connection.py ,并且有很多方法使用了property 装饰器

代码语言:javascript
复制
class Connection(object):
    def autoretry(self, fun, channel=None, **ensure_options):#fun 为execute_method
        channels = [channel]
        class Revival(object): #通过__call__()实现了一个可调用对象
            __name__ = getattr(fun, '__name__', None) # 将参数fun 的名字置为Revival 类的名字
            __module__ = getattr(fun, '__module__', None)
            __doc__ = getattr(fun, '__doc__', None)

            def __init__(self, connection):
                self.connection = connection # 将Connection 类实例引用赋值给了self.connection

            def revive(self, channel):
                channels[0] = channel

            def __call__(self, *args, **kwargs): #将在调用Revival 类时被执行
                if channels[0] is None:
                     #default_channel 实际为Connection类中一个通过property 装饰的方法
                    self.revive(self.connection.default_channel)
                return fun(*args, channel=channels[0], **kwargs), channels[0]

        revive = Revival(self) #注意传递进了参数为自身的类实例引用
        return self.ensure(revive, revive, **ensure_options)

autoretry_method变量的赋值过程如下:

代码语言:javascript
复制
Connection.autoretry()->Connection.ensure()-> return ensure._ensure

autoretry_method变量的执行过程:

autoretry_method() [实际执行的是_ensure()方法]->Revival.call()。下面详细分析一下__call__() 的执行流程:

代码语言:javascript
复制
class Revival(object):
        __name__ = getattr(fun, '__name__', None)
        __module__ = getattr(fun, '__module__', None)
        __doc__ = getattr(fun, '__doc__', None)

        def __init__(self, connection):
            self.connection = connection

        def revive(self, channel):
            channels[0] = channel

        def __call__(self, *args, **kwargs):
            if channels[0] is None: # channel 存在,就不去判断是否存在TCP连接,及生成channel了。
                self.revive(self.connection.default_channel) #给channels[0]赋值
            return fun(*args, channel=channels[0], **kwargs), channels[0]

在__call__方法中,调用self.revive()方法给channels[0] 赋值,revive() 方法传入的参数实际上是通过执行了self.connection.default_channel 获取,default_channel 是一个被property 装饰的方法。

kombu/connetion.py

代码语言:javascript
复制
@property
def default_channel(self):
    conn_opts = {}
	transport_opts = self.transport_options
	if transport_opts:
		if 'max_retries' in transport_opts:
			conn_opts['max_retries'] = transport_opts['max_retries']
		if 'interval_start' in transport_opts:
			conn_opts['interval_start'] = transport_opts['interval_start']
		if 'interval_step' in transport_opts:
			conn_opts['interval_step'] = transport_opts['interval_step']
		if 'interval_max' in transport_opts:
			conn_opts['interval_max'] = transport_opts['interval_max']

	# make sure we're still connected, and if not refresh.
	self.ensure_connection(**conn_opts)
	if self._default_channel is None:
		self._default_channel = self.channel()
	return self._default_channel

def ensure_connection(self, errback=None, max_retries=None,
					  interval_start=2, interval_step=2, interval_max=30,
					  callback=None, reraise_as_library_errors=True,
					  timeout=None):
	def on_error(exc, intervals, retries, interval=0):
		round = self.completes_cycle(retries)
		if round:
			interval = next(intervals)
		if errback:
			errback(exc, interval)
		self.maybe_switch_next()  # select next host

		return interval if round else 0

	ctx = self._reraise_as_library_errors
	if not reraise_as_library_errors:
		ctx = self._dummy_context
	with ctx():
		retry_over_time(self.connect, self.recoverable_connection_errors,
						(), {}, on_error, max_retries,
						interval_start, interval_step, interval_max,
						callback, timeout=timeout)
	return self

kombu/utils/functional.py

代码语言:javascript
复制
def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
                    max_retries=None, interval_start=2, interval_step=2,
                    interval_max=30, callback=None, timeout=None):
    """Retry the function over and over until max retries is exceeded.

    For each retry we sleep a for a while before we try again, this interval
    is increased for every retry until the max seconds is reached.

    Arguments:
        fun (Callable): The function to try
        catch (Tuple[BaseException]): Exceptions to catch, can be either
            tuple or a single exception class.

    Keyword Arguments:
        args (Tuple): Positional arguments passed on to the function.
        kwargs (Dict): Keyword arguments passed on to the function.
        errback (Callable): Callback for when an exception in ``catch``
            is raised.  The callback must take three arguments:
            ``exc``, ``interval_range`` and ``retries``, where ``exc``
            is the exception instance, ``interval_range`` is an iterator
            which return the time in seconds to sleep next, and ``retries``
            is the number of previous retries.
        max_retries (int): Maximum number of retries before we give up.
            If neither of this and timeout is set, we will retry forever.
            If one of this and timeout is reached, stop.
        interval_start (float): How long (in seconds) we start sleeping
            between retries.
        interval_step (float): By how much the interval is increased for
            each retry.
        interval_max (float): Maximum number of seconds to sleep
            between retries.
        timeout (int): Maximum seconds waiting before we give up.
    """
    kwargs = {} if not kwargs else kwargs
    args = [] if not args else args
    interval_range = fxrange(interval_start,
                             interval_max + interval_start,
                             interval_step, repeatlast=True)
    end = time() + timeout if timeout else None
    for retries in count():
        try:
            return fun(*args, **kwargs)
        except catch as exc:
            if max_retries and retries >= max_retries:
                raise
            if end and time() > end:
                raise
            if callback:
                callback()
            tts = float(errback(exc, interval_range, retries) if errback
                        else next(interval_range))
            if tts:
                for _ in range(int(tts)):
                    if callback:
                        callback()
                    sleep(1.0)
                # sleep remainder after int truncation above.
                sleep(abs(int(tts) - tts))

在retry_over_time() 函数中,我们看到了一个用于尝试连接的for 循环,如果本次连接失败,通过在异常处理中判断retries 的次数 是否超过max_retries 来决定是否终止连接尝试。再看max_retries 参数的来源,max_retries 实际上来源于ensure_connection()

再看 retry_over_time() 中的正确执行流程 ,进入fun(*args, **kwargs) 一句,fun 变量实际为connect 方法。该方法内的调用多处用到了property 装饰特性。

kombu/connection.py

代码语言:javascript
复制
class Connection(object):
    def connect(self):
        """Establish connection to server immediately."""
        self._closed = False
        return self.connection # 用到property 装饰器
    
    @property
    def connection(self):
        if not self._closed: # 在connect()函数中被无条件置为False
            # self.connected 同样是一个被property装饰的函数,执行该函数确认连接是否已经建立
            if not self.connected:
                self.declared_entities.clear()
                self._default_channel = None
                self._connection = self._establish_connection() # self._connection 在此赋值
                self._closed = False
            return self._connection
    
    def _establish_connection(self): # 建立连接
        self._debug('establishing connection...')
        # self.transport 用到property 装饰器,实际为返回self._transport
        conn = self.transport.establish_connection()
        self._debug('connection established: %r', self)
        return conn
        
    @property
    def transport(self):
        if self._transport is None:
           # self._transport 的通过create_transport() 初始化,实际为kombu/transport 下某个
           # Transport 类实例。此处,为kombu/transport/pyampq.py 中的Transport 类实例。
           # 先不细看create_transport() 的实例化过程
            self._transport = self.create_transport()
        return self._transport 

返回到Revial 类的__call__()中 ,执行完self.revive() 方法后,接着执行autoretry 方法传递的函数fun。该函数是ensure() [impl_rabbit.py]方法内部定义的 execute_method().

代码语言:javascript
复制
def execute_method(channel):
     
     #设置当前使用的self.channel 为传递进来的channel
     #并且,如果为消费者channel ,设置channel的 QoS prefetch count
     self._set_current_channel(channel)
     
     #执行ensure()传递的fun,ensure在多处被调用,如topic_send, fanout_send,direct_sned,
     #ensure_connection等. 对于ensure_connection(),执行的方法还是上面的connect() 方法,但不会再次
     #建立建立,是直接返回
     method()

返回到impl_rabbit.py 中Connection 类的初始化方法,执行完self.ensure_connection()语句后,一个TCP连接和channel 就初始化完成了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有文化的技术人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • kombu的mq模型
  • Connection
  • Transport
  • Message
  • 流程详解
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档