前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python多线程

python多线程

作者头像
Dean0731
发布2020-05-08 15:41:18
1.7K0
发布2020-05-08 15:41:18
举报
文章被收录于专栏:blog-技术博客blog-技术博客

1 多进程

代码语言:javascript
复制
# 多进程,
import os
import time
from multiprocessing import Process
# 启动时必须在 if __name__ 判断下,windows 必须,其他 无限制
# =================================================
# def func(args):
#     print("子进程:",os.getpid())
#     print("子进程的父进程:",os.getppid())
#     time.sleep(10)
#     print("子进程结束")
# if __name__ =="__main__":
#     p = Process(target=func,args=(1,))  # 注册 并传入元祖 元祖有一个参数要加逗号
#     # p是进程对象
#     p.start() # 开启子进程
#     print("主进程:", os.getpid())
#     print("主进程的父进程:",os.getppid()) # cmd 或者是 pycharm
# 生命周期
#   主进程长:自己执行完结束
#   子进程长:等待子进程结束
# =================================================
# 多进程中的方法
# join
# def fun(arg1,arg2):
#     print('*'*arg1)
#     # time.sleep(5)
#     print('*'*arg2)
# if __name__ == "__main__":
#     p = Process(target=fun,args=(10,20))
#     p.start()
#     # p.join() # 感知子进程结束
#     # time.sleep(1)
#     print("all is stop")
#
#     print("最后的语句")
#     os.walk(r"目录") # 返回 文件夹中文件名字
# =================================================
# def fun():
#     print("xxx")
# if __name__ == "__main__":
#     for i in range(10):
#         p = Process(target=fun)
#         p.start()
#         p.join()  # 停止for循环 进程结束后继续
#         print("for")
#     print("主进程")
# =================================================
# 第二种方法
# class MyProcess(Process):
#     def __init__(self,args):
#         super().__init__()  # 若要传递参数,需要调用父类init
#
#     def run(self):
#         print("子进程",self.__dict__)
#         print(self.pid)
# if __name__ == "__main__":
#     print("主进程:",os.getpid())
#     p1 = MyProcess()
#     p1.start()
# =================================================
# 进程之间数据是隔离,命名空间不通
# def fun():
#     global n
#     n= 0
#     print("pid:%s" %os.getpid(),n)
# if __name__ == "__main__":
#     n=100
#     p = Process(target=fun)
#     p.start()
#     p.join()
#     print(n) # -->100
# =================================================
# 多进程tcp连接
# import socket
# # 客户端
# sk = socket.socket()
# sk.connect(("127.0.0.1",8080))
# sk.send('N好'.encode("utf8"))
# msg = sk.recv(1024).decode("utf8")
# print(msg)
# sk.close()
#
# # 服务端
# def server(conn):
#     ret= "你好".encode("utf8")
#     conn.send(ret)
#     msg = conn.recv(1024).decode("utf8")
#     print(msg)
#     conn.close()
#
# sk = socket.socket()
# sk.bind(("127.0.0.1",8080))
# sk.listen()
# if __name__ == "__main__":
#     while True:
#         conn, addr = sk.accept()
#         p = Process(target=server,args=(conn,))
#         p.start()
# =================================================
# 守护进程
# 默认情况 父进程 等待子进程结束
# p.daemon = True 在start前,设置为守护进程,守护进程随父进程(代码执行完毕)结束
#   若父进程在等待 子进程(非守护进程时) ,若父进程代码完毕,守护进程应该结束
# p.is_alive() 判断进程是否存活
# p.terminate() 终止进程
# =================================================
# 锁
# 未加锁实例:
# 火车票
import json
import time
from multiprocessing import Process
# def show(i):
#     with open('ticket') as f:
#         dic = json.load(f)
#     print('余票: %s'%dic['ticket'])
​
def buy_ticket(i):
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0 :
        dic['ticket'] -= 1
        print('\033[32m%s买到票了\033[0m'%i)
    else:
        print('\033[31m%s没买到票\033[0m'%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
if __name__ == '__main__':
    # for i in range(10):
    #     p = Process(target=show,args=(i,))
    #     p.start()
    for i in range(10):
        p = Process(target=buy_ticket, args=(i))
        p.start()
# =================================================
# 锁
# 加锁实例
# 火车票
import json
import time
from multiprocessing import Process
from multiprocessing import Lock
​
# def show(i):
#     with open('ticket') as f:
#         dic = json.load(f)
#     print('余票: %s'%dic['ticket'])
​
def buy_ticket(i,lock):
    lock.acquire() #拿钥匙进门
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0 :
        dic['ticket'] -= 1
        print('\033[32m%s买到票了\033[0m'%i)
    else:
        print('\033[31m%s没买到票\033[0m'%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
    lock.release()      # 还钥匙
​
if __name__ == '__main__':
    # for i in range(10):
    #     p = Process(target=show,args=(i,))
    #     p.start()
    lock = Lock()
    for i in range(10):
        p = Process(target=buy_ticket, args=(i,lock))
        p.start()
# =================================================
# =================================================
# =================================================

2 信号量_事件

代码语言:javascript
复制
# 多进程中的组件
# 一个资源 同一时间 被n个人访问
import time
import random
from multiprocessing import Process,Event
# ==============================
# 未用信号量
# def ktv(i):
#     print('%s走进ktv'%i)
#     time.sleep(random.randint(1,5))
#     print('%s走出ktv'%i)
# if __name__ == '__main__' :
#     for i in range(20):
#         p = Process(target=ktv,args=(i))
#         p.start()
# ==============================
from multiprocessing import Semaphore
​
# sem = Semaphore(4)
# sem.acquire()
# print('拿到第一把钥匙')
# sem.acquire()
# print('拿到第二把钥匙')
# sem.acquire()
# print('拿到第三把钥匙')
# sem.acquire()
# print('拿到第四把钥匙')
# sem.acquire()
# print('拿到第五把钥匙')
# def ktv(i,sem):
#     sem.acquire()    #获取钥匙
#     print('%s走进ktv'%i)
#     time.sleep(random.randint(1,5))
#     print('%s走出ktv'%i)
#     sem.release()    # 释放钥匙
#
#
# if __name__ == '__main__' :
#     sem = Semaphore(4)
#     for i in range(20):
#         p = Process(target=ktv,args=(i,sem))
#         p.start()
​
# ==============================
# 事件
#   信号是控制进程阻塞与否
#   事件创建后,默认是阻塞状态
# e = Event() # 创建事件
# e.is_set()  # False 默认阻塞
# print("xx") # 可打印 e.set() 设置为True e.clear() 设置为False
# e.wait()
# print("xx") # 阻塞
# 遇到wait()会判断is_set() 为False 阻塞
# ==============================
# 红绿灯事件
def cars(e,i):
    if not e.is_set():
        print("car%i在等待" % i)
        e.wait()
    print("car%i通过" % i)
​
​
def light(e):
    while True:
        if e.is_set():
            e.clear()
            print("\033[31m红灯\033[0m")
        else:
            e.set()
            print("\033[32m绿灯\033[0m")
        time.sleep(2)
if __name__ == "__main__":
    e = Event()
    p =Process(target=light,args=(e,))
    p.start()
    for i in range(1,21):
        car = Process(target=cars,args=(e,i))
        car.start()
        time.sleep(random.random())
# ==============================
# ==============================
# ==============================
# ==============================
# def test(e):
#     e.set()
#     print("xxx")
# if __name__=="__main__":
#     e = Event()
#     print(e.is_set())
#     p = Process(target=test,args=(e,))
#     p.start()
#     e.wait()
#     print("注")
​

3 进程通信_队列管道

代码语言:javascript
复制
# IPC 内部进程通信,不能使用普通queue
from multiprocessing import Queue,Process
# ===============================
# q = Queue(5)  # 队列大小
# q.put(1)
# q.put(1)
# q.full()  # 若队列满了,阻塞等待
# q.get()
# q.empty()  # 若为空,阻塞等待有数据 后取值
# q.get()
# q.get_nowait()  # 用于跳过等待,需要用try
# ===============================
# def produce(q):
#     q.put('hello')
# def consume(q):
#     print(q.get())
# if __name__ =="__main__":
#     q = Queue()
#     p = Process(target=produce,args=(q,))
#     p.start()
#     p2 = Process(target=consume, args=(q,))
#     p2.start()
# ===============================
# 生产者消费者模型
# 若生产者,生产有数量,消费者,不停消费,最后消费进程会处于等待状态
# 可在主进程后边join生产进程,消费进程判断为空,但不准确
#   需要在队列put(None) 子进程判断,由于数据之间不能共享,需要put 消费数量的None
# ===============================
from multiprocessing import JoinableQueue
# consume :
#     ....
#     q.task_done()
# produce :
#     ...
#     q.join()
# ===============================
#  循环通知,致使进程结束
# JoinableQueue
# 生产者生产,不停不停消费,若q为空 一直等待,
#   生产者完毕后,会join等待消费值消费完毕,因为是同一个q,一个生产者完毕后,其他还没有完毕q会处于,他会处于阻塞
#   等待 消费者 全部消费完毕,q.join()会感知,因此 生产进程会结束,主进程最后join生产进程,生产结束
#   主进程就结束,身为守护进程的子进程也结束
# import time
# import random
# from multiprocessing import Process,JoinableQueue
# def consumer(q,name):
#     while True:
#         food = q.get()
#         print('\033[31m%s消费了%s\033[0m' % (name,food))
#         time.sleep(random.randint(1,3))
#         q.task_done()     # count - 1
#
# def producer(name,food,q):
#     for i in range(4):
#         time.sleep(random.randint(1,3))
#         f = '%s生产了%s%s'%(name,food,i)
#         print(f)
#         q.put(f)
#     q.join()    # 阻塞  直到一个队列中的所有数据 全部被处理完毕
#
# if __name__  == '__main__':
#     q = JoinableQueue(20)
#     p1 = Process(target=producer,args=('Egon','包子',q))
#     p2 = Process(target=producer, args=('wusir','泔水', q))
#     c1 = Process(target=consumer, args=(q,'alex'))
#     c2 = Process(target=consumer, args=(q,'jinboss'))
#     p1.start()
#     p2.start()
#     c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
#     c2.daemon = True
#     c1.start()
#     c2.start()
#     p1.join()
#     p2.join()      # 感知一个进程的结束
​
#  在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功
​
# 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者全部生产完毕之后,
    # join信号 : 已经停止生产数据了
                # 且要等待之前被刻上的记号都被消费完
                # 当数据都被处理完时,join阻塞结束
​
# consumer 中把所有的任务消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
# ===============================
# 管道 双向通信工具
from multiprocessing import Pipe
# conn,conn2 = Pipe()
# conn.send("123456")  # 不用字节
# print(conn2.recv())  # 不用指定大小
​
def fun(conn):
    conn.send("hello")
if __name__=="__main__":
    conn1,conn2  = Pipe()
    Process(target=fun,args=(conn1,)).start()
    print(conn1.recv())
# 管道返回2个连接
#    conn1, conn2
# P    发送 接受
# p2   接受 发送
# 若只发数据,可关闭一端,若取数据的时候,对面已关闭,则报错,看根据此 终止程序
from multiprocessing import Pipe,Process
# def func(conn1,conn2):
#     conn2.close()
#     while True:
#         try :
#             msg = conn1.recv()
#             print(msg)
#         except EOFError:
#             conn1.close()
#             break
#
# if __name__ == '__main__':
#     conn1, conn2 = Pipe()
#     Process(target=func,args = (conn1,conn2)).start()
#     conn1.close()
#     for i in range(20):
#         conn2.send('吃了么')
#     conn2.close()
# ===============================
# from multiprocessing import Lock,Pipe,Process
# def producer(con,pro,name,food):
#     con.close()
#     for i in range(100):
#         f = '%s生产%s%s'%(name,food,i)
#         print(f)
#         pro.send(f)
#     pro.send(None)
#     pro.send(None)
#     pro.send(None)
#     pro.close()
#
# def consumer(con,pro,name,lock):
#     pro.close()
#     while True:
#             lock.acquire()  # 不安全主要是recv的时候,因此两端加锁即可
#             food = con.recv()
#             lock.release()
#             if food is None:
#                 con.close()
#                 break
#             print('%s吃了%s' % (name, food))
# if __name__ == '__main__':
#     con,pro = Pipe()
#     lock= Lock()
#     p = Process(target=producer,args=(con,pro,'egon','泔水'))
#     c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
#     c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
#     c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
#     c1.start()
#     c2.start()
#     c3.start()
#     p.start()
#     con.close()
#     pro.close()
​
# from multiprocessing import Process,Pipe,Lock
#
# def consumer(produce, consume,name,lock):
#     produce.close()
#     while True:
#         lock.acquire()
#         baozi=consume.recv()
#         lock.release()
#         if baozi:
#             print('%s 收到包子:%s' %(name,baozi))
#         else:
#             consume.close()
#             break
#
# def producer(produce, consume,n):
#     consume.close()
#     for i in range(n):
#         produce.send(i)
#     produce.send(None)
#     produce.send(None)
#     produce.close()
#
# if __name__ == '__main__':
#     produce,consume=Pipe()
#     lock = Lock()
#     c1=Process(target=consumer,args=(produce,consume,'c1',lock))
#     c2=Process(target=consumer,args=(produce,consume,'c2',lock))
#     p1=Process(target=producer,args=(produce,consume,30))
#     c1.start()
#     c2.start()
#     p1.start()
#     produce.close()
#     consume.close()
​
# pipe 数据不安全性
# IPC
# 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象
​
# 队列 进程之间数据安全的
#   管道 + 锁

3_进程通信__数据共享

代码语言:javascript
复制
from multiprocessing import Manager,Process,Lock
# 进程之间不能传递数据,通过方法的参数可以传过去,但修改后,无反应,不知道原因
def main(dict,lock):
    lock.acquire()
    dict['count']-=1
    lock.release()
if __name__ == "__main__":
    m = Manager()
    l = Lock()
    dict = m.dict({"count":100})
    p_list=[]
    for i in range(50):
        p = Process(target=main,args=(dict,l))
        p.start()
        p_list.append(p)
    for i in p_list:i.join()
    print(dict['count'])

4_进程池

代码语言:javascript
复制
# 进程池
# 上一个例子中50个进程很慢
#   寄存器 堆栈 文件
#   操作系统调度,cup切换
# 高级线程池有数量限定 ,有最低,任务变多的时候 逐步加到最高限制
import os,time
from multiprocessing import Pool,Manager
# ==================================
# def func2(i):
#     print(os.getpid(),os.getppid())
#     i+1
# def func(list):
#     list[1]['set'].add(os.getpid())
#     print(len(list[1]['set']))
# # 一般超过5个使用pool
# if __name__ == "__main__":
#     pid = Manager()
#     dict1 = pid.dict({"set":set()})
#     pool = Pool(5)
#     # 执行不同的任务,map 自带join方法
#     #pool.map(func2,range(100))
#     pool.map(func, [[i,dict1]for i in range(100)])
#     print(len(dict1['set']))
# """
# 等于
#     for i in range(100):
#         p = Process(target=func,args=(i,))
#         p.start()
# """
# ==================================
# def fun(n):
#     print("start fun%s" %n,os.getpid())
#     time.sleep(1)
#     print("end fun%s" % n, os.getpid())
# if __name__ == "__main__":
#     p = Pool() # 默认cup核心数量
#     for i in range(10):
#         #p.apply(fun,args=(i,)) # 同步提交的
#         p.apply_async(fun,args=(i,)) # 异步提交,真的异步,因此需要join
#     p.close()  # 不再接受新的任务
#     p.join()  # 感知进程池中任务结束 保持 主进程 与子进程同步
​
# ==================================
# import socket
# from multiprocessing import Pool
#
# def func(conn):
#     conn.send(b'hello')
#     print(conn.recv(1024).decode('utf-8'))
#     conn.close()
#
# if __name__ == '__main__':
#     p = Pool(5)
#     sk = socket.socket()
#     sk.bind(('127.0.0.1',8080))
#     sk.listen()
#     while True:
#         conn, addr = sk.accept()
#         p.apply_async(func,args=(conn,))
#     sk.close()
#     import socket
#
#     sk = socket.socket()
#     sk.connect(('127.0.0.1', 8080))
#
#     ret = sk.recv(1024).decode('utf-8')
#     print(ret)
#     msg = input('>>>').encode('utf-8')
#     sk.send(msg)
#     sk.close()
# ==================================
# 进程池的返回值
# p = Pool()
# p.map(funcname,iterable)     默认异步的执行任务,且自带close和join
# p.apply   同步调用的
# p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join
# from multiprocessing import Pool
# def func(i):
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     for i in range(10):
#         res = p.apply(func,args=(i,))   # apply的结果就是func的返回值
#         print(res) --> 直接就是返回值
# ==================================
# import time
# from multiprocessing import Pool
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     res_l = []
#     for i in range(10):
#         res = p.apply_async(func,args=(i,))   # apply的结果就是func的返回值
#         res_l.append(res)
#   若在for 中直接获取res.get()会在成阻塞,程序变同步执行
#     for res in res_l:print(res.get())# 等着 func的计算结果
#    调用res.get时返回
# ==================================
# import time
# from multiprocessing import Pool
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     ret = p.map(func,range(100))
#     print(ret)  #  -> 直接返回全部,列表返回
# 自带join,close 最后一起返回
# ====================================
# 回调函数 , 回调的函数在主进程调用
# 对于子进程中再起子进程问题,还不知道
# 每个进程的回调函数 交给主进程顺序执行
import os
from multiprocessing import Pool,Process
def func2(nn):
    print('in func2',os.getpid())
    print(nn)
def func3(n):
    print('in func3', os.getpid())
    return n*n
def func1(n):
    print('in func1',os.getpid())
    p = Pool(5)
    p.apply_async(func3, args=(10,), callback=func2)
    p.close()
    p.join()
    return n*n
if __name__ == '__main__':
    print('主进程 :',os.getpid())
    p = Pool(5)
    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()
# ===================================================
import requests
from urllib.request import urlopen
from multiprocessing import Pool
# 200 网页正常的返回
# 404 网页找不到
# 502 504
# 场景:callback 耗时段,远小于网络延时,此时使用,在主进程运行,
def get(url):
    response = requests.get(url)
    if response.status_code == 200:
        return url, response.content.decode('utf-8')
​
​
def get_urllib(url):
    ret = urlopen(url)
    return ret.read().decode('utf-8')
​
​
def call_back(args):
    url, content = args
    print(url, len(content))
​
​
if __name__ == '__main__':
    url_lst = [
        'https://www.cnblogs.com/',
        'http://www.baidu.com',
        'https://www.sogou.com/',
        'http://www.sohu.com/',
    ]
    p = Pool(5)
    for url in url_lst:
        p.apply_async(get, args=(url,), callback=call_back) # callback 中的参数为 get函数的返回值
    p.close()
    p.join()

4_线程

两者之间应该有对应关系1:1 1:n

linux 中的nptl 1对1 线程

代码语言:javascript
复制
# 同一进程的线程间的数据共享的,共享的 共享的
#   可通过直接访问全局变量 global,还需要进程同步
#   创建,切换,撤销 相比进程 消耗小,轻量级
#   进程:资源分配单位,每个进程 至少一个线程
#   线程:cup调度单位
# thread 基本模块,避免使用,可能与threading 冲突
# threading thread的高级版本
# Queue 多线程之间共享数据的数据结构
# 与进程类似,好多方法相同
import time
from threading import Thread
import threading
# def func(n):
#     time.sleep(1)
#     print(n)
# t = Thread(target=func,args=(12,))
# t.daemon = True # 成为"守护线程"
# t.start()
# print("主线程") # 默认情况等待子线程结束
# ===================================
# class MyThread(Thread):
#     def __init__(self,name):
#         super().__init__()
#         self.name = name
#     def run(self):
#         # time.sleep(1)
#         print(self.name)
# MyThread("段志方").start()
# ================================
# GIL 锁的是线程,同一时间 只有一个线程 ,cpython解释器的问题,jpython 就不会
# 对于io密集型 没什么区别,只要io时会切换即可
# 但对于多核cup python 同时只能运行一个cup ,其他语言的会运行多个,因此...
# 即不能通过物理核心数增加速度,不能实现(并行)
# ============================================
# 多线程socket 可以input
# import socket
# from threading import Thread
# def chat(conn):
#     conn.send(b'hello')
#     msg = conn.recv(1024).decode('utf-8')
#     print(msg)
#     conn.close()
# sk = socket.socket()
# sk.bind(('127.0.0.1',8080))
# sk.listen()
# while True:
#     conn,addr = sk.accept()
#     Thread(target=chat,args = (conn,)).start()
# sk.close()
#
# import socket
# sk = socket.socket()
# sk.connect(('127.0.0.1',8080))
# msg = sk.recv(1024)
# print(msg)
# inp = input('>>> ').encode('utf-8')
# sk.send(inp)
# sk.close()
# =========================
# print(threading.current_thread()) # 当前线程
# print(threading.active_count()) # 全部线程,包括主线程
# print(threading.enumerate())  # 列表返回全部线程对象
# ==========================================
# 守护线程
# import time
# from threading import Thread
# def func1():
#     while True:
#         print('*'*10)
#         time.sleep(1)
# def func2():
#     print('in func2')
#     time.sleep(5)
#
# t = Thread(target=func1,)
# t.daemon = True
# t.start()
# t2 = Thread(target=func2,)
# t2.start()
# t2.join()
# print('主线程')
​
# (守护进程)随着(主进程代码)的执行结束而结束
# 守护(线程)会在主线程结束之后等待(其他非守护子线程)的结束才结束
​
# 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 回收子进程的资源
# import time
# from multiprocessing import Process
# def func():
#     time.sleep(5)
#
# if __name__ == '__main__':
#         Process(target=func).start()
# =========================================
# 线程锁 ,与gil无关
import time
from threading import Lock,Thread
# Lock 互斥锁
# def func(lock):
#     global n
#     lock.acquire()
#     temp = n
#     time.sleep(0.2)
#     n = temp - 1
#     lock.release()
#
# n = 10
# t_lst = []
# lock = Lock()
# for i in range(10):
#     t = Thread(target=func,args=(lock,))
#     t.start()
#     t_lst.append(t)
​
# for t in  t_lst: t.join()
# print(n)
​
# 科学家吃面   还会死锁
​
# noodle_lock  = Lock()
# fork_lock = Lock()
# def eat1(name):
#     noodle_lock.acquire()
#     print('%s拿到面条啦'%name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     print('%s吃面'%name)
#     fork_lock.release()
#     noodle_lock.release()
#
# def eat2(name):
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     time.sleep(1)
#     noodle_lock.acquire()
#     print('%s拿到面条啦'%name)
#     print('%s吃面'%name)
#     noodle_lock.release()
#     fork_lock.release()
#
# Thread(target=eat1,args=('alex',)).start()
# Thread(target=eat2,args=('Egon',)).start()
# Thread(target=eat1,args=('bossjin',)).start()
# Thread(target=eat2,args=('nezha',)).start()
# ===============================================
from threading import RLock   # 递归锁
fork_lock = noodle_lock  = RLock()
# 一个钥匙串上的两把钥匙,同一个lock 在一个线程中可又多次acquire
# 传给其他线程时 不能被acquire
# def eat1(name):
#     print(name)
#     noodle_lock.acquire()            # 一把钥匙
#     print('%s拿到面条啦'%name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     print('%s吃面'%name)
#     fork_lock.release()
#     noodle_lock.release()
#
# def eat2(name):
#     print(name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     time.sleep(1)
#     noodle_lock.acquire()
#     print('%s拿到面条啦'%name)
#     print('%s吃面'%name)
#     noodle_lock.release()
#     fork_lock.release()
# Thread(target=eat1,args=('alex',)).start()
# Thread(target=eat2,args=('Egon',)).start()
# Thread(target=eat1,args=('bossjin',)).start()
# Thread(target=eat2,args=('nezha',)).start()
# =================================================

5_线程_信号量_事件_条件_定时器_列队_线程池

代码语言:javascript
复制
import time
from threading import Semaphore,Thread
# ====================================
# def func(sem,a,b):
#     sem.acquire()
#     time.sleep(1)
#     print(a+b)
#     sem.release()
# sem = Semaphore(4)
# for i in range(10):
#     t = Thread(target=func,args=(sem,i,i+5))
#     t.start()
# ====================================
# 事件被创建的时候
# False状态
    # wait() 阻塞
# True状态
    # wait() 非阻塞
# clear 设置状态为False
# set  设置状态为True
#  数据库 - 文件夹
#  文件夹里有好多excel表格
    # 1.能够更方便的对数据进行增删改查
    # 2.安全访问的机制
#  起两个线程
#  第一个线程 : 连接数据库
        # 等待一个信号 告诉我我们之间的网络是通的
        # 连接数据库
#  第二个线程 : 检测与数据库之间的网络是否连通
        # time.sleep(0,2) 2
        # 将事件的状态设置为True
# import time
# import random
# from threading import Thread,Event
# def connect_db(e):
#     count = 0
#     while count < 3:
#         e.wait(0.5)   # 状态为False的时候,我只等待1s就结束
#         if e.is_set() == True:
#             print('连接数据库')
#             break
#         else:
#             count += 1
#             print('第%s次连接失败'%count)
#     else:
#         raise TimeoutError('数据库连接超时')
# def check_web(e):
#     time.sleep(random.randint(0,3))
#     e.set()
# e = Event()
# t1 = Thread(target=connect_db,args=(e,))
# t2 = Thread(target=check_web,args=(e,))
# t1.start()
# t2.start()
# ====================================
# 条件 复杂的锁
# 条件
from threading import Condition
# 条件
# 锁
# acquire release
# 一个条件被创建之初 默认有一个(False)状态
# False状态 会影响wait一直处于等待状态
# notify(int数据类型)  造钥匙
# from threading import Thread,Condition
# def func(con,i):
#     con.acquire()
#     con.wait() # 等钥匙
#     print('在第%s个循环里'%i)
#     con.release()
# con = Condition()
# for i in range(10):
#     Thread(target=func,args = (con,i)).start()
# while True:
#     num = int(input('>>>'))
#     con.acquire()
#     con.notify(num)  # 造钥匙
#     con.release()
# ====================================
#定时器
# import time
# from threading import Timer
# def func():
#     print('时间同步')   #1-3
# while True:
#     t = Timer(5,func).start()   # 非阻塞的 ,异步的 ,会把所有的5s在一起
#     time.sleep(5) # 睡5s 每5s进行意思时间同步
# ====================================
# 加锁 麻烦 所以使用队列
#线程通信
# queue
# import queue #直接导入普通queue 是线程安全的
# q = queue.Queue()  # 队列 先进先出
# q.put()
# q.get()
# q.put_nowait()
# q.get_nowait()
# q = queue.LifoQueue()  # 栈 先进后出
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())
# print(q.get())
# q = queue.PriorityQueue()  # 优先级队列
# q.put((1,'a'))
# q.put((10,'b'))
# q.put((30,'c'))
# q.put((1,'d'))
# q.put((1,'f'))
# print(q.get())
# 元祖中的元素按顺序比较,数字越小优先级大,祖父按照ascii越小优先级越大
# ====================================
# 线程池
import time
# 以前没有线程池
from concurrent.futures import ThreadPoolExecutor
# ProcessPoolExecutor 该模块下还有一个进程池,与multi 功能相同
# submit(fn,*args,**kwargs) 异步提交任务
# map(fun,*iterables,timeout=None,chunksize - 1) 循环的submit
# shutdown(wait=True) # 等于原来的 close join 合并
# result(time=None) 取得结果
# add_done_callback(fn) 回调函数
def func(n):
    time.sleep(2)
    print(n)
    return n*n
def call_back(m):
    print('结果是 %s'%m.result())
# 若使用进程池 只换ThreadPoolExecutor->ProcessPoolExecutor
tpool = ThreadPoolExecutor(max_workers=5)   #  默认 不要超过cpu个数*5
for i in range(20):
    tpool.submit(func,i).add_done_callback(call_back)
tpool.shutdown()
# tpool.map(func,range(20))  # 拿不到返回值
​
# t_lst = []
# for i in  range(20):
#     t = tpool.submit(func,i)
#     t_lst.append(t)
# tpool.shutdown()  # close+join    #
# print('主线程')
# for t in t_lst:print('***',t.result()) # 拿返回值

6_协程

代码语言:javascript
复制
# 进程 多个进程,操作系统负责
# 线程 不能同一时间多个cup 其他语言可以,但不影响高io
#   开启线程 创建线程 寄存器 堆栈
#   关闭一个线程
# 协程
#   本质是一个线程
#   能够在多个任务间切换,不需要寄存器,堆栈切换
#   任务之间切换时间开销 远小于线程
#   计算任务之间切换消耗也很大,一般都是遇到io的时候切换
#   进程(cup数+1)+线程(cup数*5)+协程(500)  = 50000
#   适合爬虫
# 实现并发的手段
# import time
# 实现在 con,pro之间来回切换
# def consumer():
#     while True:
#         x = yield
#         time.sleep(1)
#         print('处理了数据 :',x)
#
# def producer():
#     c = consumer()
#     next(c)
#     for i in range(10):
#         time.sleep(1)
#         print('生产了数据 :',i)
#         c.send(i)
#
# producer()
# =============================================
# 真正的协程模块就是使用greenlet完成的切换
from greenlet import greenlet
# def eat():
#     print('eating start')
#     g2.switch()
#     print('eating end')
#     g2.switch()
#
# def play():
#     print('playing start')
#     g1.switch()
#     print('playing end')
# g1 = greenlet(eat)  # 必须先有g1 ,g2 函数中才能使用g
# g2 = greenlet(play) # 不会自动切换
# g1.switch()
# ======================================
#  不能感知time.sleep(1)
# 可以感知gevent.sleep(1),在第一行引入 如下from...
# 后边的time 都会经过特殊处理,time.sleep() 就可以被识别
# from gevent import monkey;monkey.patch_all()
# import time
# import gevent
# import threading
# def eat():
#     DummyThread-1 虚拟的线程
#     print(threading.current_thread().getName())
#     print(threading.current_thread())
#     print('eating start')
#     time.sleep(1)
#     print('eating end')
#
# def play():
#     DummyThread-2 虚拟的线程
#     print(threading.current_thread().getName())
#     print(threading.current_thread())
#     print('playing start')
#     time.sleep(1)
#     print('playing end')
#
# g1 = gevent.spawn(eat) # 注册进入,会自动切换,不是操作系统调度
# g2 = gevent.spawn(play) # gevent 负责协程的调度 通过封装的greenlet switch
# g1.join()  gevent 是完全异步的  join等待协程结束
# g2.join()
# 进程和线程的任务切换右操作系统完成
# 协程任务之间的切换由程序(代码)完成,只有遇到协程模块能识别的IO操作,(时间片等不识别)的时候,程序才会进行任务切换,实现并发的效果
# ========================================
# 同步 和 异步
# from gevent import monkey;monkey.patch_all() # 放最前面
# import time
# import gevent
# def task(n):
#     time.sleep(1)
#     print(n)
# def sync(): # 同步
#     for i in range(10):
#         task(i)
# def async(): # 异步
#     g_lst = []
#     for i in range(10):
#         g = gevent.spawn(task,i)
#         g_lst.append(g)
#     gevent.joinall(g_lst)   #两种方法都可
#     for g in g_lst:g.join()
# ======================================
# 协程 : 能够在一个线程中实现并发效果的概念
    #    能够规避一些任务中的IO操作
    #    在任务的执行过程中,检测到IO就切换到其他任务
​
# 多线程 被弱化了
# 协程 在一个线程上 提高CPU 的利用率
# 协程相比于多线程的优势 切换的效率更快
# ==========================================
# 爬虫的例子
# 请求过程中的IO等待
# from gevent import monkey;monkey.patch_all()
# import gevent
# from urllib.request import urlopen    # 内置的模块
# urlopen html时有个格式的 reguests 无格式
# def get_url(url):
#     response = urlopen(url)
#     content = response.read().decode('utf-8')
#     return len(content)
#
# g1 = gevent.spawn(get_url,'http://www.baidu.com')
# g2 = gevent.spawn(get_url,'http://www.sogou.com')
# g3 = gevent.spawn(get_url,'http://www.taobao.com')
# g4 = gevent.spawn(get_url,'http://www.hao123.com')
# g5 = gevent.spawn(get_url,'http://www.cnblogs.com')
# gevent.joinall([g1,g2,g3,g4,g5])
# print(g1.value)
# print(g2.value)
# print(g3.value)
# print(g4.value)
# print(g5.value)
​
# ret = get_url('http://www.baidu.com')
# print(ret)
# ======================================
from gevent import monkey;monkey.patch_all()
import socket
import gevent
def talk(conn):
    conn.send(b'hello')
    print(conn.recv(1024).decode('utf-8'))
    conn.close()
​
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
    conn,addr = sk.accept()
    gevent.spawn(talk,conn)
sk.close()
​
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
print(sk.recv(1024))
msg = input('>>>').encode('utf-8')
sk.send(msg)
sk.close()

7_io模型

阻塞模型

非阻塞模型

io多路复用

代码语言:javascript
复制
# 同步 提交一个任务之后要等待这个任务执行完毕
# 异步 只管提交任务,不等待这个任务执行完毕就可以做其他事情
# 阻塞 recv recvfrom accept
# 非阻塞

# 阻塞   线程   运行状态 --> 阻塞状态 --> 就绪
# 非阻塞

# IO多路复用
    # select机制  Windows  linux  都是操作系统轮询每一个被监听的项,看是否有读操作
    # poll机制    linux          它可以监听的对象比select机制可以监听的数量多
                                 # 随着监听项的增多,导致效率降低
    # epoll机制   linux           更高级,绑定回调函数,
# =================================
# 以前的都是阻塞io
# =================================
# 非阻塞io实例
# import socket
# sk = socket.socket()
# sk.bind(('127.0.0.1',9000))
# sk.setblocking(False)  # 设置不阻塞
# sk.listen()
# conn_l = []
# del_conn = []
# while True:
#     try:
#         conn,addr = sk.accept()  #不阻塞,但是没人连我会报错
#         print('建立连接了:',addr)
#         conn_l.append(conn)
#     except BlockingIOError:
#         for con in conn_l:
#             try:
#                 msg = con.recv(1024)  # 非阻塞,如果没有数据就报错
#                 if msg == b'':   # 若客户端关闭 会发送空消息
#                     del_conn.append(con)
#                     continue
#                 print(msg)
#                 con.send(b'byebye')
#             except BlockingIOError:pass
#         for con in del_conn:
#             con.close()
#             conn_l.remove(con)
#         del_conn.clear()
# # while True : 10000   500  501
#
# import time
# import socket
# import threading
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
#
# for i in range(2):
#     threading.Thread(target=func).start()
# =================================
# io 多路复用, 监听列表的循环 变为有操作系统执行
import select
import socket

sk = socket.socket()
sk.bind(('127.0.0.1',8000))
sk.setblocking(False)
sk.listen()

read_lst = [sk] # 监听列表
while True:   # [sk,conn]
    # 等待读列表,写列表,修改列表 都必传
    # 返回元祖中3个列表,对应三个list,一般只用第一个
    # r_lst里面就是sk对象
    r_lst,w_lst,x_lst = select.select(read_lst,[],[])
    for i in r_lst:
        if i is sk:
            conn,addr = i.accept()
            read_lst.append(conn)
        else:
            ret = i.recv(1024)
            if ret == b'':
                i.close()
                read_lst.remove(i)
                continue
            print(ret)
            i.send(b'goodbye!')
            import time
            import socket
            import threading


            def func():
                sk = socket.socket()
                sk.connect(('127.0.0.1', 8000))
                sk.send(b'hello')
                time.sleep(3)
                print(sk.recv(1024))
                sk.close()
            for i in range(20):
                threading.Thread(target=func).start()

# =================================
import selectors # 选择合适的多路复用机制
from socket import *

def accept(sk,mask):
    conn,addr=sk.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sk=socket()
sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk.bind(('127.0.0.1',8088))
sk.listen(5)
sk.setblocking(False) #设置socket的接口为非阻塞

sel = selectors.DefaultSelector()   # 自动选择一个适合我的IO多路复用的机制
sel.register(sk,selectors.EVENT_READ,accept)
#相当于网select的读列表里append了一个sk对象,并且绑定了一个回调函数accept
# 说白了就是 如果有人请求连接sk,就调用accrpt方法

while True:
    events=sel.select() #检测所有的sk,conn,是否有完成wait data阶段
    for sel_obj,mask in events:  # [conn]
        callback=sel_obj.data #callback=read
        callback(sel_obj.fileobj,mask) #read(conn,1)

pymysql

代码语言:javascript
复制
import pymysql
# 连接
conn = pymysql.connect(
    host="106.15.39.74",
    port=3306,
    database="test",
    user="root",
    password="dzf123,.",
    charset="utf8" # 没有"-" 没有
)
cursor = conn.cursor()
sql = "select*from student"
name = "dzf"
password = "123456"
sql = "select * from student where name = %s and password = %s"
ret = cursor.execute(sql,[name,password])
# 自己拼接需要加引号,使用防注入sql不用加引号,参数不能少,多
#print(cursor.lastrowid) # 获取刚插入数据的id 应该就是主键 自增的那个,与名字无关
print(ret) # 返回受影响行数
ret = cursor.fetchall() # 元祖 大元组里边小元祖
print(ret,"a")
ret = cursor.fetchone() # 取一条数据
print(ret,"a")
ret = cursor.fetchone()
print(ret,"a")
# 直接返回一条元素,格式是 小元祖,或只有list中的一个小字典,外边没有元祖或list
# 若连续fetchone() 第一次第一条,第二次第二条,一次向下取
# 若取完后 再次 fetchone() 取不到
# -->(('dzf','1234'),('dzf','1234'))
# 在执行语句前 修改cursor格式
cursor.fetchmany(3) # 在cursor位置接下取3条,大元组中小元祖
# 移动光标
cursor.scroll(1,mode="absolute") # 绝对移动 移到1位置,从2开始 ,
cursor.scroll(1,mode="relative") # 相对移动 原来在3 位置,从4 开始读,现在 移动到4 从5开始读
# 向上移可以使用负的
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 指定为字典格式
"""
    [
    {'id':1,'name':'dzf'},
    {'id':1,'name':'dzf'},
    ]
"""

cursor.close()
conn.close()
# ====================================
# 插入数据还是用cursor.execute(),注意提交后conn.commit()
# 若多语句,可能错误,conn.rollback()
# sql2 = "insert into student (name,password) values(%s, %s)"
# ret = cursor.execute(sql2,['123','123'])
# conn.commit()
# 或insert into student (name,password) values(%(name)s, %(pwd)s)
# 下边传入字典excute(sql,{"name":xxx..})
# ====================================
# 批量执行
data = (['12','12'],['23','32'],['32','23']) # 格式必须固定
cursor.executemany(sql,data)  # 内部的for循环
# try 防止异常,要回滚, 会取消以前正确的插入语句
# =================================
# 删除,同理,也要提交
# ================================
# 修改 记得提交
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-10-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 多进程
  • 2 信号量_事件
  • 3 进程通信_队列管道
  • 3_进程通信__数据共享
  • 4_进程池
  • 4_线程
    • linux 中的nptl 1对1 线程
    • 5_线程_信号量_事件_条件_定时器_列队_线程池
    • 6_协程
    • 7_io模型
      • 阻塞模型
        • 非阻塞模型
          • io多路复用
          • pymysql
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档