前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >day41(多线程) - 守护线程、信号量、递归锁、队列、事件、线程池、线程池的回调函数

day41(多线程) - 守护线程、信号量、递归锁、队列、事件、线程池、线程池的回调函数

原创
作者头像
少年包青菜
修改2023-07-10 10:31:06
5820
修改2023-07-10 10:31:06
举报
文章被收录于专栏:Python 学习Python 学习

1.守护线程.py

from threading import Thread
import time


def action():
    for item in range(40):
        print('我是子线程')
        time.sleep(0.5)


if __name__ == '__main__':
    th_obj = Thread(target=action, args=())
    # 设置 .daemon = True,子线程跟着主进程一起死
    # 在 start() 之前写!!!
    th_obj.daemon = True
    th_obj.start()

    # 主进程有限循环
    for i in range(10):
        time.sleep(1)
        print('父进程')

    # th_obj.join()  # 主进程等待子线程完成

2.线程信号量.py

from threading import Thread, Semaphore
import time
import random


def action(th, sem):
    # 控制只有 4 个线程在操作
    # 与 Lock() 一样上锁
    sem.acquire()
    print(th, '进入了程序')
    time.sleep(random.randint(3, 5))
    print(th, '结束了程序')
    sem.release()


if __name__ == '__main__':
    th_list = []
    # 只允许 5个 进程同时操作
    sem_obj = Semaphore(5)
    for i in range(1, 21):
        th_obj = Thread(target=action, args=('线程-> {}'.format(i), sem_obj))
        th_obj.start()
        th_list.append(th_obj)
    for th_obj in th_list:
        th_obj.join()

3.线程的递归锁

# 只要是同一个锁对象,都可以管控全局线程

# 不同的进程在不同的函数内做自己的事儿 # 线程先后顺序不随机 # 谁先拿到第一把锁,则其他的锁都会全部先给第一个拿到第一把锁的人 # 需要多把锁的时候,防止出现 A 一把锁,B 一把锁 造成全局的死锁

from threading import Thread, RLock
import time
import random


def func(th, lock_1, lock_2):
    time.sleep(3)
    lock_1.acquire()
    print(th, '拿到了 锁_1')
    lock_2.acquire()
    print(th, '拿到了 锁_2')
    time.sleep(random.randint(1, 5))
    print(th, '干了一些事')
    lock_1.release()
    print(th, '放回了 锁_1')
    lock_2.release()
    print(th, '放回了 锁_2')
    print('********************')


# 不同的进程在不同的函数内做自己的事儿
# 线程先后顺序不随机
# 谁先拿到第一把锁,则其他的锁都会全部先给第一个拿到第一把锁的人
# 需要多把锁的时候,防止出现 A 一把锁,B 一把锁 造成全局的死锁
def action_1(th, lock_1, lock_2):
    print('action_1 函数做一些别的事')
    func(th, lock_1, lock_2)


def action_2(th, lock_1, lock_2):
    print('action_2 函数做一些别的事')
    func(th, lock_1, lock_2)


def action_3(th, lock_1, lock_2):
    print('action_3 函数做一些别的事')
    func(th, lock_1, lock_2)


if __name__ == '__main__':
    lock1 = lock2 = RLock()
    Thread(target=action_1, args=('线程1', lock1, lock2)).start()
    Thread(target=action_2, args=('线程2', lock1, lock2)).start()
    Thread(target=action_3, args=('线程3', lock1, lock2)).start()

4.线程的队列,和进程一样

from threading import Thread
from queue import Queue
import time
import random


def producer(th_name, que):
    for num in range(1, 6):
        time.sleep(random.randint(1, 3))
        content = th_name + '创造了数据' + str(num)
        print('>>>', content, '<<<')
        # .put(),将生产的数据放进全局队列
        que.put(content)
    que.put(None)  # 最后放一个 None,告知生产者已经结束生产


def consumer(th_name, que):
    while 1:
        # .get(),从队列中拿数据
        # 拿一个队列中就少一个
        # 先放进队列的数据,就先被拿出来
        queue_content = que.get()
        if queue_content is None:
            que.task_done()
            break
        print('===', th_name, '拿到了', queue_content, '===')


if __name__ == '__main__':
    # 队列是全局的,在不同进程之间可传递参数
    que_obj = Queue()
    # 生产者进程
    for i in range(1, 4):
        th_producer = Thread(target=producer, args=('生产者-> {} '.format(i), que_obj))
        th_producer.start()

    # 消费者进程
    for i in range(1, 3):
        th_consumer = Thread(target=consumer, args=('消费者-> {} '.format(i), que_obj))
        th_consumer.start()

5.线程的事件(阻塞)

from threading import Thread, Event
import time
import random


# 模拟红绿灯,设置事件的阻塞状态
def light_action(event):
    while 1:
        # 事件的默认状态是 False
        print('>>>红灯亮了<<<')
        time.sleep(10)
        event.set()
        print('===绿灯亮了===')
        time.sleep(10)
        event.clear()


# 模拟车辆
def car_action(event, t_car):
    # 模拟绿灯
    while 1:
        if event.is_set():
            print(t_car, '===过去了===')
            break
        else:
            print(t_car, '>>>在等待<<<')
            event.wait()
            time.sleep(random.randint(1, 5))


if __name__ == '__main__':
    # 设置一个全局的事件,控制进程之间的阻塞
    event_obj = Event()
    # 一个进程用来模拟红绿灯,设置事件阻塞
    th_light = Thread(target=light_action, args=(event_obj,))
    th_light.start()

    for i in range(1, 101):
        # 随机来车辆
        time.sleep(random.randint(1, 3))
        th_car = Thread(target=car_action, args=(event_obj, '车辆-> {}'.format(i)))
        th_car.start()

6.timeout参数

# 事件为 True,event.wait() 直接返回 True # 事件为 False,event 会一直阻塞,不返回任何值 # 假设设置 timeout=1,则 event 等待一秒,如果一秒到了,事件依然为 False,则直接返回 False

import threading
import time
from threading import Thread


def worker(e: threading.Event):
    if e.wait(timeout=1):
        print("我干活了!")

    else:
        print("我等了一面,事件还没有变成 True,不等了!")


if __name__ == '__main__':
    event = threading.Event()
    t = Thread(target=worker, args=(event,))
    t.start()
    time.sleep(5)
    # event.set()

7.线程池和线程池的回调函数

# .add_done_callback() 使用回调函数, # 该回调无法产生返回值, 即使函数中 return 也不行 # 接收结果反而报错 # t_pool.submit(action1, item).add_done_callback(action2)

from concurrent.futures import ThreadPoolExecutor
import time
from threading import Thread, get_ident
from queue import Queue


def producer(que):  # 模拟动态创建数据(请求)
    i = 1
    while 1:
        que.put('数据--> {}'.format(i))
        if i == 20:
            que.put(None)
        i += 1


def consumer(item):
    time.sleep(3)  # 线程花了一些时间干了一些事
    print('线程号', get_ident(), '拿到了', item)
    return item + '的返回值'


def callback(item):
    print('callback : ', item)


if __name__ == '__main__':
    que_producer = Queue()  # 生产者队列
    th_producer = Thread(target=producer, args=(que_producer,))
    th_producer.daemon = True
    th_producer.start()

    pool_obj = ThreadPoolExecutor(max_workers=3)
    res_list = []
    while 1:
        data = que_producer.get()
        if data is None:
            pool_obj.shutdown()  # 关闭线程池,使进程池不再接受新的任务,相当于进程池中 close() + join()
            break
        # res = pool_obj.submit(consumer, data).add_done_callback(callback)  # 回调函数
        res = pool_obj.submit(consumer, data)  # 非阻塞的
        # print(res.result())  # 千万不要在这里打印结果,否则变成了单线程,先 append ,全部完成后 .result() 取值
        res_list.append(res)

    for res in res_list:
        print(res.result())  # 当有回调函数的时候,.result()取值会报错

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.守护线程.py
  • 2.线程信号量.py
  • 3.线程的递归锁
  • 4.线程的队列,和进程一样
  • 5.线程的事件(阻塞)
  • 6.timeout参数
  • 7.线程池和线程池的回调函数
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档