前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 之进程与线程

python 之进程与线程

作者头像
keinYe
发布2019-08-01 10:40:47
3590
发布2019-08-01 10:40:47
举报
文章被收录于专栏:keinYekeinYe

进程和线程的概念

从系统调度和资源分配的角度来看,进程是 CPU 资源分配的最小单位,线程是 CPU 调度的最小单位。从 CPU 执行时间的角度来看,进程是包含了上下文切换的程序执行时间总和,线程是共享了进程的上下文环境的更为细小的 CPU 时间段。

多进程

Python 使用 multiprocessing 包来实现多进程。使用 multiprocessing 包我们只需要定义一个函数(Python 会完成其他所有事情)即可完成从单进程到多进程的转换。

使用 Process 类创建子进程

Python 使用 multiprocessing 模块提供的 Process 类来代表一个进程对象。

建立一个进程对象使用 Process(group=None, target=None, name=None, args=(), kwargs={}),其中group总是设置为None。target表示调用对象。name为别名。args表示调用对象的位置参数元组。kwargs表示调用对象的字典。

Process 类含有以下方法:

  1. start 启动进程
  2. join 等待进程执行完毕
  3. is_alive 进程是否处于活动状态
  4. terminate 终止进程,用在进程为死循环的情况下,手动终止进程。
代码语言:javascript
复制
from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    # 建立一个进程对象
    p = Process(target=f, args=('bob',))
    # 启动新建的进程
    p.start()
    print p.is_alive()
    # 等待进程执行完毕
    p.join()
    print p.is_alive()

运行结果如下:

代码语言:javascript
复制
main line
module name: __main__
parent process: 657
process id: 767
True
function f
module name: __main__
parent process: 767
process id: 768
hello bob
False

使用 Pool 类启动多个子进程

若果需要批量启动多个子进程,可以使用进程池的方式批量创建子进程

代码语言:javascript
复制
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)

    print pool.map(f, range(10))

    for i in pool.imap_unordered(f, range(10)):
        print i

    res = pool.apply_async(f, (20,))
    print res.get(timeout = 1)

    res = pool.apply_async(os.getpid, ())
    print res.get(timeout = 1)

    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout = 1)
    except TimeoutError:
        print 'We lacked patience and got a multiporcessing.TimoutError'

运行结果如下:

代码语言:javascript
复制
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
16
25
36
64
49
81
400
944
[946, 945, 947, 944]
We lacked patience and got a multiporcessing.TimoutError

进程间通信

multiprocessing 支持 Queues 和 Pipes 两种类型进行进程间通信。

Pipe 既管道模式,调用 pipe 将返回管道的两端的两个链接。pipe 仅仅适用于只有两个进程且一读一写的单双工模式,既信息只能从一个方向向另一个方向流动。

pipe 适用于读写小于要求高的一读一写的单双工模式。

代码语言:javascript
复制
from multiprocessing import Process, Pipe

def f(conn):
    # 向主进程发送信息
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    # 建立 Pipe 并获取连接
    parent_conn, child_conn = Pipe()
    # 建立子进程
    p = Process(target=f, args=(child_conn,))
    p.start()
    # 获取子进程发送的信息
    print parent_conn.recv()
    p.join()

运行结果如下:

代码语言:javascript
复制
[42, None, 'hello']

Queue 在 python 中是基于 Pipe 实现的,Queue 也是一边发送一边接收,Queue 与 Pipe 最大的区别是 Queue 允许同时又多个进程发送和接收数据。

代码语言:javascript
复制
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果如下:

代码语言:javascript
复制
Process to write: 1046
Put A to queue...
Process to read: 1047
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Queue 是一个线程和进程安全的队列。 Queues are thread and process safe.

多线程

相比于进程,线程更加的轻量,可以实现并发。Python 中的多线程实时上并非真正的多线程,这要从全局解释器锁(GIL)说起,Python 使用的解释器 Cpython 的线程是操作系统的原生线程,在解释器内执行的 Python 代码,都需要获取全局解释器锁才能执行,只有在遇到 I/O 操作时会释放全局解释器锁,由于 Python 的进程做为一个整体,因此解释器进程内只有一个线程在执行,其它的线程都处于等待状态等着全局解释器锁的释放。

若要尽量使用 CPU 资源使用多进程是不错的选择,对于 I/O 密集型操作使用多线程是不错的选择,Python 不适用于计算密集型应用。

在 Python 中使用 Threading 模块(Threading 模块是 Thread 的增强版本,Threading 从 Python 1.5.2 版本开始加入)来实现多线程操作。

使用 Threading 创建线程

看以下简单示例:

代码语言:javascript
复制
import threading

def threadHandler(number):
    print 'thread %s is running...' % threading.current_thread().name
    print number * 2
    print 'thread %s ended.' % threading.current_thread().name

if __name__ == '__main__':
    print 'thread %s is runnging...' % threading.current_thread().name
    for i in range(5):
        my_thread = threading.Thread(target=threadHandler, args=(i,))
        my_thread.start()
        print 'thread %s ended.' % threading.current_thread().name

运行结果如下:

代码语言:javascript
复制
thread MainThread is runnging...
thread Thread-1 is running...
0
thread Thread-1 ended.
 thread MainThread ended.
thread Thread-2 is running...
2
thread Thread-2 ended.
thread MainThread ended.
thread Thread-3 is running...
4
thread Thread-3 ended.
thread MainThread ended.
thread Thread-4 is running...
6
thread Thread-4 ended.
 thread MainThread ended.
thread Thread-5 is running...
8
 thread MainThread ended.
thread Thread-5 ended.

由于任何进程默认都会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python 的 threading 模块的 current_thread() 函数,返回当前线程的实例。

线程锁与线程同步

由于线程共享了进程的上下文环境,所以在多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改。因此,线程之间共享数据最大的风险在于多个线程同个更改一个变量,将内容给改乱了。

我们来看一个多线程操作变量的示例:

代码语言:javascript
复制
import threading, time

total = 0

def update():
    global total
    for i in range(10000):
        total = total + i
        total = total - i

if __name__ == '__main__':
    for i in range(10):
        thread = threading.Thread(target=update, args=())
        thread.start()
    print total

以上示例的设计预期最终 total 的值应该为 0,但是在实际运行中会发现 total 的实际运行结果每次均不相同。造成这种情况的原因是由于 total 在多个线程中被修改造成存储内容的混乱。

现在我们在该示例代码中加入线程锁,有两种方法可以实现 try/finally 和 with。

以下示例使用 try/finally 来实现增加线程锁。

代码语言:javascript
复制
import threading, time

total = 0
lock = threading.Lock()

def update():
    global total
    for i in range(10000):
        lock.acquire()
        try:
            total = total + i
            total = total - i
        finally:
            lock.release()

if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=update, args=())
        my_thread.start()
    print total

以下示例使用 with来实现增加线程锁。

代码语言:javascript
复制
import threading, time

total = 0
lock = threading.Lock()

def update():
    global total
    for i in range(10000):
        with lock:
            total = total + i
            total = total - i

if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=update, args=())
        my_thread.start()
    print total

在使用 try/finally 和 with 增加锁后,以上代码的运行结果均符合预期。

线程间通信

Queue 模块用来实现消息队列功能,可以实现线程间安全的消息交换。各个线程可以通过调用消息队列实例对消息队列进行操纵。

代码语言:javascript
复制
from Queue import Queue
import threading
import os, time, random

# 写数据线程执行的代码:
def write(q):
    print 'thread to write: %s' % threading.current_thread().name
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)


# 读数据线程执行的代码:
def read(q):
    print 'Process to read: %s' % threading.current_thread().name
    while q.empty() == False:
        value = q.get(True)
        print 'Get %s from queue.' % value

if __name__=='__main__':
    # 父线程创建Queue,并传给各个子进程:
    q = Queue()
    pw = threading.Thread(target=write, args=(q,))
    pr = threading.Thread(target=read, args=(q,))
    # 启动子线程pw,写入:
    pw.start()
    # 等待pw结束:
    pw.join()
    # 启动子线程pr,读取:
    pr.start()

运行结果如下:

代码语言:javascript
复制
thread to write: Thread-1
Put A to queue...
Put B to queue...
Put C to queue...
Process to read: Thread-2
Get A from queue.
Get B from queue.
Get C from queue.
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-02-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 keinYe 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多进程
    • 使用 Process 类创建子进程
      • 使用 Pool 类启动多个子进程
        • 进程间通信
        • 多线程
          • 使用 Threading 创建线程
            • 线程锁与线程同步
              • 线程间通信
              相关产品与服务
              消息队列 CMQ 版
              消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档