前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 并行编程 多线程 锁 信号 条件 事件

python 并行编程 多线程 锁 信号 条件 事件

作者头像
用户5760343
发布2022-05-13 08:44:03
3330
发布2022-05-13 08:44:03
举报
文章被收录于专栏:sktj

共享内存

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}) import threading

def function(i): print ("function called by thread %i\n" % i) return

threads = []

for i in range(5): t = threading.Thread(target=function , args=(i, )) threads.append(t) t.start() t.join()

2、线程继承 import threading import time

exitFlag = 0

class myThread (threading.Thread): def init(self, threadID, name, counter): threading.Thread.init(self) self.threadID = threadID self.name = name self.counter = counter

代码语言:javascript
复制
def run(self):
    print("Starting " + self.name)
    print_time(self.name, self.counter, 5)
    print("Exiting " + self.name)

def print_time(threadName, delay, counter): while counter: if exitFlag: # 译者注:原书中使用的thread,但是Python3中已经不能使用thread,以_thread取代,因此应该 # import _thread # _thread.exit() thread.exit() time.sleep(delay) print("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1

Create new threads

thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2)

Start new Threads

thread1.start() thread2.start()

以下两行为译者添加,如果要获得和图片相同的结果,

下面两行是必须的。疑似原作者的疏漏

thread1.join() thread2.join() print("Exiting Main Thread")

3、threading.Lock() l.acquire() l.release()

-- coding: utf-8 --

import threading

shared_resource_with_lock = 0 shared_resource_with_no_lock = 0 COUNT = 100000 shared_resource_lock = threading.Lock()

有锁的情况

def increment_with_lock(): global shared_resource_with_lock for i in range(COUNT): shared_resource_lock.acquire() shared_resource_with_lock += 1 shared_resource_lock.release()

def decrement_with_lock(): global shared_resource_with_lock for i in range(COUNT): shared_resource_lock.acquire() shared_resource_with_lock -= 1 shared_resource_lock.release()

没有锁的情况

def increment_without_lock(): global shared_resource_with_no_lock for i in range(COUNT): shared_resource_with_no_lock += 1

def decrement_without_lock(): global shared_resource_with_no_lock for i in range(COUNT): shared_resource_with_no_lock -= 1

if name == "main": t1 = threading.Thread(target=increment_with_lock) t2 = threading.Thread(target=decrement_with_lock) t3 = threading.Thread(target=increment_without_lock) t4 = threading.Thread(target=decrement_without_lock) t1.start() t2.start() t3.start() t4.start() t1.join() t2.join() t3.join() t4.join() print ("the value of shared variable with lock management is %s" % shared_resource_with_lock) print ("the value of shared variable with race condition is %s" % shared_resource_with_no_lock)

///rlock() 4| threading.Semaphore(0)

-- coding: utf-8 --

"""Using a Semaphore to synchronize threads""" import threading import time import random

The optional argument gives the initial value for the internal

counter;

it defaults to 1.

If the value given is less than 0, ValueError is raised.

semaphore = threading.Semaphore(0)

def consumer(): print("consumer is waiting.") # Acquire a semaphore semaphore.acquire() # The consumer have access to the shared resource print("Consumer notify : consumed item number %s " % item)

def producer(): global item time.sleep(10) # create a random item item = random.randint(0, 1000) print("producer notify : produced item number %s" % item) # Release a semaphore, incrementing the internal counter by one. # When it is zero on entry and another thread is waiting for it # to become larger than zero again, wake up that thread. semaphore.release()

if name == 'main': for i in range (0,5) : t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join() print("program terminated")

5\ condition() 生产者,消费者 from threading import Thread, Condition import time

items = [] condition = Condition()

class consumer(Thread):

代码语言:javascript
复制
def __init__(self):
    Thread.__init__(self)

def consume(self):
    global condition
    global items
    condition.acquire()
    if len(items) == 0:
        condition.wait()
        print("Consumer notify : no item to consume")
    items.pop()
    print("Consumer notify : consumed 1 item")
    print("Consumer notify : items to consume are " + str(len(items)))

    condition.notify()
    condition.release()

def run(self):
    for i in range(0, 20):
        time.sleep(2)
        self.consume()

class producer(Thread):

代码语言:javascript
复制
def __init__(self):
    Thread.__init__(self)

def produce(self):
    global condition
    global items
    condition.acquire()
    if len(items) == 10:
        condition.wait()
        print("Producer notify : items producted are " + str(len(items)))
        print("Producer notify : stop the production!!")
    items.append(1)
    print("Producer notify : total items producted " + str(len(items)))
    condition.notify()
    condition.release()

def run(self):
    for i in range(0, 20):
        time.sleep(1)
        self.produce()

if name == "main": producer = producer() consumer = consumer() producer.start() consumer.start() producer.join() consumer.join()

(译者在这里添加一段。乍一看这段代码好像会死锁,因为 condition.acquire() 之后就在 .wait() 了,好像会一直持有锁。其实 .wait() 会将锁释放,然后等待其他线程 .notify() 之后会重新尝试获得锁。但是要注意 .notify() 并不会自动释放锁,所以代码中有两行,先 .notify() 然后再 .release() 。

6、event事件使用

-- coding: utf-8 --

import time from threading import Thread, Event import random items = [] event = Event()

class consumer(Thread): def init(self, items, event): Thread.init(self) self.items = items self.event = event

代码语言:javascript
复制
def run(self):
    while True:
        time.sleep(2)
        self.event.wait()
        item = self.items.pop()
        print('Consumer notify : %d popped from list by %s' % (item, self.name))

class producer(Thread): def init(self, integers, event): Thread.init(self) self.items = items self.event = event

代码语言:javascript
复制
def run(self):
    global item
    for i in range(1):
        time.sleep(2)
        item = random.randint(0, 256)
        self.items.append(item)
        print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
        print('Producer notify : event set by %s' % self.name)
        self.event.set()
        print('Produce notify : event cleared by %s '% self.name)
        self.event.clear()

if name == 'main': t1 = producer(items, event) t2 = consumer(items, event) t1.start() t2.start() t1.join() t2.join()

7、with import threading import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def threading_with(statement): with statement: logging.debug('%s acquired via with' % statement)

def threading_not_with(statement): statement.acquire() try: logging.debug('%s acquired directly' % statement ) finally: statement.release()

if name == 'main': # let's create a test battery lock = threading.Lock() rlock = threading.RLock() condition = threading.Condition() mutex = threading.Semaphore(1) threading_synchronization_list = [lock, rlock, condition, mutex] # in the for cycle we call the threading_with e threading_no_with function for statement in threading_synchronization_list : t1 = threading.Thread(target=threading_with, args=(statement,)) t2 = threading.Thread(target=threading_not_with, args=(statement,)) t1.start() t2.start() t1.join() t2.join()

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Create new threads
  • Start new Threads
  • 以下两行为译者添加,如果要获得和图片相同的结果,
  • 下面两行是必须的。疑似原作者的疏漏
  • -- coding: utf-8 --
  • 有锁的情况
  • 没有锁的情况
  • -- coding: utf-8 --
  • The optional argument gives the initial value for the internal
  • counter;
  • it defaults to 1.
  • If the value given is less than 0, ValueError is raised.
  • -- coding: utf-8 --
相关产品与服务
日志服务
日志服务(Cloud Log Service,CLS)是腾讯云提供的一站式日志服务平台,提供了从日志采集、日志存储到日志检索,图表分析、监控告警、日志投递等多项服务,协助用户通过日志来解决业务运维、服务监控、日志审计等场景问题。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档