前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >第36天并发编程之进程篇

第36天并发编程之进程篇

作者头像
py3study
发布2020-01-20 14:41:07
3790
发布2020-01-20 14:41:07
举报
文章被收录于专栏:python3python3

目录:

1. 基础概念

  2. 创建进程和结束进程

  3. 进程之间内存空间物理隔离

  4. 进程的属性方法

  5. 守护进程

  6. 互斥锁

  7. IPC通信机制

  8. 生产者消费者模型

一. 基础概念

代码语言:javascript
复制
1. 什么叫做程序,什么叫做进程?
  
  程序就是程序员写的一堆代码文件。
  进程指的是程序正在执行的一个过程,是一个抽象的概念,起源于操作系统。

2. 什么是操作系统?
  
  操作系统是位于计算机硬件与应用程序之间的,用于协调,控制和管理计算机硬件和软件资源的控制程序

3. 操作系统的两大作用
  
  (1). 将复杂丑陋的硬件操作都封装成简单的接口,提供给应用程序使用,大大的提高了应用程序的开发效率
  (2). 把进程对硬件的竞争变得有序

4. 批处理系统
    
  将程序员写的程序攒成一堆,然后一个一个的读到内存里面进行执行。
  解决了第一代操作系统一个人独占计算机资源的问题,虽然节省了大量的时间,但是本质上还是串行
  此时已经出现了操作系统的概念,以及进程的雏形。
  
5. 多道技术
  
  产生背景:在单核下实现并发的效果。
  两大核心:
    空间复用:
      将多个程序同时读入到内存中,等待被cpu执行。此时也就出现了多进程的概念。
      特点:每个程序的内存空间都是物理隔离的。
    时间复用:  
      复用cpu的时间片
      cpu什么时候会切换进程的执行?
        1.在遇到i/0阻塞的时候就会暂时的挂起此进程,切换到另一个进程去执行
        2.正在执行的进程占用cpu的时间过长,或者有一个优先级更高的进程出现的时候也会切换执行
  优点:大大的提高了计算机cpu的利用率,实现了并发的效果。是当代计算机操作系统的雏形。

6. 串行,并行,并发,阻塞
  
  串行:程序从上到下依次执行。批处理系统典型的就是串行。
  并行:同一时刻运行多个程序。如果只有一个核是不可能实现并行的,只有多核的时候才能真正的实现并行。
  并发:同一时间段内运行多个程序,用户看起来就像是并行一样。
    并发实现的本质:保存当前的状态 + cpu的切换
  阻塞:遇到i/0就进入阻塞状态。

7. 进程的属性
  
  pid: 用来唯一的表示一个进程,就像是身份证号。
  name: 进程名称
  terminate: 杀死当前子进程
  is_alive: 查看子进程是否还活着
  join: 等待子进程结束
  start: 向系统发送一个创建子进程的系统调用
  daemon: 将子进程设置成守护进程

  方法:
    os.getpid()  获得当前进程的pid
    os.getppid() 获得当前进程的父进程pid

8. 进程相关的win的命令
  tasklist: 查看当前进程信息
  taskkill /F /PID 进程号    杀死一个进程
  tasklist |findstr pyth   通过管道查看相应的值

9. 僵尸进程和孤儿进程
  
  僵尸进程:子进程死了,但是父进程还没有死,此时的子进程就称之为僵尸进程。
    任何进程死了之后都会回收相应的资源,但是对于一些基本的信息是不会回收的,例如pid,name等,以被父进程所查看,这就存在一个问题。
    当父进程创建了大量的子进程,而父进程又很长的时间内不会不会死掉,对于内存空间的占用倒不是很严重,但是会占用大量的pid,而pid的资源是有限的。
    因此我们的程序中不应该出现大量的僵尸进程,如果父进程需要运行很长的时间,我们就需要在适当的时候回收子进程的资源,防止出现大量的无用僵尸进程。
    join方法就会提供回收僵尸进程的功能。 

  孤儿进程:父进程死了,但是子进程没有死,这是的子进程就称之为孤儿进程。
    孤儿进程没有害,因为孤儿进程会被init进程所接管,在一定的时间之后会被清理掉。但是程序中也不应该出现大量的孤儿进程。

 二. 创建进程和结束进程

代码语言:javascript
复制
进程创建的方式
  
  1.系统初始化,在开机的时候自动就会加载操作系统,此时就会出现一个根进程。
  2.用户的交互式请求,例如在电脑上双击暴风影音,就是创建了一个进程,再双击就又创建了一个进程。
  3.一个批处理作业的初始化(专用计算机的系统加载)
  
  4.一个进程运行的过程中通过模块重新开启一个子进程 (我们关注的是这种方式的进程的创建)
    linux: fork
      在初始化的时候子进程和父进程是完全一样的。 
    win:CreateProcess
      初始状态的时候子进程和父进程并不是完全一样的。

进程的结束方式
  
  1. 正常退出
  2. 出错退出
  3. 被另一个进程杀死

第一种创建进程的方式

代码语言:javascript
复制
from multiprocessing import Process
import time

def task(name):
    print('%s is running...' %name)
    # 模拟子进程执行了一系列的操作
    time.sleep(2)
    print('%s is ending...' %name)

# 在windows下面必须要这样写
# 这是因为在win下创建一个进程的时候会重新执行一遍此模块
# 为了防止循环创建,所以必须要在此地方创建子进程
if __name__ == '__main__':
    # 创建了一个进程对象
    # target代表的是子进程要执行的任务,一般是函数名称
    # args:里面的值是给函数传递的参数
    p = Process(target=task, args=('egon',))
    # 此处并不是直接创建子进程,而是向操作系统发送了一个系统调用
    # 操作系统会申请一个内存空间,创建一个子进程
    # 对于主进程而言,这行代码就像是平常的代码一样,主进程并不会等待子进程的创建,然后就去继续执行了
    p.start()
    # 因此结果是先打印下面的提示信息,之后才会去打印task函数内的东西
    print('p.start()代码一旦执行完,我是不会等系统创建子进程的,我立马就要执行')

第二种创建进程的方式

代码语言:javascript
复制
from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, name):
        # 重用父类的功能,然后传递自己函数所需要的参数
        super(MyProcess, self).__init__()
        self.name = name

    # p.start()其实就是系统去调用run函数,因此我们将之前的内容放到run函数里面
    def run(self):
        print('%s is running...' %self.name)
        # 模拟子进程执行了一系列的操作
        time.sleep(2)
        print('%s is ending...' %self.name)


if __name__ == '__main__':
    p = MyProcess('egon')
    p.start()
    print('主进程')

三. 进程之间内存空间物理隔离

代码语言:javascript
复制
from multiprocessing import Process
x = 100

def task():
    global x
    x = 0
    print('子进程结束...')

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    # 等待子进程结束之后在去打印x的值
    p.join()
    # 如果空间是共享的,等待子进程结束之后x的值应该是0
    # 如果空间是隔离的,子进程结束之后x的值还是100
    print(x)

四. 进程的属性方法

 join方法

代码语言:javascript
复制
from multiprocessing import Process
import time

def task():
    print('子进程开始...')
    time.sleep(2)

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    # 如果没有join,就会先打印x,然后才会去执行task函数
    # 有了join之后,主进程就会阻塞在这里,等待子进程p结束之后才会打印x
    p.join()
    print('主进程.....')

# 结果:
# 子进程开始...
代码语言:javascript
复制
from multiprocessing import Process
import time

def task(n):
    print('%s开始...' %n)
    time.sleep(n)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=(1, ))  # p1睡了1s
    p2 = Process(target=task, args=(2, ))  # p2睡了2s
    p3 = Process(target=task, args=(3, ))  # p3睡了3s

    p1.start()
    p2.start()
    p3.start()

    # 程序运行到这个地方的时候p1,p2, p3可能都已经开始执行了
    # 无论主进程是在等谁,所有的子进程都是会执行的
    # 也就是说在等待p1的过程中子进程p3也在执行,因此整个程序的执行时间应该是最耗时的子进程时间
    # 此处也就是三秒多
    p1.join()  # 等待子进程1s
    p2.join()  # 等待子进程2s
    p3.join()  # 等待子进程3s
    end= time.time()
    print('执行时间>>', start - end)


# 运行结果:
# 2开始...
# 1开始...
# 3开始...
# 执行时间>> -3.2581233978271484

join容易迷惑的地方

代码语言:javascript
复制
from multiprocessing import Process
import time

def task():
    print('子进程开始...')
    time.sleep(2)

if __name__ == '__main__':
    # 如果循环的创建了子进程,需要等待所有进程结束我们就需要通过一个循环去等待
    l = []
    for i in range(5):
        p = Process(target=task)
        l.append(p)
        p.start()

    for p in l:
        p.join()
    print('主')
# 结果:
# 子进程开始...
# 主进程.....

join循环等待子进程结束

自定义查看ppid的方法(待填)

五. 守护进程

代码语言:javascript
复制
1. 什么是守护进程
  
   obj = Process(target=lambda x: x + 1)  # 创建一个子进程
   obj.daemon = True   # 当设置了此属性之后,这个子进程就会变成一个守护进程
   效果: 当主进程结束之后,守护进程就会随着结束

2. 为什么要有守护进程?
  
  我们创建一个子进程就是为了并发的执行多个任务,有时候我们的子任务在主任务结束之后就没有存在的必要了,因此,在主程序结束之后,我们往往希望可以自动的结束掉这些子进程,因此就有了守护进程。
  例如:当我们通过qq在传一个文件的时候,qq是主进程,传文件是子进程,当qq退出去之后还应该会传文件吗,肯定不会,所以此时就应该把传文件设置成一个守护进程,当qq退出去的时候自动的关掉子进程。

3. 重点
  
  如果一个主进程中既有守护进程也有非守护进程,那么当主进程的代码执行完毕以后守护进程就会死掉,并不会等到主进程清理完非守护进程之后才死掉。

 例子:

代码语言:javascript
复制
from multiprocessing import Process
import time


def task(name):
    print(name, 'is running...')
    time.sleep(3)
    print("ending....")


if __name__ == '__main__':
    p = Process(target=task, args=('egon',))
    p.start()
    print('主进程over')


# 运行结果: 只有等待子进程完全结束之后才会结束掉主进程,防止
# H:\python_study\venv\Scripts\python.exe H:/python_study/day36/博客/守护进程.py
# 主进程over
# egon is running...
# ending....
#
# Process finished with exit code 0

正常产生的主进程会等待子进程结束之后才会结束

代码语言:javascript
复制
from multiprocessing import Process
import time


def task(name):
    print(name, 'is running...')
    time.sleep(3)
    print("ending....")


if __name__ == '__main__':
    p = Process(target=task, args=('egon',))
    p.daemon = True  # 将子进程变成一个僵尸进程
    p.start()
    print('主进程over')

# 结果:当主进程执行完print操作之后就直接结束了,守护进程也会随之而结束
# H:\python_study\venv\Scripts\python.exe H:/python_study/day36/博客/守护进程.py
# 主进程over
# 
# Process finished with exit code 0

主进程结束之后守护进程也就跟着结束了

代码语言:javascript
复制
from multiprocessing import Process
import time

def foo():
    print(123)
    time.sleep(0.1)
    print(456)

def bar():
    print(789)
    time.sleep(2)
    print('10002')


if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=bar)
    p1.daemon = True
    p1.start()
    p2.start()
    print('主进程.....')  # 当这一行代码执行完毕之后,就代表着主进程相关的任务已经执行完毕,守护进程在此时就没有守护的必要的了,因此会被干掉

# 结果: 打印完主进程之后,p1进程作为守护进程就会被干掉
# 主进程.....
# 789
# 10002

守护进程和非守护进程都存在的情况下

六. 互斥锁

代码语言:javascript
复制
1. 什么叫做互斥锁
  
  对于同一个系统资源,如果一个进程加上了互斥锁,另一个进程也加上了同一个互斥锁,谁先抢到谁先执行,直到释放锁之后,另个一进程才能够使用此资源。

2. 互斥锁和join的区别
  
  原理都是一样的,都是为了将并发变成串行,从而保证有序。
  区别一:
    互斥锁:进程平等的竞争,谁先抢到谁先执行。
    join: 按照人为指定的顺序执行。
  区别二:
    互斥锁:将一部分代码进行串行
    join: 只能将代码整体

区别一:互斥锁和join

步骤一:创建一个py程序,用来打印三个人的信息,创建了三个函数,每个函数里面都有一个sleep来模拟网络延迟,因此我们写出了下面的代码

代码语言:javascript
复制
import time


def task1():
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')

def task2():
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')

def task3():
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')


if __name__ == '__main__':
    task1()
    task2()
    task3()

正常的打印三个人的信息

步骤二:这样写虽然解决了问题,但是运行效率太慢了,完全受不了,因此想着怎么让三个任务进行并发,从而提高运行效率,因此我们创建了三个进程,分别用来执行三个任务,所以写出来了下面这个代码

代码语言:javascript
复制
from multiprocessing import Process
import time


def task1():
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')

def task2():
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')

def task3():
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p1.start()
    p2.start()
    p3.start()

通过创建子进程的形式提高运行效率

步骤三:这样子写虽然是提高了运行效率,但是我们发现结果并不是我们想要的,我们希望的是无论哪个任务先执行,总是希望可以让这个任务的信息打印完成之后才去执行之后的任务。有两种解决方法1, 就是jion方法,2. 就是互斥锁,首先我们以join的方法让当前信息打印变得有序。

代码语言:javascript
复制
from multiprocessing import Process
import time

def task1():
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')

def task2():
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')

def task3():
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)
    p1.start()
    p1.join()  # 在此处加上join方法,使得等待第一个程序执行完成之后在去执行第二个进程
    p2.start()
    p1.join()
    p3.start()
    p1.join()

join方法使得并发变得有序

步骤四:结果变得确实有序了,但是这样写是有问题的,1. 完全没有并发的效果  关于这个问题我们暂时忽略,只是为了讨论并发才拿出来这样一个例子的。2. 这样子写其实是人为的规定了让p1进程先执行,然后是p2进程,然后是p3进程。这样是非常不公平的,我们的初衷并不希望人为的规定哪个子进程先进行操作,因此我们可以使用互斥锁,这就需要引入另一个类Lock

代码语言:javascript
复制
# 锁的使用方法,使用比较简单,就两个函数

mutex = Lock()   # 创建一个互斥锁
mutex.acqure()   # 加锁
。。。。这是我们希望控制的代码
mutex.release()  # 释放锁

# 注意:

1. 在子进程中所使用的锁必须是同一把锁,就是锁必须要在if语句中创建,并且通过参数的形式传递给子进程。
2. 对于同一个进程锁只能加一次
3. 必要的代码执行完毕之后必须要释放锁
代码语言:javascript
复制
from multiprocessing import Process, Lock
import time


def task1(lock):
    lock.acquire()
    print('task1: 名字, egon')
    time.sleep(0.1)
    print('task1: 性别, male')
    time.sleep(0.1)
    print('task1: 年龄, 13')
    lock.release()

def task2(lock):
    lock.acquire()
    print('task2: 名字, alex')
    time.sleep(0.1)
    print('task2: 性别, male')
    time.sleep(0.1)
    print('task2: 年龄, 18')
    lock.release()


def task3(lock):
    lock.acquire()   # 加锁
    print('task3: 名字, wxx')
    time.sleep(0.1)
    print('task3: 性别, male')
    time.sleep(0.1)
    print('task3: 年龄, 21')
    lock.release() # 解锁


if __name__ == '__main__':
    mutex = Lock()   # 创建一个锁
    p1 = Process(target=task1, args=(mutex,))  # 然后将这一个锁当做参数进行传递
    p2 = Process(target=task2, args=(mutex,))
    p3 = Process(target=task3, args=(mutex,))
    p1.start()
    p2.start()
    p3.start()

互斥锁解决并发的乱序的问题

区别二:共享锁写一个简单的抢票小程序

 步骤一:创建一个db文件用来存放共享数据,也就是票的数量。因为进程之间的通信目前还没有学到,但是对于磁盘的访问每个进程都是可以访问的,因此先创建db文件

代码语言:javascript
复制
{"count": 1}

步骤二:写一个不加锁不加延迟的一个简单功能

代码语言:javascript
复制
from multiprocessing import Process
import os
import json
import time


def search():
    """查询当前还有几张票"""
    with open('db.json', 'r', encoding='utf-8') as f:
        print('%s  剩余票数 %s' %(os.getpid(), json.load(f)['count']))


def get():
    """购买票"""
    with open('db.json', 'r', encoding='utf-8') as f:
        dic = json.load(f)

    if dic['count'] > 0:
        dic['count'] -= 1
        with open('db.json', 'w', encoding='utf-8') as f:
            json.dump(dic, f)
        print('%s 购买票成功' % os.getpid())


def task():
    """抢票程序,包含一个查询票和购买票的功能"""
    search()
    get()


if __name__ == '__main__':
    for i in range(5):
        p = Process(target=task)
        p.start()

不加锁不加延迟

步骤三: 如果没有延迟的情况下,我们的程序目前来看已经具备了抢票的功能,因为在没有延迟的情况下,当所有进程建立完成之后就是一些基本的运算,cpu的执行是非常快的,因此cpu在执行get函数的时候基本上是不会切换到其他的进程中执行的。但是如果有延迟的情况就不一样了,加上延迟之后因为cpu会切换,所以导致结果不可控,有两种解决方案,1. join将整个进程变成串行的  , 虽然join可以解决问题,但是对于查询票数这个操作来说,我们并不希望是串行的,因此此方法并不合适 2. 用互斥锁

代码语言:javascript
复制
from multiprocessing import Process,Lock
import os
import json
import time
import random


def search():
    """查询当前还有几张票"""
    # 加上查询票的延迟
    time.sleep(random.randint(1,3))
    with open('db.json', 'r', encoding='utf-8') as f:
        print('%s  剩余票数 %s' %(os.getpid(), json.load(f)['count']))


def get():
    """购买票"""
    with open('db.json', 'r', encoding='utf-8') as f:
        dic = json.load(f)

    if dic['count'] > 0:
        dic['count'] -= 1
        # 加上购买票的延迟
        time.sleep(random.randint(1, 3))
        with open('db.json', 'w', encoding='utf-8') as f:
            json.dump(dic, f)
        print('%s 购买票成功' % os.getpid())


def task(lock):
    """抢票程序,包含一个查询票和购买票的功能"""
    search()
    lock.acquire()
    get()
    lock.release()


if __name__ == '__main__':
    metux = Lock()
    for i in range(10):
        p = Process(target=task, args=(metux,))
        p.start()

用互斥锁解决抢票问题

七. IPC通信机制

代码语言:javascript
复制
IPC: 进程之间的通信

问题: 两个进程之间的内存空间是物理隔离的,因此怎么通信呢?

进程之间通信的方式:

1. 上面讲到的创建一个共享文件
  文件的i/0操作太浪费时间,因此这个方式不建议使用
2. 通道
  之前学过subprocess也可以实现进程之间的通信,但是这个进程之间必须是父子进程,并且是半双工模式,因此也不推荐使用
3. 共享内存
  Manager,可以通过此类创建一个共享的字典或者列表,为了防止数据出错,在修改数据的时候我们需要自己添加锁,很麻烦,也不建议使用
  Queue: 队列,我们可以通过队列的方式实现进程之间的通信,队列在内部已经帮我们添加了锁。

IPC机制应该遵循的原则
  1. 所有进程都应该可以共享数据
  2. 共享的数据最好应该在内存中
  3. 并且我们不需要去操作锁,也就是IPC应该帮我们处理好锁的功能

Manager存在的问题:要自己定义锁才能对数据进行修改

代码语言:javascript
复制
# 创建共享内存空间,并进行修改

from multiprocessing import Process, Manager

def task(dic):
    dic['num'] -= 1

if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'num': 10})
    l = []
    for i in range(10):
        p = Process(target=task, args=(dic, ))
        p.start()
        l.append(p)
    # 等待子进程结束
    for p in l:
        p.join()
    print(dic)
# 结果:
# {'num': 0}

# 创建共享内存空间,并进行修改

 步骤一:创建一个共享的内存空间,然后通过创建进程去修改共享字典的内容,我们发现我们想要的效果Manager确实是已经帮我们实现了,但是当我们在task函数做了以下修改之后,再打印信息就会发现所出来结果不是0而是9了

代码语言:javascript
复制
def task(dic):
    temp = dic['num']   # 和之前的dic['num'] = 1性质是一样的,但是为什么结果不一样呢
    time.sleep(0.1)
    dic['num'] = temp - 1

步骤二:这是因为在创建进程之后,所有的进程基本上都会在同一时间内拿到temp的值为10,也就是说10个进程的temp都是10, 当他们睡完0.1秒之后无论是谁进行修改dic的值都是10-1所以结果是0,因此对于Manager创建的内存空间默认在修改数据的时候是不会给我们加锁的,因此我们需要自己去加锁对数据进行修改,否则数据就会被损坏。

代码语言:javascript
复制
from multiprocessing import Process, Manager, Lock
import time
mutex = Lock()
def task(dic, lock):
    lock.acquire()  # 加锁,防止数据被破坏
    temp = dic['num']
    time.sleep(0.1)
    dic['num'] = temp - 1
    lock.release()

if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'num': 10})
    l = []
    for i in range(10):
        p = Process(target=task, args=(dic, mutex))
        p.start()
        l.append(p)
    # 等待子进程结束
    for p in l:
        p.join()
    print(dic)
# 结果:
# {'num': 0}

加锁使得数据变得安全

Queue队列简介

代码语言:javascript
复制
特点
  1. 先进先出
  2. 队列只应该传送消息,数据量不应该过大
  3.创建队列的长度不应该过大,因为它占用的是内存的空间 

方法
  put: 往队列里面添加东西 
    参数一:obj,放到队列里面的对象
    参数二: block,如果队列满了是否阻塞
    参数三: timeout,超时时间,在队列的阻塞的状态下才有意义
  get: 从队列里面拿东西
    参数一:block, 如果队列为空是否阻塞,
    参数二:timeout,超时时间,在队列阻塞状态下才有意义 

例子:
    # 创建队列,队列长度为3
    q = Queue(3)
    
    q.put('first')
    q.put({'second': None})
    q.put('三')
    # q.put('四')  # 默认会阻塞
    q.get()
    q.get()
    q.get()

八:生产者消费者模型

代码语言:javascript
复制
生产者和消费者模型
1.模型指的是一种解决问题的套路

2.该模式下具备两种角色
  生产者: 生产数据
  消费者: 处理数据

3.该模型的运作方式
  生产者生产数据,放到一个共享的空间中,然后消费者取走进行处理

4.该模型的实现方式一
  生产者进程 + 队列 + 消费者进程 
  队列中存放的是一些消息,不应该存放大量的数据

5.该模型的应用场景
  如果程序中由明显的两类任务,一类任务是负责生产数据,另外一类是负责处理数据的
  就应该使用生产者和消费者模型

6.该模式的优点
  1. 实现了生产者和消费者解耦和
  2. 平衡了生产者的生产数据的能力与消费者处理数据的能力

案例:模拟一个生产者和消费者模型

代码语言:javascript
复制
import time
import random
from multiprocessing import Process, Queue


def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,2))
        print('\033[35m消费者==> %s 吃了 %s\033[0m' %(name, res))


def producer(name, q, food):
    for i in range(5):
        res = '%s %s' %(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('\033[34m生产者>> %s 生产了 %s \033[0m' %(name, res))


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    c1 = Process(target=consumer, args=('alex', q))

    p1.start()
    c1.start()

简单的生产消费者模型代码

问题一:我们会发现虽然实现了生产者和消费者并发执行的效果,但是当消费者吃完包子之后主程序阻塞掉了。主程序阻塞原因有两个: 1. 主程序自己的代码没有执行完毕,2. 主进程在等待子进程执行完毕。此处主程序的代码很明显是阻塞在了消费者模型里面。如何去解决这样的问题呢?

解决方案一:因为程序是在子进程的获取队列的时候阻塞掉了,因此我们考虑将队列的修改成非阻塞状态,但是发现报错了,这是因为在c1和p1进程起来之后我们是不能确定谁先执行的,如果c1先执行了get,发现队列里面没有内容,又是非阻塞状态,就会报错。因此队列不能是非阻塞状态。

代码语言:javascript
复制
def consumer(name, q):
    while True:
        # 在此处阻塞掉了,因此我们考虑将队列设置为非阻塞
        res = q.get(block=False)
        time.sleep(random.randint(1,2))
        print('\033[35m消费者==> %s 吃了 %s\033[0m' %(name, res))

解决方案二:既然队列必须是阻塞状态,那么我们能不能设置一个超时时间,但是对于消费者而言,超时时间设置为多少才合适呢?我们并不能确定生产者每次生产数据的时间,因此如果设置成了4, 但是生产者过了5s才生产一个数据该怎么办呢?这个方式也是不合理的。

代码语言:javascript
复制
def consumer(name, q):
    while True:
        # 队列必须是阻塞状态,因此设置超时时间
        res = q.get(timeout=4)
        time.sleep(random.randint(1,2))
        print('\033[35m消费者==> %s 吃了 %s\033[0m' %(name, res))

解决方案三:既然从消费者的角度无法解决这样的问题,那么我们就从生产者的角度来解决这样的问题。当我生产完数据之后我额外的放一个None,当消费者收到这个标志的时候就代表生产的数据完了,如下

代码语言:javascript
复制
def consumer(name, q):
    while True:
        res = q.get()
        # 当收到一个None时就结束掉子进程
        if not res:
            break 
        time.sleep(random.randint(1,2))
        print('\033[35m消费者==> %s 吃了 %s\033[0m' %(name, res))


def producer(name, q, food):
    for i in range(5):
        res = '%s %s' %(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('\033[34m生产者>> %s 生产了 %s \033[0m' %(name, res))
    # 在我生产完数据之后就在队列里面设置一个None,然后消费者收到None之后就结束进程
    q.put(None)

问题二:虽然我们完美的解决了问题,但是当我们的生产者变多了之后就会出现下面的这个效果,有的生产者产生的数据并没有消费者去消费,那是因为每个生产者生产完数据之后都会往队列中放入一个None,当一个消费者收到一个None的时候就会结束掉子进程,因此当你的生产者的数量一旦大于了消费者的数量,肯定会出现生产的数据没有人去处理的问题。

代码语言:javascript
复制
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    # 添加了一个生产者
    p2 = Process(target=producer, args=('hu', q, '甘蔗'))
    c1 = Process(target=consumer, args=('alex', q))

    p1.start()
    p2.start()
    c1.start()

解决方案一: 之前的初衷是想等生产者进程结束之后在队列的后面添加None,但是这个None的数量不能超过消费者的数量,因此我们可以通过在主进程中join来确定生产者模型结束之后,由子进程统一往队列中添加None。 这种方法虽然可以解决问题,但是不好的地方在哪里呢?我们有几个消费者就要往队列中添加几个None,浪费空间,而且还麻烦,因此这种方案也不推荐。

代码语言:javascript
复制
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    # 添加了一个生产者
    p2 = Process(target=producer, args=('hu', q, '甘蔗'))
    p3 = Process(target=producer, args=('hu', q, '米饭'))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('alex', q))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    
    # 记得将之前生产者函数中的put给删除掉,搞了半天,我还以为我理解错了呢
    # 等待生产者生产完成之后再往队列中添加None,个数为消费者的个数
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print('主进程..')

解决方案二:这个时候我们就要引入JoinableQueue队列

代码语言:javascript
复制
实现原理:
  1. 等待生产者生产完成之后,计入此时队列里的值,通过q.join()
  2. 每次消费者get一个内容之后都会通过 task_done将之前计入的值减1
  3. 当计入的值变成零的时候就代表队列为空了
  4. 在创建之处就设计消费者为守护进程
代码语言:javascript
复制
import time
import random
from multiprocessing import Process, Queue, JoinableQueue


def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,2))
        print('\033[35m消费者==> %s 吃了 %s\033[0m' %(name, res))
        # 每次取出一个就将队列计数减一
        q.task_done()


def producer(name, q, food):
    for i in range(5):
        res = '%s %s' %(food, i)
        time.sleep(random.randint(1, 2))
        q.put(res)
        print('\033[34m生产者>> %s 生产了 %s \033[0m' %(name, res))


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('egon', q, '包子'))
    p2 = Process(target=producer, args=('hu', q, '甘蔗'))
    p3 = Process(target=producer, args=('hu', q, '米饭'))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('alex', q))

    p1.start()
    p2.start()
    p3.start()
    # 设置守护进程
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    # 等待生产者生产完成之后再往队列中添加None,个数为消费者的个数
    p1.join()
    p2.join()
    p3.join()

    # 记录当前队列中还有值得数量
    q.join()
    
    # 当执行到这一个步骤的时候,就代表消费者已经将内容取完了,主进程代码执行完毕之后,守护进程也就被杀死了
    print('主进程..')

最终实现代码

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/03/31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云开发 CloudBase
云开发(Tencent CloudBase,TCB)是腾讯云提供的云原生一体化开发环境和工具平台,为200万+企业和开发者提供高可用、自动弹性扩缩的后端云服务,可用于云端一体化开发多种端应用(小程序、公众号、Web 应用等),避免了应用开发过程中繁琐的服务器搭建及运维,开发者可以专注于业务逻辑的实现,开发门槛更低,效率更高。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档