前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程与多进程 | 多线程

多线程与多进程 | 多线程

作者头像
数据STUDIO
发布2021-06-24 10:32:37
9520
发布2021-06-24 10:32:37
举报
文章被收录于专栏:数据STUDIO

进程(Process)

是计算机中已运行程序的实体。进程与程序不同,程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行的实体。

计算机程序只不过是磁盘中可执行的二进制(或其他类型)的数据。它们只有在被读取到内存中,被操作系统调用时才开始它们的生命周期。

进程(亦称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间、内存、数据栈及其他记录其运行轨迹的辅助数据。操作系统管理在其上运行所有的进程,并为这些进程公平分配时间、进程也可以通过fork和spawn操作来完成其他的任务。

不过进程有自己的内存空间,数据栈等,所以只能使用进程间通讯(interprocess communication, IPC),而不能直接共享信息。

线程(Thread)

是操作系统能够进行运算调度的最小单位。他被包含在进程中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

线程(亦称为轻量级进程)跟进程有些相似,不同的是:所有的线程运行在同一个进程中,共享相同的运行环境。它们可以被想象成是在主进程或“主线程”中并行运行的“迷你进程”。

线程有开始,顺序执行和结束三部分。它有一个自己的指令指针,记录自己运行到什么地方。线程的运行可能被抢占(中断)或暂时的被挂起(睡眠),让其他线程运行,这叫做让步。

一个进程中的各个线程之间共享同一片数据空间,所以线程之间可以比进程之间更方便地共享数据以及相互通讯。线程一般都是并发执行的,正是由于这种并行和数据共享的机制使得多个任务的合作变成可能。

进程好比一列火车,多个进程就是多列火车。线程好比车厢,多个线程就是一条火车的多个车厢。线程需要在进程下运行,就好比单独的车厢无法行驶一样。

创建线程

Python3 线程中常用的两个模块为:

  • _thread
  • threading(推荐使用)

thread 模块已被废弃。用户可以使用 threading 模块代替。所以,在 Python3 中不能再使用"thread" 模块。为了兼容性,Python3 将 thread 重命名为 "_thread"。本文中将介绍使用threading模块。

threading模块

表示控制线程的类。这个类可以以有限的方式安全地子类化。有两种方法 指定活动:通过传递一个可调用对象给构造函数,或通过重写子类中的run()方法。

代码语言:javascript
复制
threading.Thread(
    group=None,
    target=None,
    name=None,
    args=(),
    kwargs=None,
    *,
    daemon=None,
)

**group:**值为None,为以后版本而保留。 **target:**表示一个可调用对象,线程启动时,run()方法将调用此对象,默认为None,表示不用调用任何内容。 **name:**表示当前线程名称,默认创建一个"Thread-N"格式的唯一名称。 **args:**表示传递给target函数的参数元组。 **kwargs:**表示传递给target函数的参数字典。


threading模块对象

threading模块对象

描述

Thread

表示一个线程的执行的对象

Lock

锁原始对象(跟thread模块里的锁对象相同)

RLock

可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定)

Condition

条件变量对象能让一个线程停下来,等待其他线程满足了某个“条件”。如状态的改变或值的改变

Event

通用的条件变量。多个线程可以等待某个时间的发生,在事件发生后,所有的线程都被激活

Semaphore

为等待锁的线程提供一个类似“等候室”的结构

BoundedSemaphore

与Semaphore类似,只是它不允许超过初始值

Timer

与thread类似,只是它要等待一段时间后才开始运行

Thread类

线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:

函数

描述

start()

启动线程活动。

run()

定义线程的功能的函数(一般会被子类重写),用以表示线程活动的方法。

join(timeout=None)

程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout秒。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。

getName()

返回线程的名字

setName(name)

设置线程的名字

isAlive()

布尔标志,表示这个线程是否还在运行中

isDaemon()

返回线程的daemon标志

setDaemon(daemonic)

把线程的daemon标志设为daemonic(一定要在调用start()函数前调用)

用Thread类,可以用多种方法来创建线程。

  • 创建一个Thread的实例,传给它一个函数;
  • 创建一个Thread的实例,传给它一个可调用的类对象;
  • 从Thread派生出一个子类,创建一个这个子类的实例。

threading模块创建线程

创建Thread实例,传给它一个函数

代码语言:javascript
复制
import threading
import time

def run():
   # 每个子线程执行输出三次
   for i in range(3):
       time.sleep(2)
        name = threading.current_thread().name
        _time = time.ctime(time.time())
        print("Thread name is {}, and time is {}\n".format(name, _time))
        
if __name__ == "__main__":
   print("主线程开始".center(15,'-'))
    # 创建四个线程,存入列表
    num = 4
    threads = [threading.Thread(target=run) for i in range(num)]
    for thread in threads:
       # 开启线程
       thread.start()
    for thread in threads:
       # 等待子线程结束
       thread.join()
    print("主线程结束".center(15,'-'))

结果

-----主线程开始----- Thread name is Thread-1, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-2, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-3, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-4, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-4, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-2, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-3, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-1, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-4, and time is Fri Dec 19 14:55:27 2020 Thread name is Thread-3, and time is Fri Dec 19 14:55:27 2020 Thread name is Thread-2, and time is Fri Dec 19 14:55:27 2020 Thread name is Thread-1, and time is Fri Dec 19 14:55:27 2020 -----主线程结束-----

创建一个Thread实例,传给它一个可调用的类对象

与传一个函数很相似,但它是传一个可调用的类的实例供线程启动的时候执行,这是多线程编程的一个更为面向对象的方法。

代码语言:javascript
复制
import threading
import time

class threadFunc:
    def __init__(self, threadID, counter):
        self.name = threadID
        self.counter = counter
    def __call__(self):
        print ("开始子线程{}\n".format(self.name))
        print_time(self.name, 5, self.counter)
        print ("结束子线程{}".format(self.name))
            
def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print ("子线程{}: {}\n".format(threadName, time.ctime(time.time())))
        counter -= 1

if __name__ == "__main__":
    print("主线程开始".center(15,'-'))
    # 创建四个线程,存入列表
    counter = 2
    num = 4
    threads = [threading.Thread(target=threadFunc(i, counter)) for i in range(num)]
    for thread in threads:
    # 开启线程
        thread.start()
    for thread in threads:
      # 等待子线程结束
        thread.join()
    print("主线程结束".center(15,'-'))

-----主线程开始----- 开始子线程0 开始子线程1 开始子线程2 开始子线程3 子线程0: Fri Dec 18 16:44:38 2020 子线程1: Fri Dec 18 16:44:38 2020 子线程2: Fri Dec 18 16:44:38 2020 子线程3: Fri Dec 18 16:44:38 2020 子线程0: Fri Dec 18 16:44:43 2020 子线程1: Fri Dec 18 16:44:43 2020 子线程2: Fri Dec 18 16:44:43 2020 子线程3: Fri Dec 18 16:44:43 2020 结束子线程0 结束子线程2 结束子线程3 结束子线程1 -----主线程结束-----

使用Thread子类创建线程

Thread派生一个子类,创建这个子类的实例

代码语言:javascript
复制
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print ("开始线程:{}\n".format(self.name))
        print_time(self.name, 5, self.counter)
        print ("退出线程:{}\n".format( self.name))

def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            threadName.exit()
        time.sleep(delay)
        print ("{}: {}\n".format(threadName, time.ctime(time.time())))
        counter -= 1

if __name__ == "__main__":
    # 实例化创建新线程
    thread1 = myThread(1, "Thread-1", 5)
    thread2 = myThread(2, "Thread-2", 5)

    # 开启新线程
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print ("退出主线程")

结果

开始线程:Thread-1 开始线程:Thread-2 Thread-1: Fri Dec 19 15:11:46 2020 Thread-2: Fri Dec 19 15:11:46 2020 Thread-1: Fri Dec 19 15:11:51 2020 Thread-2: Fri Dec 19 15:11:51 2020 Thread-1: Fri Dec 19 15:11:56 2020 Thread-2: Fri Dec 19 15:11:56 2020 Thread-1: Fri Dec 19 15:12:01 2020 Thread-2: Fri Dec 19 15:12:01 2020 Thread-1: Fri Dec 19 15:12:06 2020 Thread-2: Fri Dec 19 15:12:06 2020 退出线程:Thread-1 退出线程:Thread-2 退出主线程

退出线程

当一个线程结束计算,它就退出了。也可以使用Python退出进程的标准方法,如sys.exit()或抛出一个SystemExit异常等。不过,你不可以直接杀掉Kill一个线程。

threading模块能确保所有"重要的"子线程都退出后,进程才会结束。

主线程应该是一个好的管理者,它要了解每个线程都要做些什么事,线程都需要什么数据和什么参数,以及在线程结束的时候,它们都提供了什么结果。这样,主线程就可以把各个线程的结果组成一个有意义的最后结果。

输出

-----主线程开始----- 子线程Thread-5执行任务0 子线程Thread-4执行任务0 子线程Thread-5执行任务1 子线程Thread-4执行任务1 子线程Thread-4执行任务2 子线程Thread-5执行任务2 -----主线程结束----- An exception has occurred, use %tb to see the full traceback. SystemExit: 0

互斥锁(Mutex)

先看下面的例子:

代码语言:javascript
复制
from threading import Thread
import time

def plus():
    time.sleep(2)
    print('thread-1 start.'.center(20, '-'))
    global num
    num += 100
    print(f'num +=100 = {num}')
    print('thread-1 end.'.center(20, '-'))

def minus():
    time.sleep(2)
    print('thread-2 start.'.center(20, '-'))
    global num
    num -= 100
    print(f'num -=100 = {num}')
    print('thread-2 end.'.center(20, '-'))

num = 200
if __name__ == "__main__":
    print("MainThread Start.".center(30, '-'))
    print(f'num = {num}')
    thread1 = Thread(target=plus)
    thread2 = Thread(target=minus)
    thread1.start()
    thread2.start()
    
    thread1.join()
    thread2.join()
    print("MainThread End.".center(30, '-'))

结果

------MainThread Start.------- num = 200 --thread-2 start.--- num -=100 = 100 ---thread-2 end.---- --thread-1 start.--- num +=100 = 200 ---thread-1 end.---- -------MainThread End.--------

定义一个全球变量num = 100,两个线程分别对num进行处理,第一个num+100 = 200,第二个对num - 100 =200,说明两个线程之间是可以共享数据。

在上面的这种的情况下,就需要对全局变量通过一定的方式保护其不被随意修改,不然会造成多线程之间对全局变量使用的混乱。那么保护其不被任意修改,需要把这个资源"锁"住,只允许线程依次排队进去获取这个资源。

互斥锁(Mutual exclusion),互斥锁是一种简单的加锁的方法来控制对共享资源的访问,可以防止多个线程同时读写某一块内存区域。互斥锁只有两种状态,即上锁( lock )和解锁( unlock )

互斥锁的特点

  1. 原子性:把一个互斥量锁定为一个原子操作,这保证了如果一个线程锁定了一个互斥量,没有其他线程在同一时间可以成功锁定这个互斥量;
  2. 唯一性:如果一个线程锁定了一个互斥量,在它解除锁定之前,没有其他线程可以锁定这个互斥量;
  3. 非繁忙等待:如果一个线程已经锁定了一个互斥量,第二个线程又试图去锁定这个互斥量,则第二个线程将被挂起(不占用任何cpu资源),直到第一个线程解除对这个互斥量的锁定为止,第二个线程则被唤醒并继续执行,同时锁定这个互斥量。

互斥锁的操作流程

  1. 在访问共享资源后临界区域前,对互斥锁进行加锁;
  2. 在访问完成后释放互斥锁导上的锁。在访问完成后释放互斥锁导上的锁;
  3. 对互斥锁进行加锁后,任何其他试图再次对互斥锁加锁的线程将会被阻塞,直到锁被释放。对互斥锁进行加锁后,任何其他试图再次对互斥锁加锁的线程将会被阻塞,直到锁被释放。

互斥锁的使用

threading模块中使用Lock类处理锁定。

代码语言:javascript
复制
# 创建锁
lock = threading.Lock()
# 锁定
lock.acquire()
# 释放锁
lock.release()

acquire(blocking=True, timeout=-1) -> bool 获取锁定,必要时需要阻塞到锁定释放为止。如果设置"blocking=False",则当无法获取锁定时将立即返回False,成功则返回True。 release() 释放一个锁定。当锁定处于未锁定时将出现错误。

代码语言:javascript
复制
from threading import Thread, Lock
import time
num = 100

def task():
    global num
    lock.acquire()          # 上锁
    temp = num
    time.sleep(1)
    num = temp - 1          # 实际任务,将全局变量num减1
    print(f"Tasks have successfully completed, and now num is {num}")
    lock.release()           # 释放锁
    
if __name__ == "__main__":
    lock = Lock()
    # 实例化10个线程并已列表推导式形式保存在列表中
    threads = [Thread(target=task) for _ in range(10)]
    for thread in threads:
        thread.start()            # 创建线程
    for thread in threads:
        thread.join()        # 等待子线程结束

结果

Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 98 Tasks have successfully completed, and now num is 97 Tasks have successfully completed, and now num is 96 Tasks have successfully completed, and now num is 95 Tasks have successfully completed, and now num is 94 Tasks have successfully completed, and now num is 93 Tasks have successfully completed, and now num is 92 Tasks have successfully completed, and now num is 91 Tasks have successfully completed, and now num is 90

如果不使用锁,则结果为

Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99

没有加锁的程序中,十个任务同时竞争同一个资源,每个线程都是第一时间拿到num并都执行task()「num - 1」。使用lock.acquire()函数实现资源锁定后,第一获取资源的线程锁定后,其他线程等待lock.release()解锁。所以每次只有一个线程执行task()函数,其结果是每次在上次的结果下减去1。

死锁

死锁产生的4个必要条件:

  1. 互斥:一个资源同一时刻只允许一个线程进行访问。
  2. 占有未释放:一个线程占有资源,且没有释放资源。
  3. 不可抢占:一个已经占有资源的线程无法抢占到其他线程拥有的资源。
  4. 循环等待:两个或者两个以上的线程,本身拥有资源,不释放资源,并且同时尝试获得其他线程所持有的资源,这种资源的申请关系形成一个闭环的链条。
死锁的一个例子
代码语言:javascript
复制
# 线程死锁,在classA和classB相互调用对方的方法,并且相互等待对方释放资源
class ClassA:
    def __init__(self):
        self.lock=threading.RLock()

    # 得到classA的锁,试图得到classB的锁
    def infoA(self, b):
        self.lock.acquire()
        time.sleep(10)
        b.info()
        self.lock.release()

    def info(self):
        self.lock.acquire()
        print("this is ClassA info")
        self.lock.release()

class ClassB:
    def __init__(self):
        self.lock = threading.RLock()

    # 得到classB的锁,试图得到classA的锁
    def infoB(self, a):
        self.lock.acquire()
        time.sleep(10)
        a.info()
        self.lock.release()

    def info(self):
        self.lock.acquire()
        print("this is ClassB info")
        self.lock.release()


ca=ClassA()
cb=ClassB()

def funA():
    ca.infoA(cb)
def funB():
    cb.infoB(ca)

# 函数调用不是线程,不会死锁
# funA()
# funB()

t1=threading.Thread(target=funA).start()
t2=threading.Thread(target=funB).start()

多线程通信Queue队列

Queue队列

是一种特殊的结构,类似于列表。不过就像排队一样,队列中的元素一旦取出,那么就会从队列中删除。

生产者/消费者模式

某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图。

缓冲区的作用

1、解耦

假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

2、支持并发(concurrency)

生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。

使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。

其实当初这个模式,主要就是用来处理并发问题的。

3、支持忙闲不均

缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

队列Queue在线程间通信

定义一个生产者类Producet,定义一个消费者Consumer。生产者生产10件产品并依次存入队列,而消费者依次从队列中取出产品。

代码语言:javascript
复制
from queue import Queue
import random
import threading
import time

# 定义生产者类
class Producer(threading.Thread):
   def __init__(self, name, queue):
       threading.Thread.__init(self, name=name)
        self.data=queue
    def run(self):
       for i in range(5):
           print("生产者{}往队列中放入产品{}".format(self.getName(), i))   
            self.data.put(i)
            time.sleep(random.random())
        print("生产者{}完成任务。".format(self.getName())

# 定义消费者类
class Consumer(threading.Thread):
   def __init__(self, name, queue):
       threading.Thread.__init(self, name=name)
        self.data=queue
    def run(self):
       for i in range(5):
            val = self.data.get()
           print("消费者{}往队列中取出产品{}".format(self.getName(), i))   
            time.sleep(random.random())
        print("消费者{}完成任务。".format(self.getName())
        
if __name__ == "__main__":
  print("主线程开始".center(30,'-'))
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print("主线程结束".center(30,'-'))
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据STUDIO 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 进程(Process)
  • 线程(Thread)
  • 创建线程
    • threading模块
      • threading模块对象
        • Thread类
          • threading模块创建线程
            • 使用Thread子类创建线程
              • 退出线程
          • 互斥锁(Mutex)
            • 互斥锁的特点
              • 互斥锁的操作流程
                • 互斥锁的使用
                  • 死锁的一个例子
              • 死锁
              • 多线程通信Queue队列
                • Queue队列
                  • 生产者/消费者模式
                    • 缓冲区的作用
                      • 1、解耦
                      • 2、支持并发(concurrency)
                      • 3、支持忙闲不均
                    • 队列Queue在线程间通信
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档