前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python 多进程学习总结

Python 多进程学习总结

作者头像
somenzz
发布2020-12-10 10:31:12
4640
发布2020-12-10 10:31:12
举报
文章被收录于专栏:Python七号Python七号

阅读本文大概需要 9 分钟。

运维的过程中我们可能需要编写并发的应用程序,多进程的学习是很有必要的。我们都知道进程是操作系统进行资源分配和调度的基本单位,在单核 CPU 里,同一时刻只能运维单个进程,虽然我们仍可以同时运行多个程序,但是进程之间通过轮流占用 CPU 来执行的。进程有三种状态,他们之间的转化关系如下图所示:

随着技术的不断迭代更新, CPU 也越来越强大,目前家用电脑的 4 核 CPU 已经算低配置,服务器的 CPU 更是强劲,从 4 核心到 28 核,有的甚至有 64 核。因此为了充分发挥多核 CPU 的优势,提高程序的并发度,我们要使用多进程。 Python 内置的 multiprocessing 模块提供了对多进程的支持,下面我们一一介绍其用法。

创建进程的类 Process

multiprocessing 模块提供了一个创建进程的类 Process ,其创建进程有两种方法:第一一种方法是创建一个 Process 类的实例,并指定目标任务函数;第二种方法是自定义一个类,继承 Process 类,重写其 init ()方法和 run ()方法。 首先让我们使用第一种方法创建两个进程,并与单进程运行的时间做比较。

例子 1 :定义耗时任务,并对比单进程和多进程耗时

代码语言:javascript
复制
from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
    num  = 0
    for i in range(delay*100000000):
        num+=i
    print(f"进程pid为 {os.getpid()},执行完成")

if __name__=='__main__':
    print( '父进程pid为 %s.' % os.getpid())
    t0 = time.time()
    task_process(3)
    task_process(3)
    t1 = time.time()
    print(f"顺序执行耗时 {t1-t0} ")
    p0 = Process(target=task_process, args=(3,))
    p1 = Process(target=task_process, args=(3,))
    t2 = time.time()
    p0.start();p1.start()
    p0.join();p1.join()
    t3 = time.time()
    print(f"多进程并发执行耗时 {t3-t2}")

上面的代码首先定义了一个上亿次数据累加的耗时函数,在运行结束时打印调用此函数的进程 ID ,在第 14,15 行是单进程执行,在第 18 - 19 行分别实例化了 Process 类,并指定目标函数为 task _ process ,在 21 , 22 行是双进程并行执行,执行完成后打印耗时。其运行结果如下所示:

代码语言:javascript
复制
父进程pid为 2116.
进程pid为 2116,执行完成
进程pid为 2116,执行完成
顺序执行耗时 37.13105368614197
进程pid为 60624,执行完成
进程pid为 41016,执行完成
多进程并发执行耗时 24.04837417602539

我们发现多进程执行相同的操作次数,耗时更少。

接下来我们使用第二种方法来实现例 1

例子 2:自定义一个类,继承 Process 类

代码语言:javascript
复制

from multiprocessing import Process
import os
import time

class MyProcess(Process):
    def __init__(self, delay):
        super().__init__()
        self.delay = delay

    # 子进程要执行的代码
    def run(self):
        num = 0
        #for i in range(self.delay * 100000000):
        for i in range(self.delay * 100000):
            num += i
        print(f"进程pid为 {os.getpid()},执行完成")

if __name__ == "__main__":
    print("父进程pid为 %s." % os.getpid())
    p0 = MyProcess(3)
    p1 = MyProcess(3)
    t0 = time.time()
    print(p0.authkey)
    p0.start()
    p1.start()
    p0.join()
    p1.join()
    t1 = time.time()
    print(f"多进程并发执行耗时 {t1-t0}")

注:进程 p0 , p1 调用 start()时,自动调用其 run()方法。

运行结果如下所示:

代码语言:javascript
复制
父进程pid为 57228.
进程pid为 59932,执行完成
进程pid为 61288,执行完成
多进程并发执行耗时 24.03329348564148

下面让我们来看一下 Process 类还有哪些功能可以使用,其类的构造函数原型如下:

代码语言:javascript
复制
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

参数说明:

  • target 表示调用对象,一般为函数,也可以为类。
  • args 表示调用对象的位置参数元组。
  • kwargs 表示调用对象的字典。
  • name 为进程的别名。
  • group 参数不使用,可忽略。

类提供的常用方法:

  • is_alive():返回进程是否是激活的。
  • join([timeout]) :阻塞进程,直到进程执行完成或超时或进程被终止。
  • run() :代表进程执行的任务函数,可被重写。
  • start() :激活进程。
  • terminate():终止进程。

属性:

  • authkey:字节码,进程的谁密钥.
  • daemon:父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
  • exitcode:退出码,进程在运行时为None,如果为–N,表示被信号N结束。
  • name:获取进程名称.
  • pid:进程id。 下面举一个daemon的例子。 例3:不设置daemon属性(multi_process_no_daemo.py)
代码语言:javascript
复制
from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始。")
    print(f"sleep {delay}s")
    time.sleep(delay)
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束。")

if __name__=='__main__':
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始。")
    p0 = Process(target=task_process, args=(3,))
    p0.start()
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束。")

这里我们没有使用P0.join()来阻塞进程,运行结果如下所示:

代码语言:javascript
复制
2018-07-11 21:13:30 父进程执行开始。
2018-07-11 21:13:30 父进程执行结束。
2018-07-11 21:13:30 子进程执行开始。
sleep 3s
2018-07-11 21:13:33 子进程执行结束。

可以看出,父进程并没有等待子进程运行完成就打印了退出信息,程序依然会等待子进程运行完成。 例4:设置daemon属性(multi_process_daemo.py)

代码语言:javascript
复制
from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始。")
    print(f"sleep {delay}s")
    time.sleep(delay)
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束。")

if __name__=='__main__':
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始。")
    p0 = Process(target=task_process, args=(3,))
    #设置 daemon属性为True
    p0.daemon = True
    p0.start()
    p0.join()
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束。")

运行结果如下

代码语言:javascript
复制
2018-07-11 21:17:33 父进程执行开始。
2018-07-11 21:17:33 父进程执行结束。
程序并没有等待子进程结束而结束,只要主程序运行结束,程序即退出。

进程并发控制之Semaphore

Semaphore用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数。 例5 多进程同步控制(multi_process_Semaphore.py):

代码语言:javascript
复制
import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name + " 获得锁运行");
    time.sleep(i)
    print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name + " 释放锁结束");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(6):
        p = multiprocessing.Process(target = worker, args=(s, 2))
        p.start()

运行结果如下所示:

代码语言:javascript
复制
22:34:36 Process-1 获得锁运行
22:34:36 Process-2 获得锁运行
22:34:38 Process-1 释放锁结束
22:34:38 Process-3 获得锁运行
22:34:38 Process-2 释放锁结束
22:34:38 Process-4 获得锁运行
22:34:40 Process-3 释放锁结束
22:34:40 Process-5 获得锁运行
22:34:40 Process-4 释放锁结束
22:34:40 Process-6 获得锁运行
22:34:42 Process-5 释放锁结束
22:34:42 Process-6 释放锁结束

由于我们设置了 s = multiprocessing.Semaphore(2),因此同一时刻只有 2 个进程在执行操作。

进程同步之Lock

多进程目的是并发执行,提高资源利用率,从而提高效率,但有时候我们需要在某一时间只能有一个进程访问某个共享资源时,这时就需要使用锁 Lock 。 例6:多个进程输出信息,不加锁(multi_process__no_Lock.py)

代码语言:javascript
复制
import multiprocessing
import time

def task1():
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task1 输出信息")
        time.sleep(1)
        n -= 1

def task2():
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task2 输出信息")
        time.sleep(1)
        n -= 1

def task3():
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task3 输出信息")
        time.sleep(1)
        n -= 1

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p3 = multiprocessing.Process(target=task3)
    p1.start()
    p2.start()
    p3.start()

上述代码未使用锁,生成3个子进程,每个进程都打印自己的信息,执行结果如下所示:

代码语言:javascript
复制
21:22:35 task1 输出信息
21:22:35 task2 输出信息
21:22:36 task3 输出信息
21:22:36 task1 输出信息
21:22:36 task2 输出信息
21:22:36 task3 输出信息
21:22:37 task1 输出信息
21:22:37 task2 输出信息
21:22:37 task3 输出信息
21:22:38 task1 输出信息
21:22:38 task2 输出信息
21:22:39 task3 输出信息

从结果可以看出同一时刻有两个进程都在打印信息,在实际的应用中,可能造成信息混乱,现在我们改下上面的程序,要求同一时刻仅有一个进程在输出信息。 例7:多个进程输出信息,加锁(multi_process_Lock.py)

代码语言:javascript
复制
import multiprocessing
import time

def task1(lock):
    with lock:
        n = 5
        while n > 1:
            print(f"{time.strftime('%H:%M:%S')} task1 输出信息")
            time.sleep(1)
            n -= 1

def task2(lock):
    lock.acquire()
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task2 输出信息")
        time.sleep(1)
        n -= 1
    lock.release()

def task3(lock):
    lock.acquire()
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task3 输出信息")
        time.sleep(1)
        n -= 1
    lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    p1 = multiprocessing.Process(target=task1, args=(lock,))
    p2 = multiprocessing.Process(target=task2, args=(lock,))
    p3 = multiprocessing.Process(target=task3, args=(lock,))
    p1.start()
    p2.start()
    p3.start()

上面的代码中,每一个子进程任务函数中都加了锁Lock,使用锁也非常简单,首先初始化一个锁的实例lock = multiprocessing.Lock(),然后在需要独占的代码前后加锁:先获取锁,即调用lock.acquire()方法,运行完成后释放锁,即调用lock.release()方法;也可以简单的使用上下文关键字with (见task1的代码),执行上述代码,运行结果如下:

代码语言:javascript
复制
21:27:14 task1 输出信息
21:27:15 task1 输出信息
21:27:16 task1 输出信息
21:27:17 task1 输出信息
21:27:18 task2 输出信息
21:27:19 task2 输出信息
21:27:20 task2 输出信息
21:27:21 task2 输出信息
21:27:22 task3 输出信息
21:27:23 task3 输出信息
21:27:24 task3 输出信息
21:27:25 task3 输出信息

从输出结果中可以看出,同一时刻,仅有一个进程在输出信息。

进程同步之Event

Event用来实现进程间同步通信。请看下面的例8(multi_process_Event.py)。

代码语言:javascript
复制
import multiprocessing
import time

def wait_for_event(e):
    e.wait()
    time.sleep(1)
    # 唤醒后清除Event状态,为后续继续等待
    e.clear()
    print(f"{time.strftime('%H:%M:%S')} 进程 A: 我们是兄弟,我等你...")
    e.wait()
    print(f"{time.strftime('%H:%M:%S')} 进程 A: 好的,是兄弟一起走")

def wait_for_event_timeout(e, t):
    e.wait()
    time.sleep(1)
    # 唤醒后清除Event状态,为后续继续等待
    e.clear()
    print(f"{time.strftime('%H:%M:%S')} 进程 B: 好吧,最多等你 {t} 秒")
    e.wait(t)
    print(f"{time.strftime('%H:%M:%S')} 进程 B: 我继续往前走了")

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(target=wait_for_event, args=(e,))
    w2 = multiprocessing.Process( target=wait_for_event_timeout, args=(e, 5) )
    w1.start()
    w2.start()
    # 主进程发话
    print(f"{time.strftime('%H:%M:%S')} 主进程: 谁等我下,我需要 8 s 时间")
    # 唤醒等待的进程
    e.set()
    time.sleep(8)
    print(f"{time.strftime('%H:%M:%S')} 主进程: 好了,我赶上了")
    # 再次唤醒等待的进程
    e.set()
    w1.join()
    w2.join()
    print(f"{time.strftime('%H:%M:%S')} 主进程:退出")

上述代码定义了两个进程函数,一个是等待事件发生。一个等待事件发生,并设置了超时时间。主进程调用事件的set()方法唤醒等待事件的进程,事件唤醒后调用clear()方法清除事件的状态,并重新等待,以此达到进程同步的控制,执行结果如下所示:

代码语言:javascript
复制
20:47:27 主进程: 谁等我下,我需要 8 s 时间
20:47:28 进程 A: 我们是兄弟,我等你...
20:47:28 进程 B: 好吧,最多等你 5 秒
20:47:33 进程 B: 我继续往前走了
20:47:35 主进程: 好了,我赶上了
20:47:35 进程 A: 好的,是兄弟一起走
20:47:35 主进程:退出

进程优先级队列Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。 例9:使用多进程实现生产者-消费者模式:

代码语言:javascript
复制
from multiprocessing import Process,Queue
import time

def ProducerA(q):
    count = 1
    while True:
        q.put(f"冷饮 {count}")
        print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")
        count +=1
        time.sleep(1)

def  ConsumerB(q):
    while True:
        print(f"{time.strftime('%H:%M:%S')} B 取出 [{q.get()}]")
        time.sleep(5)
if __name__ == '__main__':
    q = Queue(maxsize=5)
    p = Process(target=ProducerA,args=(q,))
    c = Process(target=ConsumerB,args=(q,))
    c.start()
    p.start()
    c.join()
    p.join()

上述代码定义了生产者函数和消费者函数,设置其队列的最大容量是 5 ,生产者不定的生产冷饮,消费者不停的取出冷饮消费,当队列满时,生产者等待,当队列空时,消费者等待。他们放入和取出的速度可能不一致,但使用Queue可以让生产者和消费者有条不紊的一直进程下去。运行结果如下所示:

代码语言:javascript
复制
21:04:19 A 放入:[冷饮 1]
21:04:19 B 取出 [冷饮 1]
21:04:20 A 放入:[冷饮 2]
21:04:21 A 放入:[冷饮 3]
21:04:22 A 放入:[冷饮 4]
21:04:23 A 放入:[冷饮 5]
21:04:24 B 取出 [冷饮 2]
21:04:24 A 放入:[冷饮 6]
21:04:25 A 放入:[冷饮 7]
21:04:29 B 取出 [冷饮 3]
21:04:29 A 放入:[冷饮 8]
21:04:34 B 取出 [冷饮 4]
21:04:34 A 放入:[冷饮 9]
21:04:39 B 取出 [冷饮 5]
21:04:39 A 放入:[冷饮 10]
……

从结果可以看出生产者A生产的速度较快,当队列满时,等待消费者B取出后继续放入。

多进程之进程池Pool

在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。 Pool 可以提供指定数量的进程,供用户调用,当有新的请求提交到 pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。 例10:多进程使用进程池 Pool :

代码语言:javascript
复制
#coding: utf-8
import multiprocessing
import time

def task(name):
    print(f"{time.strftime('%H:%M:%S')}: {name} 开始执行")
    time.sleep(3)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(10):
        #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.apply_async(func = task, args=(i,))
    pool.close()
    pool.join()
    print("hello")

运行结果如下所示:

代码语言:javascript
复制
21:23:34: 0 开始执行
21:23:34: 1 开始执行
21:23:34: 2 开始执行
21:23:37: 3 开始执行
21:23:37: 4 开始执行
21:23:37: 5 开始执行
21:23:40: 6 开始执行
21:23:40: 7 开始执行
21:23:40: 8 开始执行
21:23:43: 9 开始执行

从运行结果上来看同一时刻只有3个进程在执行,使用Pool实现了对进程并发数的控制。

多进程之数据交换Pipe

在类Unix系统中我们经常使用管道(Pipe)命令来让一条命令的输出(STDOUT)做为另一条命令的输入(STDIN),来获取最终的结果。在Python多进程编程中也有一个Pipe方法来帮忙我们实现多进程之前的数据传输。我们可以将Unix系统一命令比做一个进程,一个进程的输出可以做为另一个进程的输入。用图来表示如图所示:

multiprocessing . Pipe()方法返回一个管道的两个端口,如 Command1 的 STDOUT 和 Command2 的 STDIN 。这样 Command1 的输出就做为 Command2 的输入,如果相反过来,让 Command2 的输出也可以做为 Command1 的输入,这就是全双工管道,默认就是全双工管道,如果想设置半双工管理,只需要给方法 Pipe()传递参数 duplex = False 即可。即 Pipe( duplex = False )。 Pipe()方法返回的对象具有发送消息 send()方法和接收消息 recv()方法,可以调用 Command1 . send( msg )发送消息, Command2 . recv()接收消息。如果没有消息可接收, recv()方法会一直阻塞。如果管道已经被关闭,那么 recv()方法会抛出异常 EOFError 。 例3.11:多进程全双工管道(multi_process_pipe.py)

代码语言:javascript
复制
import multiprocessing
import time

def task1(pipe):
    for i in range(5):
        str = f"task1-{i}"
        print(f"{time.strftime('%H:%M:%S')} task1 发送:{str}")
        pipe.send(str)
    time.sleep(2)
    for i in range(5):
        print(f"{time.strftime('%H:%M:%S')} task1 接收: { pipe.recv() }")

def task2(pipe):
    for i in range(5):
        print(f"{time.strftime('%H:%M:%S')} task2 接收: { pipe.recv() }")
    time.sleep(1)
    for i in range(5):
        str = f"task2-{i}"
        print(f"{time.strftime('%H:%M:%S')} task2 发送:{str}")
        pipe.send(str)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=task1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=task2, args=(pipe[1],))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

首先程序定义了两个子进程函数,task1先发送 5 条消息,再接收消息,task2 先接收消息,再发送消息,运行结果如下:

代码语言:javascript
复制
23:26:21 task1 发送:task1-0
23:26:21 task1 发送:task1-1
23:26:21 task1 发送:task1-2
23:26:21 task1 发送:task1-3
23:26:21 task1 发送:task1-4
23:26:21 task2 接收: task1-0
23:26:21 task2 接收: task1-1
23:26:21 task2 接收: task1-2
23:26:21 task2 接收: task1-3
23:26:21 task2 接收: task1-4
23:26:22 task2 发送:task2-0
23:26:22 task2 发送:task2-1
23:26:22 task2 发送:task2-2
23:26:22 task2 发送:task2-3
23:26:22 task2 发送:task2-4
23:26:23 task1 接收: task2-0
23:26:23 task1 接收: task2-1
23:26:23 task1 接收: task2-2
23:26:23 task1 接收: task2-3
23:26:23 task1 接收: task2-4

说明:代码中的 time.sleep() 操作可以让显示的结果不致于太混乱,没有这一步,并不影响进程接收和发送消息。

END

欢迎关注我 somenzz,您的关注,是我坚持的动力。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python七号 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 创建进程的类 Process
  • 进程并发控制之Semaphore
  • 进程同步之Lock
  • 进程同步之Event
  • 进程优先级队列Queue
  • 多进程之进程池Pool
  • 多进程之数据交换Pipe
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档