
Advanced Python Tutorial: Parallel Programming

羽翰尘
Modified 2019-11-26 16:08:42
Collected in the column: 技术向

This article was automatically synced by the Tencent Cloud+ Community. Original address: https://stackoverflow.club/book/senior_python/parallel/

This article comes from the original booklet 《python高阶教程》 (Advanced Python Tutorial).

Why write parallel code

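Python's strength is that it lets you prototype algorithms quickly, but it does not execute efficiently. Suppose we implement an image classification algorithm and need to preprocess the images first. With massive amounts of data, a single thread clearly becomes the performance bottleneck.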

Multithreading with a function

For a simple multithreaded task, the work can be written as a function. This mainly uses the Thread class from the threading module.

import threading
import time
import random

def function(i):
    # Print the current time three times, sleeping a random 0-3 s before each print
    count = 0
    while count < 3:
        time.sleep(random.randint(0, 3))
        count += 1
        print("thread %d, time %s" % (i, time.ctime(time.time())))

for i in range(2):
    # Create a thread that will run function(i), then start it
    t = threading.Thread(target=function, args=(i,))
    t.start()

This code consists mainly of a task function, function, and a for loop.

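Inside the loop, we create an instance of the Thread class from the threading module, passing function as the target and the loop variable i as its argument, and then call the instance's start() method. The function prints the current time three times, sleeping for a random interval before each print. The random sleep simulates different amounts of computation and shows that the threads run concurrently.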

The output of this code is as follows:

thread 0, time Mon Jun 18 18:37:13 2018
thread 1, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:16 2018
thread 1, time Mon Jun 18 18:37:17 2018
thread 1, time Mon Jun 18 18:37:19 2018

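The output shows that thread 0 and thread 1 are scheduled in no fixed order. The two threads belong to the same process, and under CPython's global interpreter lock only one of them is actually executing Python code at any given instant. At the macro level, however, the two threads appear to run at the same time.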
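
The listing above starts its threads but never waits for them. As a minimal sketch, assuming we want the main thread to block until every worker is done, join() can be called on each thread. The names worker, finished and finished_lock are hypothetical and do not appear in the original code.

```python
import threading
import time

finished = []                      # ids of workers that have completed
finished_lock = threading.Lock()   # protects the shared list


def worker(i, delay):
    """Sleep for `delay` seconds, then record that worker i is done."""
    time.sleep(delay)
    with finished_lock:
        finished.append(i)


threads = [threading.Thread(target=worker, args=(i, 0.1)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()   # block until this thread has finished

print("all workers done:", sorted(finished))
```

Calling join() only after every thread has been started keeps the workers overlapping; joining inside the start loop would run them one after another.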

Multithreading with a class

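Implementing multithreading with a function essentially creates several Thread instances. We can also define a class that inherits from threading.Thread. The code is as follows: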

import threading
import time
import random

class myThread(threading.Thread):    # inherit from threading.Thread
    def __init__(self, threadID):
        threading.Thread.__init__(self)
        self.threadID = threadID

    def run(self):    # put the work in run(); it executes in the new thread once start() is called
        count = 0
        while count < 3:
            time.sleep(random.randint(0, 3))
            count += 1
            print("thread %d, time %s" % (self.threadID, time.ctime(time.time())))

# Create the threads
thread1 = myThread(0)
thread2 = myThread(1)

# Start the threads
thread1.start()
thread2.start()

This code does the same thing as the function-based version. The output is as follows:

thread 0, time Mon Jun 18 19:11:59 2018
thread 1, time Mon Jun 18 19:11:59 2018
thread 0, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:01 2018
thread 0, time Mon Jun 18 19:12:03 2018
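
One practical reason to prefer a class is that the instance can carry state. The sketch below is not from the original booklet: it assumes a hypothetical SumThread subclass that stores its result on the instance so the caller can read it after join().

```python
import threading


class SumThread(threading.Thread):
    """Hypothetical Thread subclass that keeps its result on the instance."""

    def __init__(self, numbers):
        super().__init__()
        self.numbers = numbers
        self.result = None   # filled in by run()

    def run(self):
        # run() executes in the new thread once start() is called
        self.result = sum(self.numbers)


t = SumThread(range(5))
t.start()
t.join()          # after join() returns, run() has finished and result is set
print(t.result)   # prints 10
```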

Mutual exclusion with a lock

import threading

count = 0
lock = threading.Lock()

def use_lock(func):
    # Decorator: hold the global lock while func runs
    def wrapper(*arg, **kw):
        with lock:    # released even if func raises
            return func(*arg, **kw)
    return wrapper

def inc(func):
    # Decorator: increment the shared counter before calling func
    def wrapper(*arg, **kw):
        global count
        count += 1
        return func(*arg, **kw)
    return wrapper

def dec(func):
    # Decorator: decrement the shared counter before calling func
    def wrapper(*arg, **kw):
        global count
        count -= 1
        return func(*arg, **kw)
    return wrapper

@use_lock
@inc
def increment_lock():
    pass

@use_lock
@dec
def decrement_lock():
    pass

@inc
def increment():
    pass

@dec
def decrement():
    pass

def thread1():
    for _ in range(999999):
        increment_lock()

def thread2():
    for _ in range(999999):
        decrement_lock()

def thread3():
    for _ in range(999999):
        increment()

def thread4():
    for _ in range(999999):
        decrement()

thread_ins_1 = threading.Thread(target=thread1, args=())
thread_ins_2 = threading.Thread(target=thread2, args=())
thread_ins_3 = threading.Thread(target=thread3, args=())
thread_ins_4 = threading.Thread(target=thread4, args=())

# Round 1: increments and decrements are protected by the lock
count = 0
thread_ins_1.start()
thread_ins_2.start()
thread_ins_1.join()
thread_ins_2.join()
print("with lock, count is ", count)

# Round 2: the same work without the lock
count = 0
thread_ins_3.start()
thread_ins_4.start()
thread_ins_3.join()
thread_ins_4.join()
print("without lock, count is ", count)

The output of this code is as follows:

with lock, count is  0
without lock, count is  32376

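As the output shows, the shared counter ends up corrupted when no lock is used. The exact value printed without the lock varies from run to run.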
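
The same protection can be written more compactly with the lock's context-manager form. This is a minimal sketch rather than the author's code: the names counter, counter_lock and add are hypothetical, and the loop count is reduced so it finishes quickly.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add(delta, times):
    """Add `delta` to the shared counter `times` times, under the lock."""
    global counter
    for _ in range(times):
        with counter_lock:       # acquired here, released when the block exits
            counter += delta


up = threading.Thread(target=add, args=(1, 100000))
down = threading.Thread(target=add, args=(-1, 100000))
up.start()
down.start()
up.join()
down.join()
print("counter is", counter)
```

The with statement releases the lock even if the body raises an exception, which a bare acquire()/release() pair does not guarantee.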

Thread synchronization with a semaphore

import threading
import time
import random

# The initial value is 0,
# so the semaphore must be released before it can be acquired
semaphore = threading.Semaphore(0)
item = 0

def consumer():
    time.sleep(random.randint(0, 3))
    # Block until the producer has released the semaphore
    semaphore.acquire()
    print("Consumer notify: consumed item number %s" % item)

def producer():
    global item
    time.sleep(random.randint(0, 3))
    item = random.randint(0, 1000)
    print("Producer notify: produced item number %s" % item)
    # Release the semaphore, which increments its counter by 1
    semaphore.release()

# Run three rounds
for i in range(3):
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

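Here the semaphore is initialized to 0, which means it must be released before it can be acquired. The thread that releases the semaphore therefore always completes its step first, and this is how the two threads are synchronized.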

The output of the code is as follows:

Producer notify: produced item number 977
Consumer notify: consumed item number 977
Producer notify: produced item number 812
Consumer notify: consumed item number 812
Producer notify: produced item number 500
Consumer notify: consumed item number 500
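
The ordering guarantee can be checked directly. The sketch below is a simplified variant with hypothetical names (ready, events): the consumer is started first, yet its step is always recorded second because acquire() blocks until the producer calls release().

```python
import threading
import time

ready = threading.Semaphore(0)   # starts at 0: acquire() blocks until release()
events = []                      # order in which the two steps happened


def consumer():
    ready.acquire()              # waits here until the producer signals
    events.append("consumed")


def producer():
    time.sleep(0.1)              # make the producer deliberately slow
    events.append("produced")
    ready.release()              # increments the counter, waking the consumer


c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()                        # the consumer starts first but must wait
p.start()
c.join()
p.join()
print(events)                    # prints ['produced', 'consumed']
```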

Limitations of the GIL

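GIL stands for Global Interpreter Lock. It is a mutex in the CPython interpreter that prevents multiple threads from executing Python bytecode at the same time. Because of the GIL, multithreading is inefficient for CPU-bound Python programs.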

The following code uses multiple threads to measure the effect of the GIL:

from threading import Thread
from timeit import Timer

class threads_object(Thread):
    def run(self):
        function_to_run()

class nothreads_object(object):
    def run(self):
        function_to_run()

def non_threaded(num_iter):
    # Run the task num_iter times, one after another, in the main thread
    funcs = []
    for i in range(int(num_iter)):
        funcs.append(nothreads_object())
    for i in funcs:
        i.run()

def threaded(num_threads):
    # Run the task once in each of num_threads threads and wait for all of them
    funcs = []
    for i in range(int(num_threads)):
        funcs.append(threads_object())
    for i in funcs:
        i.start()
    for i in funcs:
        i.join()

def function_to_run():
    # CPU-bound busy loop
    count = 0
    while count < 1e4:
        count += 1

def show_results(func_name, results):
    print("%-23s %4.6f seconds" % (func_name, results))

if __name__ == '__main__':
    repeat = 1000
    number = 1
    num_threads = [1, 2, 4, 8]
    print("Starting tests")
    for i in num_threads:
        t = Timer("non_threaded(%s)" % i, "from __main__ import non_threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("non_threaded (%s iters)" % i, best_result)
        t = Timer("threaded(%s)" % i, "from __main__ import threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("threaded (%s threads)" % i, best_result)

The results are as follows:

Starting tests
non_threaded (1 iters)  0.000940 seconds
threaded (1 threads)    0.001085 seconds
non_threaded (2 iters)  0.001863 seconds
threaded (2 threads)    0.002322 seconds
non_threaded (4 iters)  0.003775 seconds
threaded (4 threads)    0.004687 seconds
non_threaded (8 iters)  0.007560 seconds
threaded (8 threads)    0.009515 seconds
Judging from the results, the multithreaded version is somewhat slower than sequential execution.
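
The slowdown applies to CPU-bound work. For contrast, here is a minimal sketch, assuming the task mostly waits rather than computes: time.sleep stands in for blocking I/O and releases the GIL while waiting, so the threads overlap. The names blocking_task, run_sequential and run_threaded are hypothetical.

```python
import threading
import time


def blocking_task():
    time.sleep(0.2)   # stands in for I/O; sleep releases the GIL while waiting


def run_sequential(n):
    """Run the task n times in the main thread and return the elapsed time."""
    start = time.perf_counter()
    for _ in range(n):
        blocking_task()
    return time.perf_counter() - start


def run_threaded(n):
    """Run the task once in each of n threads and return the elapsed time."""
    start = time.perf_counter()
    threads = [threading.Thread(target=blocking_task) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


sequential = run_sequential(4)
threaded = run_threaded(4)
print("sequential %.2f s, threaded %.2f s" % (sequential, threaded))
```

With four tasks, the sequential run takes at least 0.8 seconds, while the threaded run should finish in a little over 0.2 seconds.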

Multiprocessing

The programming model for multiple processes is quite similar to that for multiple threads.

import multiprocessing
import time
import random

def function(i):
    # Print the current time three times, sleeping a random 0-3 s before each print
    count = 0
    while count < 3:
        time.sleep(random.randint(0, 3))
        count += 1
        print("process %d, time %s" % (i, time.ctime(time.time())))

if __name__ == '__main__':
    for i in range(2):
        # Create a process that will run function(i), then start it
        t = multiprocessing.Process(target=function, args=(i,))
        t.start()

The output is as follows:

process 1, time Tue Jun 19 14:37:49 2018
process 1, time Tue Jun 19 14:37:49 2018
process 0, time Tue Jun 19 14:37:51 2018
process 1, time Tue Jun 19 14:37:52 2018
process 0, time Tue Jun 19 14:37:53 2018
process 0, time Tue Jun 19 14:37:53 2018
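
Each process has its own interpreter and its own GIL, so CPU-bound work can be spread across processes. The sketch below is one possible way to get results back, assuming a multiprocessing.Queue is acceptable for the task. The names count_up, worker and results are hypothetical, and the loop is modeled on function_to_run from the GIL test.

```python
import multiprocessing


def count_up(limit):
    """CPU-bound loop, similar in spirit to function_to_run above."""
    count = 0
    while count < limit:
        count += 1
    return count


def worker(limit, results):
    # Runs in the child process; sends its result back through the queue
    results.put(count_up(limit))


if __name__ == "__main__":
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(10000, results))
             for _ in range(2)]
    for p in procs:
        p.start()
    totals = [results.get() for _ in procs]   # read results before joining
    for p in procs:
        p.join()
    print(totals)
```

As in the listing above, the `if __name__ == "__main__":` guard keeps child processes from re-running the launch code on platforms that start them by re-importing the main module.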
Originally published: 2019-09-06
