前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[1005]pika 线程不安全

[1005]pika 线程不安全

作者头像
周小董
发布2021-06-24 10:43:34
1.5K0
发布2021-06-24 10:43:34
举报
文章被收录于专栏:python前行者python前行者

先说结论:Pika is not thread safe. Use a BlockingConnection per-thread。

即 Pika 并不是线程安全的,应该在每个线程里,都使用各种的 BlockingConnection

相关 issue:https://github.com/pika/pika/issues/1237

示例一:线程外创建 connection,线程里创建 channel

来源:https://github.com/pika/pika/issues/1237

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))


def loop1():
    channel = connection.channel()
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(1)


def loop2():
    channel = connection.channel()
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

代码语言:javascript
复制
1
Exception in thread LoopThread:
Traceback (most recent call last):
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/wu/Work/youmi/projects/ym-crawler-ccs/dataparser/test12.py", line 19, in loop2
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
    self._flush_output()
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: IndexError('pop from an empty deque')

示例二:线程外创建 connection 和 channel

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
channel = connection.channel()


def loop1():
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(1)


def loop2():
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

代码语言:javascript
复制
Exception in thread LoopThread:
Traceback (most recent call last):
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/wu/Work/youmi/projects/ym-crawler-ccs/dataparser/test12.py", line 13, in loop1
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
    self._flush_output()
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow', -24, 1))

示例三:线程里创建 connection 和 channel

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading


def loop1():
    connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = connection.channel()

    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(1)


def loop2():
    connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = connection.channel()

    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

代码语言:javascript
复制
2
1

可见,在线程里创建 connection 和 channel 是正常的,且线程执行顺序不一定。但是有个问题,我们不可能在每个线程里都创建一次 connection 和 channel ,这样其实是会浪费 cpu 的

示例四:线程加锁

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading
import time

lock = threading.Lock()

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
channel = connection.channel()


def loop1():
    with lock:
        channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
        print(1)
        time.sleep(2)


def loop2():
    with lock:
        channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
        print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

代码语言:javascript
复制
1
2

加锁之后,代码也是正常执行的,但是有个问题,加锁后,也起不到线程的所用了,且很可以明显看到, loop2 一定是在 loop1 之后执行的,且会受到 loop1 的阻塞影响。因为用了锁之后,loop1 函数的 with lock 下的全部逻辑可以看成是一个原子,整个原子被锁住了。与 python 多线程的 GIL 不一样,python 的 GIL 可能是在某个地方锁住的,这里的是在哪块逻辑下加锁,哪块就会被锁。

很明显,这种加锁起不到多线程的作用,也不是我们要的。

示例五:使用线程局部变量

关于线程局部变量:使用 threading 模块中的 local() 函数,可以为各个线程创建完全属于它们自己的变量(又称线程局部变量)。正是由于各个线程操作的是属于自己的变量,该资源属于各个线程的私有资源,因此可以从根本上杜绝发生数据同步问题。

使用线程池 + 线程局部变量

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 15:37
# @Author : wu
import pika
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()


def init():
    c = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = c.channel()
    return channel


def loop1(n):
    if not hasattr(local, 'channel'):
        channel = init()
        thread_id = threading.currentThread().ident
        print(f'线程:{thread_id} 创建 channel')
        local.channel = channel
    local.channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(n, end='\n')


with ThreadPoolExecutor(max_workers=5) as t:
    for i in range(10):
        t.submit(loop1, i)

输出

代码语言:javascript
复制
线程:123145590636544 创建 channel
2
5
6
7
8
9
线程:123145580126208 创建 channel
线程:123145574871040 创建 channel
1
0
线程:123145611657216 创建 channel
线程:123145601146880 创建 channel
4
3

可以看到,由于使用了线程池,且设置 max_workers=5,所以最多会有5个线程。而每个线程因为都有自己的局部变量锁,互不影响,因此需要分别创建5个 pika 的 channel 通道,但这样带来的好处就是,channel 的创建次数只会跟 max_workers 一致,因为在线程池中,一个线程执行任务后,会继续执行其他任务,还是同一个线程,而局部变量中已经存储了 channel这个值,因此可重复使用。

更重要的是,由于每个线程,都是自己创建的channel,互补影响,因此是安全的。这就达到了我们,既想线程安全,又不想每次都频繁创建 connectionchannel,我们需要做的只是,控制好线程池的数量即可。

示例六: 使用 deferToThread

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 15:37
# @Author : wu
import pika
import threading

from twisted.internet import reactor
from twisted.internet.threads import deferToThread

local = threading.local()


def init():
    c = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = c.channel()
    return channel


def loop1(n):
    thread_id = threading.currentThread().ident
    if not hasattr(local, 'channel'):
        channel = init()
        print(f'线程:{thread_id} 创建 channel')
        local.channel = channel
    local.channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(n, end='\n')
    return f'线程:{thread_id}发送成功'


def pprint(res):
    print(res)


def run():
    for i in range(50):
        d = deferToThread(loop1, i)
        d.addCallback(pprint)


if __name__ == '__main__':
    run()
    reactor.run()

这个示例与上一示例基本一致,只是将 concurrent.futures.ThreadPoolExecutor 换成了 twistedreactordeferToThread

这种方式,可创建一个 reactor 的环,通过将 deferToThread 的实例加入 reactor 去执行,然后成功后回调结果,这也是异步的一种方式。

总结

  • Pika 并不是线程安全的,应该在每个线程里,都使用各种的 BlockingConnection
  • 为了避免每次都创建 connection,在多线程中,最好是使用线程池+ threading.local() 结合使用,线程池可以避免线程的频繁创建,threading.local()避免了pika connection 的频繁创建。参考实例五

来源:http://www.jayden5.cn/2020/11/24/pika-%E7%BA%BF%E7%A8%8B%E4%B8%8D%E5%AE%89%E5%85%A8/

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-06-21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 示例一:线程外创建 connection,线程里创建 channel
  • 示例二:线程外创建 connection 和 channel
  • 示例三:线程里创建 connection 和 channel
  • 示例四:线程加锁
  • 示例五:使用线程局部变量
  • 示例六: 使用 deferToThread
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档