前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python3 多线程

Python3 多线程

作者头像
py3study
发布2020-01-02 16:32:45
7830
发布2020-01-02 16:32:45
举报
文章被收录于专栏:python3python3

两个概念:

  • 并发:假同时,一段时间内同时处理多个任务,单核都可以;
  • 并行:真同时,同时处理多个任务,必须多核。

主流操作系统上完成并发的手段有进程和线程,主流的编程语言提供了用户空间的调度:协程。Python 也不例外。

由于现在的操作系统上的进程越来越轻量,导致进程和线程之间的区别越来越少。事实上,Linux 并没有原生的线程,线程是通过进程实现的。

python 中每一个进程会启动一个解释器,而线程会共享一个解释器。

Python 中的线程是通过标准库 threading 实现的。而启动一个线程就是让这个线程执行一些逻辑,这些逻辑就对应一个函数。

代码语言:javascript
复制
>>> import threading
>>> def worker(): # 让多个线程来执行它
...     print('work')
...
>>> thread = threading.Thread(target=worker) # 创建了一个线程对象,target 参数是一个函数,即线程要执行的逻辑
>>> thread.start() # start 启动一个线程,执行完毕后,自动退出,Python 没有提供主动退出线程的方法
work

由于 python 没有提供退出线程的方法,因此我们一定不能在逻辑中定义死循环,不然线程无法退出。当然直接 kill -9 和刻意为之的另说。而像那种监听某个端口提供服务的进程,为了保证不退出,通常都会有一个 while True 的死循环。

上面只是启动了一个线程,很显然没什么屌用。启动多个线程的方式非常简单,就是在它的外面套一个 for 循环就可以了:

代码语言:javascript
复制
import time
import threading

def worker(num):
    time.sleep(1)
    print('work-{}'.format(num))

for i in range(5):
    t = threading.Thread(taret=worker, args=(i, )) # 启动了五个线程,要启动几个就循环几次
    t.start()

通过 args 给函数传递参数,也可以使用 kwargs 通过字典传递。结果是在等待一秒之后,所有线程同时输出了,并且在一个线程的换行符还没有打印出来的时候,下一个线程就输出了,这就涉及到线程安全的问题了。很显然,print 并不是线程安全的。

线程相比于进程更轻量,上下文切换的代价没有进程那么大,但即使如此,线程数量也不宜过多。

标识一个线程

threading.current_thread() 可以返回当前的线程对象。

代码语言:javascript
复制
>>> threading.Thread(target=lambda: print(threading.current_thread())).start()
<Thread(Thread-13, started 140007299499776)>

返回的线程对象我们可以通过一个变量进行接收:

代码语言:javascript
复制
thread = threading.current_thread()

它有很多属性和方法:

  • name:返回线程的名字;
  • ident:返回该线程的唯一标识符;
  • is_alive:告知该线程是否存活;
  • enumerate:可以通过循环它打印出所有的线程;

我们创建线程对象的时候是可以给它取名字的:

代码语言:javascript
复制
t = threading.Thread(target=worker, name='thread1')

这个 name 可以通过 logging 的 threadName 获得。

logging

前面提到过,print 并不是线程安全的,而 logging 模块线程安全。

代码语言:javascript
复制
>>> import logging
>>> logging.warning('hehe')
WARNING:root:hehe
>>> logging.info('hehe') # 默认只输出 warning 以上级别

我们可以对其进行一些基础的配置,让其记录 DEBUG 以上的级别,以及记录线程名:

代码语言:javascript
复制
>>> logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
>>> logging.info('hehe')
2017-09-23 15:41:36,868 INFO MainThread hehe

知道了它的简单用法之后,我们就可以使用多线程了:

代码语言:javascript
复制
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(lineno)d %(message)s')

def worker():
    logging.info(logging.info('work'))

for i in range(5):
    t = threading.Thread(target=worker)
    t.start()

使用 logging 就没有问题了,因此我们通常使用它来替代 print。

logging 还可以将异常的栈追踪信息记录下来,这在排查错误的时候非常方便:

代码语言:javascript
复制
import logging

try:
    config['DE']['xxx']
except Exception as e:
    logging.exception(e)
print('xxx')

daemon 与 non-daemon

daemon 在 linux 上是守护进程的意思,它始终在后台运行。而在 Python 中的 daemon 线程会在主线程退出之后退出。也就是说,如果不是 daemon 线程,主线程退出之后,非 daemon 线程还会继续执行,直到结束退出。

线程默认不是 daemon,如果想要设置为 daemon,那就在创建线程对象的时候,给它传递 daemon=True 即可。

代码语言:javascript
复制
>>> t = threading.Thread(target=worker, daemon=True)
>>> t.daemon
Out[20]: True

通过下面的例子证明之前的说法:

代码语言:javascript
复制
import time
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker():
    logging.info('start')
    time.sleep(2)
    logging.info('end')

if __name__ == '__main__':
    logging.info('start')
    t1 = threading.Thread(target=worker, name='non-daemon')
    t1.start()
    t2 = threading.Thread(target=worker, name='daemon', daemon=True)
    t2.start()
    logging.info('end')

# 执行结果
2017-09-24 04:08:49,027 INFO MainThread start
2017-09-24 04:08:49,028 INFO non-daemon start
2017-09-24 04:08:49,028 INFO daemon start
2017-09-24 04:08:49,028 INFO MainThread end
2017-09-24 04:08:51,031 INFO non-daemon end

执行上面的代码你会发现有的时候主线程退出了,但是 daemon 线程还会执行完成。这是因为虽然从日志中看到主线程退出,但是事实上主线程是没有退出的,它会等待非 daemon 线程执行完毕后才会退出,这样就给了 daemon 线程的执行时间了。当我们将 t1 给注释掉之后,就不可能出现主线程退出后,daemon 线程仍然执行的情况了。

如果我们在 t2.start() 之后增加一行 t2.join(),那即使它是 daemon 线程,主线程依然会等待它执行完毕后再退出。因为 join 会阻塞直到线程执行完毕。join 支持一个参数,那就是阻塞的秒数。t2.join(1) 表示只阻塞一秒,这个时候即使 t2 没有执行完成,主线程依然会退出。join 用的比较多,它并不占用 CPU 时间。

创建线程的另一种方法

上面创建线程的方法是通过实例化 Thread,我们还可以通过下面这种方式:

代码语言:javascript
复制
import logging
import threading

class Mythread(threading.Thread):
    def run(self):
        logging.warning('worker')

t = Mythread()
t.start()

通过继承 + 重写 run 方法来到达启动多线程的效果,run 等同于之前 target 指定的函数。但是 Python 中这种方法使用的很少。

当我们创建一个线程对象的时候,除了可以使用 start 启动它之外,还可以通过 run 来启动。如果不是以继承的方式创建线程,一个线程对象的 run 和 start 只能执行其中一个。

thread local

定义一个 thread local 对象。

代码语言:javascript
复制
ctx = threading.local()

这时的 ctx 没有任何属性,我们可以给它增加属性:

代码语言:javascript
复制
>>> ctx.data = 5
>>> ctx.data
Out[25]: 5

继续:

代码语言:javascript
复制
>>> data = 'abc' # 定义一个变量
>>> def worker():
...     logging.warning(data)
...     logging.warning(ctx.data)
...
>>> worker() # 执行没什么问题
WARNING:root:abc
WARNING:root:5
>>> threading.Thread(target=worker).start() # 但是通过线程执行就不行了
WARNING:root:abc # data 可以直接打出来
Exception in thread Thread-9:
Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-32-2e99199c517b>", line 3, in worker
    logging.warning(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data' # 但是 ctx.data 提示没有

这是因为 ctx.data 是一个 thread local 的变量,我们可以给它赋值任意属性,但是只对当前线程可见。线程独享!

使用 run 方法,它会将 target 放在主线程中;start 则会将其放到子线程中,二者只能执行一个。

定时器

也可以称为延时执行。Python 中存在一种特殊的线程,可用于延迟执行。它继承自 Thread 类,因此它也是 Thread 对象。

代码语言:javascript
复制
>>> def worker():
...     logging.warning('worker')
...
>>> t = threading.Timer(interval=5, function=worker)
>>> t.start()
  • interval:延时多少秒执行,默认为 30;
  • function:等同于 target。

可以看到执行 start 方法后,五秒后才有输出。在等待的过程中,它可以通过 cancel() 终止。

它也可以设置线程名,只不过要这样:

代码语言:javascript
复制
>>> t = threading.Timer(interval=5, function=worker)
>>> t.name = 'Timer'
>>> t.deamon = True # 设置是否为 daemon

当 function 指定的函数开始执行的时候,无法通过 cancel() 终止。

Timer 的定时执行功能很弱,如果真的有这方面的需要,可以使用 APSchedule。

event

第一种线程同步的方式。同步意味着阻塞,如果线程之间没有联系,完全没有必要使用同步。有这么一种需求:worker 线程做一些事情,当它完成之后,通知 boss 线程,由 boss 完成处理后续工作。这可能并不难实现,但是 boss 线程要统计 worker 线程的执行时间呢?

这就要用到线程间通信的机制了,最简单的是 event:

代码语言:javascript
复制
>>> event = threading.Event()
>>> event.set()
>>> event.wait()
Out[8]: True

它是一个 threading.Event 的对象,有 set 和 wait 这两个方法。wait 会阻塞线程直到 set 方法被调用。 有了这两种方法之后,我们就可以完成上面的需求了:

代码语言:javascript
复制
import time
import random
import logging
import datetime
import threading

def worker(event: threading.Event):
    time.sleep(random.randint(1, 5))
    event.set()

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.warning('worker exit {}'.format(datetime.datetime.now() - start))

def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event,), name='boss')
    b.start()
    for x in range(5):
        threading.Thread(target=worker, args=(event,), name='worker').start()

start()

五个 worker 线程,谁先执行完成就谁执行 event.set(),一旦 event.set 被执行,boss 线程也就会继续执行并输出日志了。但是会有一个问题,由于是随机 sleep 时间,也就是说最快 boss 线程可以一秒就退出,但是还有四个 worker 线程还在执行,这四个线程拉长了整个脚本的执行时间。

再做修改:

代码语言:javascript
复制
import time
import random
import logging
import datetime
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(event: threading.Event):
    s = random.randint(1, 5)
    event.wait(s) # 先阻塞
    event.set() # 一下全放开了
    logging.info('sleep {}'.format(s))

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.info('worker exit {}'.format(datetime.datetime.now() - start))

def start():
    event = threading.Event()
    threading.Thread(target=boss, args=(event,), name='boss').start()
    for x in range(5):
        threading.Thread(target=worker, args=(event,), name='worker-{}'.format(x)).start()

start()

# 执行结果
2017-09-25 06:15:42,114 INFO worker-0 sleep 2
2017-09-25 06:15:42,115 INFO boss worker exit 0:00:02.004014
2017-09-25 06:15:42,116 INFO worker-1 sleep 5
2017-09-25 06:15:42,116 INFO worker-2 sleep 4
2017-09-25 06:15:42,116 INFO worker-3 sleep 3
2017-09-25 06:15:42,117 INFO worker-4 sleep 2

可以看到都在同一秒退出了,这是因为 wait 可以指定超时时间,时间一到它就不再阻塞。这样阻塞时间最短的那个线程就会执行 set,这样一来所有阻塞的线程同时放开了,于是同一时间都执行完成了。因此,wait 会阻塞线程直到 set 方法被调用,或者超时时间到。

event 可以被多个线程所持有,多个线程可以同时被阻塞,一旦其中一个线程执行了 set,那么所有的线程都不再阻塞。event 可以在线程之间发送信号,通常用于某个线程需要其他线程处理某些动作之后才能启动。

event 还有一个特性,如果先 set 然后 wait,不管有没有指定超时,它都瞬间返回 True(因为阻塞被放开,所以无法再阻塞);而如果直接 wait,且给它一个超时时间,那么超时完成之后,它会返回 False。我们可以根据这个特点来完成定时的操作。

代码语言:javascript
复制
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(event: threading.Event):
    while not event.wait(3):
        logging.info('running')

event = threading.Event()
threading.Thread(target=worker, args=(event,)).start()

每三秒会输出一次日志,会无限输出下去。但是如果执行 event.set() 就会终止死循环。

event 还有一些方法:

  • is_set:用来判断有没有 set 过;
  • clean:清除 set 标志,通常用来做线程退出的条件。 def worker(event): while not event.is_set(): pass

wait 会主动让出 CPU 时间片,time.sleep 却不会。假如它们分到了 10ms 的 CPU 时间,都使用了 5ms,那么剩余的 5ms wait 会让给别人,而 sleep 会自己用完。因此我们会使用 wait 而不是 sleep。

实现定时器

延时执行。

代码语言:javascript
复制
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.__thread)

    def __thread(self):
        if not self.event.wait(self.interval):
            self.function(*self.args, **self.kwargs)

    def start(self):
        self.thread.start()

    def cancel(self):
        self.event.set()

def worker():
    logging.info('running')

t = Timer(interval=2, function=worker)
t.start()

Lock

第二种线程同步的方式。lock 用来保护共享资源,其余几种线程同步的方式都是用了它。

代码语言:javascript
复制
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Counter:
    def __init__(self):
        self.__val = 0

    @property
    def value(self):
        return self.__val

    def inc(self):
        self.__val += 1

    def dec(self):
        self.__val -= 1

counter = Counter()

def fn():
    if random.choice([-1, 1]) > 0:
        logging.info('inc')
        counter.inc()
    else:
        logging.info('dec')
        counter.dec()

for x in range(10):
    threading.Thread(target=fn).start()

print(counter.value)

上面的代码即使你知道它加了多少次减了多少次,但你不能肯定它的结果,这是因为资源的争用。Lock 对象可用于解决这种问题:

代码语言:javascript
复制
>>> lock = threading.Lock()
>>> lock.acquire()
Out[4]: True

对于 lock 实例,只能调用一次 acquire 方法,再次调用会被阻塞,直到 release 方法被调用。根据它的这种特性,可用来改造之前的 Counter。

代码语言:javascript
复制
class Counter:
    def __init__(self):
        self.__val = 0
        self._lock = threading.Lock()

    @property
    def value(self):
        return self.__val

    def inc(self):
        self._lock.acquire()
        self.__val += 1
        self._lock.release()

    def dec(self):
        self._lock.acquire()
        self.__val -= 1
        self._lock.release()

这样一来,不管有多少个线程,同一时间只会有一个线程能够修改 __val。但是这样会有一个问题,如果执行加减的时候发生了异常(虽然这里不会),那么 release 永远就不会执行,那么就会形成死锁,因此我们要使用 try finally。

代码语言:javascript
复制
  def inc(self):
        try:
            self._lock.acquire()
            self.__val += 1
        finally:
            self._lock.release()

从上面这种结构中我们可以联想到 with,事实上它是支持 with 的,因此我们可以定义的更为简单:

def inc(self):

代码语言:javascript
复制
with self._lock:
    self.__val += 1

凡是用锁的地方,一定要在 finally 中使用 release,否则就会有锁死的可能性。

而对于读来说,如果不加锁,就会存在脏读的可能性,就看能不能忍受了。通过加锁之后,Counter 类就变成线程安全了,我们可以放心的使用。

锁是并发的难点,它会将并发变为串行,掌握了锁,并发就没有丝毫问题了。那么何时需要加锁?凡是有共享资源的地方都要加锁。

lock 对象可以接收两个参数:

  • blocking:当再次加锁时,如果它为 False,那么不会阻塞,而是返回 False;
  • timeout:如果 blocking 为 True,timeout 大于等于 0 会阻塞到超时,并返回 False。

预先启动 10 个线程处理一些任务,当其中一个线程在处理其中一个任务时,其他线程可以处理其他任务,这时候就可以用到非阻塞锁。第一个线程对该任务加非阻塞锁,由于之前没有加过锁,因此可以加上。第二个线程再加的时候就加不上了,并且返回 False,这时就可以让它跳过这个任务去执行下一个任务了。

代码语言:javascript
复制
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(tasks):
    for task in tasks:
        # 第一个执行加锁的线程可以锁,它的值为 True。由于锁住了,剩下的九个线程执行的时候它的值都为 False
        # 因此 loggi.info 语句只会执行 10 次
        if task.lock.acquire(False):
            logging.info(task.name)

class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

tasks = [Task(x) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker, args=(tasks,), name='work-{}'.format(i)).start()

如果任务有先后顺序的话,就只能串行了。

RLock

可重入锁在同一个线程内可多次加锁,但是只能有一个线程成功,并且 acquire 几次,就需要 release 几次。

代码语言:javascript
复制
>>> rlock = threading.RLock()
>>> rlock.acquire()
Out[13]: True
>>> rlock.acquire()
Out[14]: True
>>> rlock.release()
>>> rlock.release()

condition

第三种线程同步的方式。通常用于生产者消费者模式,生产者生产消息之后,使用 notify 和 notify_all 通知消费者进行消费。而消费者使用 wait 方法阻塞等待生产者的通知。

代码语言:javascript
复制
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def consumer(self):
        while not self.event.wait(1):
            with self.cond:
                self.cond.wait() # 会阻塞,直到 notifyAll 被执行
                logging.info(self.data)

    def producer(self):
        for _ in range(10):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify_all()
            self.event.wait(1)
        self.event.set()

d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
c = threading.Thread(target=d.consumer, name='consumer')
p.start()
c.start()

有生产者修改共享资源,然后通知消费者进行消费。

  • wait:会阻塞,直到被 notify 唤醒;
  • notifyAll:老版的驼峰写法,现已改为下面的,但为了兼容仍然存在;
  • notify_all:用于通知所有 wait 的线程,可以理解为广播;
  • notify:接收一个数字,表示唤醒多少个 wait 线程,默认为 1。可以理解为单播。

比如下面的示例中,虽然启动了四个消费者进程,但是只允许两个同时消费,至于是哪两个就不得而知了。

代码语言:javascript
复制
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def consumer(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()
                logging.info(self.data)

    def producer(self):
        for _ in range(10):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify(2)
            self.event.wait(1)
        self.event.set()

d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
for i in range(4):
    threading.Thread(target=d.consumer, name='consumer-{}'.format(i)).start()
p.start()

按理来说,因为有锁的存在,所以只有在消费者的 with 代码块执行完毕,锁释放之后,生产者才能进入自己的 with 代码块。这样就能够保证,消费者只有在消费完毕之后生产者才能继续生产。但是我在运行过程中生产者根本不会等待消费者消费,它自己一个劲的跑。

无论 notify、notify_all 还是 wait,都必须先 acquire,完成之后必须确保 release,因此通常使用 with 语法。

barrier

第四种线程同步的方式,栅栏的意思,只有凑齐一拨人之后才往下走。从下面这段代码中就能理解它的作用:

代码语言:javascript
复制
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads'.format(barrier.n_waiting))
    try:
        # 上面的代码各个线程什么时候执行,怎么执行都无所谓
        # 但是所有线程都会在这里同时等待,只有所有线程都执行到这了,才同时执行下面的代码
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.warning('aborting')
    else:
        logging.info('after barrier {}'.format(worker_id))

# 实例化的时候指定拦多少个线程,如果启动了四个线程,只要三个到齐了就可以同时往下走了
barrier = threading.Barrier(3)
for i in range(3):
    threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start()
logging.info('start')

barrier 对象的一些属性和方法:

  • wait:阻塞线程,它可以指定超时时间,超时时间一到抛出 BrokenBarrierError 异常。如果执行过 abort 方法,那么再执行 wait 也会抛出 BrokenBarrierError 异常;
  • reset:清除对象执行 abort 的痕迹。执行 abort 后执行 rest,接着执行 wait 就不会抛异常了;
  • n_waiting:当前有多少个线程在等待;
  • abort:通知已经在等待的线程不必再等了,不能因为它一个而让其他线程在那傻等。而一旦执行了这个方法, wait 就会抛出 BrokenBarrierError 异常,因此不处于 wait 状态的线程是不会抛出这个异常的。

适用场景:比如有十种工作,每个线程负责一种,只有这十个线程都初始化完成后才能工作。

semaphore

最后五种线程同步的方式。信号量和锁很像,锁是为 1 的信号量。

代码语言:javascript
复制
# 创建一个为 3 的信号量
>>> s = threading.Semaphore(3)
>>> s.acquire()
Out[84]: True
>>> s.acquire(False)
Out[85]: True
>>> s.acquire(False)
Out[86]: True
>>> s.acquire(False)
Out[87]: False

它可以锁多次,上面锁了三次都没有问题,等到第四次的时候就不行了。由于锁只能锁一次,所以它是为 1 的信号量。RLock 也能锁多次,它是它只能用在同一个线程上,信号量却可以在多个线程中使用。

创建一个连接池的时候可以用到它:

代码语言:javascript
复制
import time
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Pool:
    def __init__(self, num):
        self.num = num # 指定池子的连接数
        self.conns = [self._make_connect(x) for x in range(num)]
        self.s = threading.Semaphore(num)

    # 这个函数是拿到连接之后做的操作
    def _make_connect(self, name):
        return name

    # 从池子中取出一个连接
    def get(self):
        self.s.acquire()
        return self.conns.pop()

    def return_resource(self, conn):
        # 执行完毕后,将连接放回池子中
        self.conns.insert(0, conn)
        self.s.release()

def worker(pool):
    logging.info('started')
    name = pool.get()
    logging.info('get connect {}'.format(name))
    time.sleep(random.randint(1, 3))
    pool.return_resource(name)
    logging.info('return resource {}'.format(name))

pool = Pool(3)
for i in range(5):
threading.Thread(target=worker, args=(pool,), name='worker-{}'.format(i)).start()

如果不使用信号量的话,我们还需要对池子是否为空进行判断。为什么将连接放回连接池中的 insert 操作不需要加锁呢?这是因为 GIL 的影响。

信号量也是对资源的保护,但是和锁不一样的地方在于,锁限制只有一个线程可以访问共享资源,而信号量限制指定个线程可以访问共享资源。事实上我们只需要使用信号量就可以了,因为锁本身就是信号量的一种。

queue

队列,它是进程间通信的一种方式,队列有三种:

  • FIFO:Queue.Queue(maxsize=0),先进先出,线程安全;
  • LIFO:Queue.LifoQueue(maxsize=0),后进先出;
  • Priority:Queue.PriorityQueue(maxsize=0),优先队列。

创建一个先进先出队列:

代码语言:javascript
复制
>>> import queue
>>> q = queue.Queue() # 队列长度无限

对象的属性和方法:

  • empty():判断队列是否为空(不可靠)。因为等你获取队列的长度时,可能已经有人往里面放入了数据;
  • full():队列是否满了(不可靠);
  • maxsize:查看队列的最大长度;
  • qsize():看到队列当前长度(不可靠);
  • clear():清空队列;
  • join():等到队列为空的时候,才进行操作;
  • put():往队列里面添加内容,可以为任意数据结构。put(self, item, block=True, timeout=None),block 表示是否为队列是否为阻塞状态。队列满了,再往里面加内容,队列会阻塞。如果不阻塞会返回一个异常,默认为阻塞状态;timeout 是阻塞的时间,如果队列满了,再往队列里面添加数据时,timeout 时间后会抛出异常。如果为 None(默认),它会一直阻塞,直到有线程从队列中取出数据;
  • get():从队列中取内容。如果是先进先出队列,它会取出最先存进去的数据。get(self, block=True, timeout=None),如果队列是空的,并且 timeout 为 None,它会一直阻塞,直到有线程往队列里面存入数据;
  • put_nowait(item):等效于 put(item, block=False);
  • get_nowait():等效 get(item, block=False)。

我们可以通过它来重写生产者消费者模型:

代码语言:javascript
复制
#!/usr/local/python3/bin/python3

import queue
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def producer(queue: queue.Queue, event: threading.Event):
    while not event.wait(3):
        data = random.randint(0, 100)
        logging.info(data)
        queue.put(data)

def consumer(queue: queue.Queue, event: threading.Event):
    while not event.is_set():
        logging.info(queue.get())

q = queue.Queue()
e = threading.Event()
threading.Thread(target=consumer, args=(q, e), name='consumer').start()
threading.Thread(target=producer, args=(q, e), name='producer').start()

通过 e.set() 就能停止它。相对 condition 实现的生产者消费者模型,它的优势在于可以暂存数据,这在生产者和消费者速率不一致的时候很好用;而它的缺陷在于无法广播,无法通知多个线程同时消费一条消息。因为我们通常可以将它们结合起来使用。

取出队列中所有数据:

代码语言:javascript
复制
while not q.enpty():
    q.get()

GIL

全局解释器锁,这是 Python 争议很大的一个点。正是由于它的存在,在操作内置容器时,解释器会在解释器级别增加一个锁,因此 Python 所有内置容器(字典、列表等)都是线程安全的,多线程环境下使用没有丝毫问题。而导致的后果就是 Python 的并发性能很差。

Python 中 collection, logging 等标准库都是线程安全的。

concurrent.futures

官网地址,Python3.2 引入的异步模块。 创建一个线程池:

代码语言:javascript
复制
from concurrent import futures
pool = futures.ThreadPoolExecutor(max_workers=5)

pool 对象有三个方法。submit 用于执行一个函数:

代码语言:javascript
复制
>>> fut = pool.submit(lambda: 1+1) # 执行一段逻辑,也就是一个函数
>>> fut.result() # 获取执行结果
Out[116]: 2
>>> fut.done() # 查看函数是否执行完成
Out[117]: True
>>> fut.running() # 是否处于运行状态
Out[118]: False
>>> fut.cancel() # 一个已经开始运行的线程是无法结束的,没开始的(比如 pool 满了在阻塞)可以
Out[119]: False
>>> fut.exception() # 如果函数中产生了异常,可以通过它来获取异常的实例

传递参数:

pool.submit(self.create_vm, vm_attributes, extra_attributes, conns)

通过这种方式使用线程,不需要将数据发送到队列中。

进程池由 ProcessPoolExecutor 实现,它们简化了进程和线程的操作,并且对返回值和异常进行了处理。

建议使用 futures,虽然它无法设置线程名(3.6 之后可以)、daemon 等属性,但是问题不大。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-10-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 标识一个线程
  • logging
  • daemon 与 non-daemon
  • 创建线程的另一种方法
  • thread local
  • 定时器
  • event
  • 实现定时器
  • Lock
  • RLock
  • condition
  • barrier
  • semaphore
  • queue
  • GIL
  • concurrent.futures
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档