python3--进程同步(multiprocess.Lock, Semaphore, Event)

socket基于tcp协议的多进程聊天(异步)

server端代码

import socket
from multiprocessing import Process
def chat(conn):
    while True:  # 接收消息,打印消息
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.send((msg + '_sb').encode('utf-8'))
    conn.close()


if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1', 9595))
    sk.listen()
    while True:  # 创建多进程
        conn, addr = sk.accept()
        Process(target=chat, args=[conn,]).start()
    sk.close()

client端代码

import socket
sk = socket.socket()
sk.connect(('127.0.0.1', 9595))

while True:
    inp = input('>>>').encode('utf-8')
    sk.send(inp)
    msg = sk.recv(1024).decode('utf-8')
    print(msg)
sk.close()

先运行server程序,在运行client程序,执行结果为

锁 —— multiprocess.Lock

通过之前的学习,实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题

多进程抢占输出资源,代码如下

import os
import time
import random
from multiprocessing import Process


def work(n):
    print('{} : {} is running'.format(n, os.getpid()))
    time.sleep(random.random())
    print('{} : {} is done'.format(n, os.getpid()))


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work, args=(i, ))  
        p.start()

执行结果为:

0 : 2840 is running

1 : 13824 is running

2 : 16980 is running

1 : 13824 is done

0 : 2840 is done

2 : 16980 is done

上面的输出的结果是随机的,如果想有序的执行,就需要用到锁,代码如下:

由并发变成了串行,牺牲了运行效率,但避免了数据混乱

import os
import time
import random
from multiprocessing import Process
from multiprocessing import Lock

def work(n, lock):
    lock.acquire()  #给进程上锁
    print('{} : {} is running'.format(n, os.getpid()))
    time.sleep(random.random())
    print('{} : {} is done'.format(n, os.getpid()))
    lock.release()  #给进程下锁


if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p = Process(target=work, args=(i, lock))
        p.start()

执行结果

0 : 16160 is running

0 : 16160 is done

1 : 12368 is running

1 : 12368 is done

2 : 13824 is running

2 : 13824 is done

锁的概念

如果锁加在进程的开始和结束,那么它和join,实现的效果是一样的(同步)

join的效果,代码如下

import os
import time
import random
from multiprocessing import Process
def work(n):
    print('{} : {} is running'.format(n, os.getpid()))
    time.sleep(random.random())
    print('{} : {} is done'.format(n, os.getpid()))


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work, args=(i,))
        p.start()
        p.join()

执行结果:

0 : 10544 is running

0 : 10544 is done

1 : 8332 is running

1 : 8332 is done

2 : 11280 is running

2 : 11280 is done

总结:同步控制

只要用到了锁,锁之内的代码就变成同步的了

锁:控制一段代码,同一时间,只能被一个进程执行,数据更加安全

锁的应用场景(12306抢票的例子)

模拟数据库,创建一个文件ticket,内容如下:

{"count":3}  #计数(剩余多少张票) 要先创建这个文件

代码如下:

# 文件db的内容为:{"count":3}
# 注意一定要用双引号,不然json无法识别
# 并发运行,效率高,但竞争写同一文件,数据写入错乱
import json
import time
import random
from multiprocessing import Process,Lock

def check_ticket(i):
    with open('ticket') as f:
        ticket_count = json.load(f)
    print('person{}查询当前余票 : '.format(i), ticket_count['count'])

def buy_ticket(i, lock):
    check_ticket(i)  # 调用查询余票函数
    lock.acquire()  # 锁进程
    with open('ticket') as f:
        ticket_count = json.load(f)
    time.sleep(random.random())  # 模拟读取延迟
    if ticket_count['count'] > 0:
        print('person{}购票成功'.format(i))
        ticket_count['count'] -= 1
        with open('ticket', 'w') as f:
            json.dump(ticket_count, f)
    else:
        print('余票不足,person{}购票失败'.format(i))
    time.sleep(random.random())  # 模拟写入延迟
    lock.release()  # 解锁进程

if __name__ == '__main__':
    lock = Lock()  # 实例化锁这个类得到一个对象lock
    for i in range(5):
        Process(target=buy_ticket, args=(i, lock,)).start()

执行结果

#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但
牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是
mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往
往可以获得更好的可获展性。

信号量 —— multiprocess.Semaphore(了解)

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,
acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器
这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

模拟ktv的房间数

示例代码如下:在同一时间,最多进去4批人,acquire()是一个阻塞行为

import time
import random
from multiprocessing import Process, Semaphore

def ktv(i,sem):
    sem.acquire()  # 锁住进程
    print('person {} 进来唱歌了'.format(i))
    time.sleep(random.randint(1, 5))  # 模拟唱歌时间
    print('person {} 从ktv出去了'.format(i))
    sem.release()  # 解锁进程


if __name__ == '__main__':
    sem = Semaphore(4)  # 模拟只有4个房间
    for i in range(5):  # 模拟5批人需要唱歌
        Process(target=ktv, args=(i, sem)).start()

执行结果

总结:信号量和锁有点类似,它们之间的区别,信号量,相当于计算器

信号量:

锁 + 计数器

.acquire() 计数器-1

计数器减为0 = 阻塞

.release() 计数器+1

事件 —— multiprocess.Event(了解)

事件介绍

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,
    如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True

事情的使用方法

示例1:

from multiprocessing import Event

# 创建一个事件的对象
e = Event()
print(e.is_set())  # 在事件的创始之初,状态为False

执行结果

False

例2:

from multiprocessing import Event

# 创建一个事件的对象
e = Event()
print(e.is_set())  # 在事件的创始之初,状态为False
e.wait()  # 阻塞,等待is_set()的值变成True,如果不为True,会一直等待
          # 后面的代码不会执行
print(1111)

执行结果

False  # 程序没有执行完毕,会一直阻塞

例3

from multiprocessing import Event

# 创建一个事件的对象
e = Event()
print(e.is_set())   # 打印is_set()的状态
                    # 在事件的创始之初,状态为False
e.set()  # 将is_set()的值改为True
e.wait()  # 阻塞,等待is_set()的值变成True,如果不为True,会一直等待
          # 后面的代码不会执行
print(e.is_set())  # 打印is_set()的状态

执行结果

False

True

例4:

from multiprocessing import Event

# 创建一个事件的对象
e = Event()
print(e.is_set())   # 打印is_set()的状态
                    # 在事件的创始之初,状态为False
e.set()  # 将is_set()的值改为True
e.wait()  # 阻塞,等待is_set()的值变成True,如果不为True,会一直等待
          # 后面的代码不会执行
print(e.is_set())  # 打印is_set()的状态
e.clear()  # 将is_set()的值改为False
print(e.is_set())  # 打印is_set()的值
e.wait()  # 阻塞,等待is_set()的值变成True
print(111)  # 代码不会被执行,上面被阻塞了

执行结果

False

True

False

模拟红绿灯

红绿灯:

    车 比喻是一个进程,wait()等红灯

       根据状态变化,wait遇到true信号,就非阻塞

                     遇到False,就阻塞

    交通灯 也是一个进程 红灯->False 绿灯->True

示例代码如下:

import time
import random
from multiprocessing import Process, Event
def car(i, e):  # 感知状态的变化
    if not e.is_set():  # 当前这个事件的状态如果是False
        print('car{}正在等待'.format(i))  # 这辆车正在等待通过路口
    e.wait()  # 阻塞,直到有一个e.set行为(True)  # 理解为等红灯
    print('car{}通过路口'.format(i))

def traffic_light(e):  # 修改事件的状态
    print('\033[1;31m红灯亮\033[0m')
    # 事件在创始之初的状态是False,相当于程序中的红灯

    time.sleep(2)  # 红灯亮2秒
    while True:
        if not e.is_set():  # False
            print('\033[1;32m绿灯亮\033[0m')
            e.set()  # 修改is_set()的值为 True
        elif e.is_set():
            print('\033[1;31m红灯亮\033[0m')
            e.clear()  # 修改is_set()的值为 False
        time.sleep(2)  # 等待2秒


if __name__ == '__main__':
    e = Event()
    Process(target=traffic_light, args=(e,)).start()
    for i in range(10):
        time.sleep(random.randrange(0, 5, 2))
        Process(target=car, args=(i, e)).start()

执行结果:

进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)

进程间通信

IPC(Inter-Process Communication)

队列 

概念介绍

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递

Queue([maxsize]) 

创建共享的进程队列。

参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。

底层队列使用管道和锁定实现

方法介绍

Queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。
另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果
设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的
时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,
将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发
Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加
或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。


q.empty() 
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和
使用结果之间,队列中可能已经加入新的项目。

q.full() 
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

其它方法了解

q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法
完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常
例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创
建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

Queue与之前学的queue之间的区别

import queue

它能维护一个先进先出的次序 它不能进行IPC(进程间的通信)

from multiprocessing import Queue

它能维护一个先进先出的次序 也能进行IPC(进程间的通信)

进程间的通信

示例代码:

from multiprocessing import Queue, Process
# 能维护一个先进先出的次序,也能进行IPC

def haha(q):
    # 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止
    print(q.get())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=haha, args=(q, ))
    p.start()
    # 将[1, 2, 3]放入队列。如果队列已满,此方法将阻塞至有空间可用为止
    q.put([1, 2, 3])

执行结果

[1, 2, 3]

双向通信

import time
from multiprocessing import Queue, Process
# 能维护一个先进先出的次序,也能进行IPC

def haha(q):
    # 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止
    print(q.get())
    # 将'你好'放入队列。如果队列已满,此方法将阻塞至有空间可用为止
    q.put('你好')


if __name__ == '__main__':
    q = Queue()
    p = Process(target=haha, args=(q, ))
    p.start()
    # 将'hi'放入队列。如果队列已满,此方法将阻塞至有空间可用为止
    q.put('hi')
    time.sleep(0.5)
    # 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止
    print(q.get())

执行结果

hi

你好

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

示例代码:

from multiprocessing import Process, Queue
import time, random, os


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))


def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q,))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))

    # 开始
    p1.start()
    c1.start()
    print('主')

执行结果:

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

改良版--生产者消费模型

from multiprocessing import Process, Queue
import time, random, os


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 收到结束信号则结束
        time.sleep(random.randint(1, 3))
        print('\033[1;32m%s 吃 %s\033[0m' % (os.getpid(), res))


def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('\033[1;31m%s 生产了 %s\033[0m' % (os.getpid(), res))
    q.put(None)  # 发送结束信号


if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q,))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))

    # 开始
    p1.start()
    c1.start()
    print('主')

执行结果

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Youngxj

[教程]黑客级别的批量处理文件

25630
来自专栏不想当开发的产品不是好测试

#测试框架推荐# test4j,数据库测试

# 背景 后端都是操作DB的,这块的自动化测试校验的话,是需要数据库操作的,当然可以直接封装方法来操作数据,那么有没有开源框架支持数据操作,让我们关注写sql语...

660120
来自专栏土豆专栏

Java面试之基本概念(二)

当两个线程竞争同一资源的时候,如果对资源的访问顺序敏感,就称存在竞态条件。导致竞态条件发生的代码区称作临界区。

23650
来自专栏知识分享

3-物联网开发标配方案(APP程序介绍)

我把MQTT的执行程序直接做了一个类,单例模式加回调函数的形式,内部还做成了断线重连的

27120
来自专栏北京马哥教育

Python imports指南

来源:Python程序员 ID:pythonbuluo 声明:如果你每天写Python,你会发现这篇文章中没有新东西。 这是专为那些像运维人员等偶尔使用Pyt...

26150
来自专栏ascii0x03的安全笔记

Linux下ls命令显示符号链接权限为777的探索

Linux下ls命令显示符号链接权限为777的探索                                                ——深入ls、...

1.1K50
来自专栏风中追风

java类的加载过程和类加载器的分析

我们知道,我们写的java代码保存的格式是 .java, java文件被编译后会转换为字节码,字节码可以在任何平台通过java虚拟机来运行,这也是java能够跨...

55980
来自专栏Python

python select模块详解

要理解select.select模块其实主要就是要理解它的参数, 以及其三个返回值。 select()方法接收并监控3个通信列表, 第一个是所有的输入的data...

52560
来自专栏JavaEdge

Java线程状态

实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了初始状态 英文翻译过来是线程还是没有开始执行。 首先,既然已经...

550110
来自专栏技术博客

C#简单的面试题目(六)

76.HashMap和Hashtable的区别。 答:HashMap是Hashtable的轻量级实现(非线程安全的实现),他们都完成了Map接口,主要区别在于H...

9720

扫码关注云+社区

领取腾讯云代金券