1、什么是装饰器
2、装饰器的作用
3、使用高阶函数模仿装饰器功能
1.定义:把一个函数名当做实参传给另一个函数 2.返回值中包含函数名 3.下面使用高阶函数虽然可以实现装饰器的一些功能,但是违反了装饰器不能改变调用方式的原则, 以前使用bar()现在将调用方式改编成了test1(bar)就是将bar的函数名当做变量传给了test1()
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import time
def timer(func):
start_time = time.time()
func()
print '函数执行时间为', time.time() - start_time
def test():
print '开始执行test'
time.sleep(3)
print 'test执行结束'
timer(test)
'''
开始执行test
test执行结束
函数执行时间为 3.00332999229
'''
4.高阶函数——不修改高阶函数的调用方式增加新的功能(但是无法传参数) 注:bar = test2(bar) 等价于:@timer重新将函数名bar赋值,将原函数bar的内存地址当做实参传递该函数test2(),再将test2()赋值给bar
import time
def bar():
time.sleep(3)
print("in the bar")
def test2(func):
print(func)
return func
bar = test2(bar)
bar()
5.嵌套函数 嵌套函数:在一个函数中嵌套另一个函数,并在函数内部调用
def foo():
print("in the foo")
def bar():
print("in the bar")
bar()
foo()
4、能够适应90%的业务需求
import time
def timer(func): #timer(test1) func=test1
def deco(*args,**kwargs):
start_time = time.time()
func(*args,**kwargs) #run test1
stop_time = time.time()
print("running time is %s"%(stop_time-start_time))
return deco
@timer # test1=timer(test1)
def test1():
time.sleep(3)
print("in the test1")
@timer
def test2(name):
print("in the test2",name)
test1()
test2("tom")
5、对特定网页进行身份验证
import time
user,passwd = 'aaa','123'
def auth(func):
def wrapper(*args,**kwargs):
username = input("Username:").strip()
password = input("Password:").strip()
if user == username and password == passwd:
print("User has passed authentication")
res = func(*args,**kwargs) #这里执行func()相当于执行调用的函数如home()
return res #为了获得home()函数返回值,可以将执行结果赋值给res然后返回print(home())结果是"from home"而不是"None"了
else:
exit("Invalid username or password")
return wrapper
def index():
print("welcome to index page")
@auth
def home():
print("welcome to home page")
return "from home"
@auth
def bbs():
print("welcome to bbs page")
index()
print(home()) #在这里调用home()相当于调用wrapper()
bbs()
6、实现对不同网页不同方式的身份认证
import time
user,passwd = 'aaa','123'
def auth(auth_type):
print("auth func:",auth_type)
def outer_wrapper(func):
def wrapper(*args, **kwargs):
print("wrapper func args:", *args, **kwargs)
if auth_type == "local":
username = input("Username:").strip()
password = input("Password:").strip()
if user == username and passwd == password:
print("\033[32;1mUser has passed authentication\033[0m")
res = func(*args, **kwargs) # from home
print("---after authenticaion ")
return res
else:
exit("\033[31;1mInvalid username or password\033[0m")
elif auth_type == "ldap":
print("搞毛线ldap,不会。。。。")
return wrapper
return outer_wrapper
def index():
print("welcome to index page")
@auth(auth_type="local") # home = wrapper()
def home():
print("welcome to home page")
return "from home"
@auth(auth_type="ldap")
def bbs():
print("welcome to bbs page")
index()
print(home()) #wrapper()
bbs()
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import time
def auth(auth_type):
print("auth func:",auth_type)
def outer_wrapper(func):
def wrapper(*args, **kwargs):
print("wrapper func args:", *args, **kwargs)
print('运行前')
func(*args, **kwargs)
print('运行后')
return wrapper
return outer_wrapper
@auth(auth_type="local") # home = wrapper()
def home():
print("welcome to home page")
return "from home"
home()
7、使用闭包实现装饰器功能
闭包概念:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import time
def timer(func): #timer(test1) func=test1
def deco(*args,**kwargs): # # 函数嵌套
start_time = time.time()
func(*args,**kwargs) # 跨域访问,引用了外部变量func (func实质是函数内存地址)
stop_time = time.time()
print "running time is %s"%(stop_time-start_time)
return deco # 内层函数作为外层函数返回值
def test(name):
print "in the test2",name
time.sleep(2)
test = timer(test) # 等价于 ==》 @timer语法糖
test("tom")
'''
运行结果:
in the test2 tom
running time is 2.00302696228
'''
1、什么是生成器
2、定义
3、生成器哪些场景应用
import time
t1 = time.time()
g = (i for i in range(100000000))
t2 = time.time()
lst = [i for i in range(100000000)]
t3 = time.time()
print('生成器时间:',t2 - t1) # 生成器时间: 0.0
print('列表时间:',t3 - t2) # 列表时间: 5.821957349777222
4、生成器的作用
print( [i*2 for i in range(10)] ) #列表生成式: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print( (i*2 for i in range(10)) ) #生 成 器: <generator object <genexpr> at 0x005A3690>
g = (i*2 for i in range(10))
print( g.__next__() ) # 0
print( g.__next__() ) # 2
5、生成器工作原理
6、yield生成器运行机制
def fib(max_num):
a,b = 1,1
while a < max_num:
yield b
a,b=b,a+b
g = fib(10) #生成一个生成器:[1,2, 3, 5, 8, 13]
print(g.__next__()) #第一次调用返回:1
print(list(g)) #把剩下元素变成列表:[2, 3, 5, 8, 13]
7、yield实现单线程下的并发效果
def consumer(name):
print("%s 准备吃包子啦!" %name)
while True:
baozi = yield
print("包子[%s]来了,被[%s]吃了!" %(baozi,name))
c = consumer("Tom")
c.__next__()
b1 = "韭菜馅包子"
c.send(b1)
# c.send(b1)作用:
# c.send()的作用是给yied的传递一个值,并且每次调用c.send()的同时自动调用一次__next__
'''运行结果:
Tom 准备吃包子啦!
包子[韭菜馅包子]来了,被[Tom]吃了!
'''
import time
def consumer(name):
print("%s 准备吃包子啦!" %name)
while True:
baozi = yield
print("包子[%s]来了,被[%s]吃了!" %(baozi,name))
def producer(name):
c = consumer('A')
c2 = consumer('B')
c.__next__()
c2.__next__()
print("老子开始准备做包子啦!")
for i in range(10):
time.sleep(1)
print("做了2个包子!")
c.send(i)
c2.send(i)
producer("alex")
'''运行结果:
A 准备吃包子啦!
B 准备吃包子啦!
老子开始准备做包子啦!
做了2个包子!
包子[0]来了,被[A]吃了!
包子[0]来了,被[B]吃了!
做了2个包子!
包子[1]来了,被[A]吃了!
包子[1]来了,被[B]吃了!
做了2个包子!
包子[2]来了,被[A]吃了!
包子[2]来了,被[B]吃了!
做了2个包子!
包子[3]来了,被[A]吃了!
包子[3]来了,被[B]吃了!
做了2个包子!
包子[4]来了,被[A]吃了!
包子[4]来了,被[B]吃了!
做了2个包子!
包子[5]来了,被[A]吃了!
包子[5]来了,被[B]吃了!
'''
1、什么是迭代器
2、定义:
3、迭代器和可迭代对象
for
循环的对象都是可迭代的(Iterable)类型;next()
函数的对象都是迭代器(Iterator)
类型,它们表示一个惰性计算的序列;list
、dict
、str
等是可迭代的但不是迭代器,不过可以通过iter()
函数获得一个Iterator
对象。for
循环本质上就是通过不断调用next()
函数实现的4、迭代器的两个方法
a = iter([1,2,]) #生成一个迭代器
print(a.__next__())
print(a.__next__())
print(a.__next__()) #在这一步会引发 “StopIteration” 的异常
5、判断是迭代器和可迭代对象
注:列表,元组,字典是可迭代的但不是迭代器
from collections import Iterable
print(isinstance([],Iterable)) #True
print(isinstance({},Iterable)) #True
print(isinstance((),Iterable)) #True
print(isinstance("aaa",Iterable)) #True
print(isinstance((x for x in range(10)),Iterable)) #True
6、列表不是迭代器,只有生成器是迭代器
from collections import Iterator
t = [1,2,3,4]
print(isinstance(t,Iterator)) #False
t1 = iter(t)
print(isinstance(t1,Iterator)) #True
7、自定义迭代器
#! /usr/bin/env python
# -*- coding: utf-8 -*-
class MyRange(object):
def __init__(self, n):
self.idx = 0
self.n = n
def __iter__(self):
return self
def next(self):
if self.idx < self.n:
val = self.idx
self.idx += 1
return self.n[val]
else:
raise StopIteration()
l = [4,5,6,7,8]
obj = MyRange(l)
print obj.next() # 4
print obj.next() # 5
print obj.next() # 6
8、迭代器与生成器
#! /usr/bin/env python
# -*- coding: utf-8 -*
l = [1,2,3,4,5] # 列表是一个可迭代对象,不是一个迭代器
print dir(l) # 所以 l 中有 __iter__() 方法,没有 __next__()方法
iter_obj = l.__iter__() # __iter__()方法返回迭代器对象本身(这个迭代器对象就会有 next 方法了)
print '###################################\n'
print iter_obj.next() # 1
print iter_obj.next() # 2
print iter_obj.next() # 3
1、什么是进程(process)?(进程是资源集合)
2、进程是资源分配的最小单位( 内存、cpu、网络、io)
3、一个运行起来的程序就是一个进程
CPU分时
进程如何通信
为什么需要进程池
2、定义:进程是资源分配最小单位
3、进程并发性:
4、线程并发性:
5、有了进程为什么还要线程?
1.进程优点:
2. 进程的两个重要缺点
6、什么是线程(thread)(线程是操作系统最小的调度单位)
7、进程和线程的区别
8、进程和程序的区别
Python多线程编程中常用方法:
GIL全局解释器锁:
线程锁(互斥锁):
1、线程2种调用方式:直接调用, 继承式调用
import threading
import time
def sayhi(num): # 定义每个线程要运行的函数
print("running on number:%s" % num)
time.sleep(3)
#1、target=sayhi :sayhi是定义的一个函数的名字
#2、args=(1,) : 括号内写的是函数的参数
t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一个线程实例
t2 = threading.Thread(target=sayhi, args=(2,)) # 生成另一个线程实例
t1.start() # 启动线程
t2.start() # 启动另一个线程
print(t1.getName()) # 获取线程名
print(t2.getName())
import threading
import time
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num
def run(self):#定义每个线程要运行的函数
print("running on number:%s" %self.num)
time.sleep(3)
if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
2、for循环同时启动多个线程
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
for i in range(50):
t = threading.Thread(target=sayhi,args=('t-%s'%i,))
t.start()
3、t.join(): 实现所有线程都执行结束后再执行主线程
import threading
import time
start_time = time.time()
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
t_objs = [] #将进程实例对象存储在这个列表中
for i in range(50):
t = threading.Thread(target=sayhi,args=('t-%s'%i,))
t.start() #启动一个线程,程序不会阻塞
t_objs.append(t)
print(threading.active_count()) #打印当前活跃进程数量
for t in t_objs: #利用for循环等待上面50个进程全部结束
t.join() #阻塞某个程序
print(threading.current_thread()) #打印执行这个命令进程
print("----------------all threads has finished.....")
print(threading.active_count())
print('cost time:',time.time() - start_time)
4、setDaemon(): 守护线程,主线程退出时,需要子线程随主线程退出
import threading
import time
start_time = time.time()
def sayhi(num): #定义每个线程要运行的函数
print("running on number:%s" %num)
time.sleep(3)
for i in range(50):
t = threading.Thread(target=sayhi,args=('t-%s'%i,))
t.setDaemon(True) #把当前线程变成守护线程,必须在t.start()前设置
t.start() #启动一个线程,程序不会阻塞
print('cost time:',time.time() - start_time)
5、GIL锁和用户锁(Global Interpreter Lock 全局解释器锁)
import time
import threading
lock = threading.Lock() #1 生成全局锁
def addNum():
global num #2 在每个线程中都获取这个全局变量
print('--get num:',num )
time.sleep(1)
lock.acquire() #3 修改数据前加锁
num -= 1 #4 对此公共变量进行-1操作
lock.release() #5 修改后释放
在有GIL的情况下执行 count = count + 1 会出错原因解析,用线程锁解决方法
# 1)第一步:count = 0 count初始值为0
# 2)第二步:线程1要执行对count加1的操作首先申请GIL全局解释器锁
# 3)第三步:调用操作系统原生线程在操作系统中执行
# 4)第四步:count加1还未执行完毕,时间到了被要求释放GIL
# 5)第五步:线程1释放了GIL后线程2此时也要对count进行操作,此时线程1还未执行完,所以count还是0
# 6)第六步:线程2此时拿到count = 0后也要对count进行加1操作,假如线程2执行很快,一次就完成了
# count加1的操作,那么count此时就从0变成了1
# 7)第七步:线程2执行完加1后就赋值count=1并释放GIL
# 8)第八步:线程2执行完后cpu又交给了线程1,线程1根据上下文继续执行count加1操作,先拿到GIL
# 锁,完成加1操作,由于线程1先拿到的数据count=0,执行完加1后结果还是1
# 9)第九步:线程1将count=1在次赋值给count并释放GIL锁,此时连个线程都对数据加1,但是值最终是1
1 >> lock = threading.Lock() #定义一把锁
2 >> lock.acquire() #对数据操作前加锁防止数据被另一线程操作
3 >> lock.release() #对数据操作完成后释放锁
6、死锁
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('\033[41m%s 拿到A锁\033[0m' %self.name)
mutexB.acquire()
print('\033[42m%s 拿到B锁\033[0m' %self.name)
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print('\033[43m%s 拿到B锁\033[0m' %self.name)
time.sleep(2)
mutexA.acquire()
print('\033[44m%s 拿到A锁\033[0m' %self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(2):
t=MyThread()
t.start()
# 运行结果:输出下面结果后程序卡死,不再向下进行了
# Thread-1 拿到A锁
# Thread-1 拿到B锁
# Thread-1 拿到B锁
# Thread-2 拿到A锁
7、递归锁:lock = threading.RLock() 解决死锁问题
from threading import Thread,Lock,RLock
import time
mutexA=mutexB=RLock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 拿到A锁' %self.name)
mutexB.acquire()
print('%s 拿到B锁' %self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 拿到B锁' % self.name)
time.sleep(0.1)
mutexA.acquire()
print('%s 拿到A锁' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(5):
t=MyThread()
t.start()
# 下面是运行结果:不会产生死锁
# Thread-1 拿到A锁
# Thread-1 拿到B锁
# Thread-1 拿到B锁
# Thread-1 拿到A锁
# Thread-2 拿到A锁
# Thread-2 拿到B锁
# Thread-2 拿到B锁
# Thread-2 拿到A锁
# Thread-4 拿到A锁
# Thread-4 拿到B锁
# Thread-4 拿到B锁
# Thread-4 拿到A锁
# Thread-3 拿到A锁
# Thread-3 拿到B锁
# Thread-3 拿到B锁
# Thread-3 拿到A锁
# Thread-5 拿到A锁
# Thread-5 拿到B锁
# Thread-5 拿到B锁
# Thread-5 拿到A锁
8、Semaphore(信号量)
# import threading,time
# def run(n):
# semaphore.acquire()
# time.sleep(1)
# print("run the thread: %s\n" %n)
# semaphore.release()
#
# if __name__ == '__main__':
# semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
# for i in range(22):
# t = threading.Thread(target=run,args=(i,))
# t.start()
#
# while threading.active_count() != 1:
# pass #print threading.active_count()
# else:
# print('----all threads done---')
# 代码结果说明:这里可以清晰看到运行时0-4是同时运行的没有顺序,而且是前五个,
# 表示再semaphore这个信号量的定义下程序同时仅能执行5个线程
9、events总共就只有四个方法
1. event.set() : # 设置标志位
2. event.clear() : # 清除标志位
3. event.wait() : # 等待标志被设定
4. event.is_set() : # 判断标志位是否被设定
import time,threading
event = threading.Event()
#第一:写一个红绿灯的死循环
def lighter():
count = 0
event.set() #1先设置为绿灯
while True:
if count > 5 and count <10: #2改成红灯
event.clear() #3把标志位清了
print("red light is on.....")
elif count > 10:
event.set() #4再设置标志位,变绿灯
count = 0
else:
print("green light is on.....")
time.sleep(1)
count += 1
#第二:写一个车的死循环
def car(name):
while True:
if event.is_set(): #设置了标志位代表绿灯
print("[%s] is running"%name)
time.sleep(1)
else:
print('[%s] sees red light, waiting......'%name)
event.wait()
print('[%s] green light is on,start going.....'%name)
light = threading.Thread(target=lighter,)
light.start()
car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()
import multiprocessing,time,threading
#3 被多线程调用的函数
def thread_run():
print(threading.get_ident()) #打印线程id号
time.sleep(2)
#2 被多进程调用的函数,以及在这个函数中起一个进程
def run(name):
time.sleep(2)
print("hello",name)
t = threading.Thread(target=thread_run,) #在进程调用的函数中启用一个线程
t.start()
#1 一次性启动多个进程
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=run,args=('bob %s'%i,)) #启用一个多线程
p.start()
from multiprocessing import Process
import queue
import threading
def f():
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = queue.Queue() #1 在父进程中定义一个队列实例q
# p = threading.Thread(target=f,) #在线程程中就可以相互访问,线程中内存共享
p = Process(target=f,) #2 在父进程中起一个子进程 p,在子进程中使用父进程的q会报错
p.start()
print(q.get())
p.join()
from multiprocessing import Process, Queue
def f(qq): # 将符进程中的q传递过来叫qq
qq.put([42, None, 'hello']) # 此时子进程就可以使用符进程中的q
if __name__ == '__main__':
q = Queue() # 使用Queue()在父进程中定义一个队列实例q
p = Process(target=f, args=(q,)) # 在父进程中起一个子进程 p,将父进程刚定义的q传递给子进程p
p.start()
print(q.get())
p.join()
# 运行结果: [42, None, 'hello']
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello']) # 3 子进程发送数据,就像socket一样
print("son process recv:", conn.recv())
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
# 1 生成一个管道实例,实例一生成就会生成两个返回对象,一个是管道这头,一个是管道那头
p = Process(target=f, args=(child_conn,)) # 2 启动一个子进程将管道其中一头传递给子进程
p.start()
print(parent_conn.recv()) # 4 父进程收消息 # prints "[42, None, 'hello']"
parent_conn.send('i am parent process')
p.join()
# 运行结果:
# [42, None, 'hello']
# son process recv: i am parent process
from multiprocessing import Process, Manager
import os
def f(d, l):
d[1] = '1' # 是个进程对字典放入的是同一个值,所以看上去效果不明显
l.append(os.getpid()) # 将这是个进程的进程id放入列表中
if __name__ == '__main__':
with Manager() as manager: # 1 将Manager()赋值给manager
d = manager.dict() # 2 定义一个可以在多个进程间可以共享的字典
l = manager.list(range(5)) # 3 定义一个可以在多个进程间可以共享的列表,默认写五个数据
p_list = []
for i in range(10): # 生成是个进程
p = Process(target=f, args=(d, l)) # 将刚刚生成的可共享字典和列表传递给子进程
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
from multiprocessing import Process, Lock
def f(l, i):
l.acquire() #一个进程要打印数据时先锁定
print('hello world', i)
l.release() #打印完毕后就释放这把锁
if __name__ == '__main__':
lock = Lock() #先生成一把锁
for num in range(5):
Process(target=f, args=(lock, num)).start()
# 运行结果:
# hello world 4
# hello world 0
# hello world 2
# hello world 3
# hello world 1
from multiprocessing import Process,Pool
import time,os
def foo(i):
time.sleep(2)
print("in the process",os.getpid()) #打印子进程的pid
return i+100
def call(arg):
print('-->exec done:',arg,os.getpid())
if __name__ == '__main__':
pool = Pool(3) #进程池最多允许5个进程放入进程池
print("主进程pid:",os.getpid()) #打印父进程的pid
for i in range(10):
#用法1 callback作用是指定只有当Foo运行结束后就执行callback调用的函数,父进程调用的callback函数
pool.apply_async(func=foo, args=(i,),callback=call)
#用法2 串行 启动进程不在用Process而是直接用pool.apply()
# pool.apply(func=foo, args=(i,))
print('end')
pool.close() #关闭pool
pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
僵尸进程
#!/usr/bin/env python
#coding=utf8
import os, sys, time
#产生子进程
pid = os.fork()
if pid == 0:
#子进程退出
sys.exit(0)
#父进程休息30秒
time.sleep(30)
# 先产生一个子进程,子进程退出,父进程休息30秒,那就会产生一个僵尸进程
[root@linux-node4 ~]# ps -ef| grep defunct
root 110401 96083 0 19:11 pts/2 00:00:00 python defunct.py
root 110402 110401 0 19:11 pts/2 00:00:00 [python] <defunct>
root 110406 96105 0 19:11 pts/3 00:00:00 grep --color=auto defunct
1、什么是协程(进入上一次调用的状态)
2、协程的好处
3、协程缺点
4、使用yield实现协程相同效果
import time
import queue
def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield # 只要遇到yield程序就返回,yield还可以接收数据
print("[%s] is eating baozi %s" % (name, new_baozi))
time.sleep(1)
def producer():
r = con.__next__() # 直接调用消费者的__next__方法
r = con2.__next__() # 函数里面有yield第一次加括号调用会变成一个生成器函数不执行,运行next才执行
n = 0
while n < 5:
n += 1
con.send(n) # send恢复生成器同时并传递一个值给yield
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()
5、协程为何能处理大并发1:Greenlet遇到I/O手动切换
from greenlet import greenlet
def test1():
print(12) #4 gr1会调用test1()先打印12
gr2.switch() #5 然后gr2.switch()就会切换到gr2这个协程
print(34) #8 由于在test2()切换到了gr1,所以gr1又从上次停止的位置开始执行
gr2.switch() #9 在这里又切换到gr2,会再次切换到test2()中执行
def test2():
print(56) #6 启动gr2后会调用test2()打印56
gr1.switch() #7 然后又切换到gr1
print(78) #10 切换到gr2后会接着上次执行,打印78
gr1 = greenlet(test1) #1 启动一个协程gr1
gr2 = greenlet(test2) #2 启动第二个协程gr2
gr1.switch() #3 首先gr1.switch() 就会去执行gr1这个协程
6、协程为何能处理大并发2:Gevent遇到I/O自动切换
7、Gevent实现简单的自动切换小例子
import gevent
def func1():
print('\033[31;1m第一次打印\033[0m')
gevent.sleep(2) # 为什么用gevent.sleep()而不是time.sleep()因为是为了模仿I/O
print('\033[31;1m第六次打印\033[0m')
def func2():
print('\033[32;1m第二次打印\033[0m')
gevent.sleep(1)
print('\033[32;1m第四次打印\033[0m')
def func3():
print('\033[32;1m第三次打印\033[0m')
gevent.sleep(1)
print('\033[32;1m第五次打印\033[0m')
gevent.joinall([ # 将要启动的多个协程放到event.joinall的列表中,即可实现自动切换
gevent.spawn(func1), # gevent.spawn(func1)启动这个协程
gevent.spawn(func2),
gevent.spawn(func3),
])
# 运行结果:
# 第一次打印
# 第二次打印
# 第三次打印
# 第四次打印
# 第五次打印
# 第六次打印
8、使用Gevent实现并发下载网页与串行下载网页时间比较
from urllib import request
import gevent,time
from gevent import monkey
monkey.patch_all() #把当前程序所有的I/O操作给我单独做上标记
def f(url):
print('GET: %s' % url)
resp = request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
#1 并发执行部分
time_binxing = time.time()
gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])
print("并行时间:",time.time()-time_binxing)
#2 串行部分
time_chuanxing = time.time()
urls = [
'https://www.python.org/',
'https://www.yahoo.com/',
'https://github.com/',
]
for url in urls:
f(url)
print("串行时间:",time.time()-time_chuanxing)
# 注:为什么要在文件开通使用monkey.patch_all()
# 1. 因为有很多模块在使用I / O操作时Gevent是无法捕获的,所以为了使Gevent能够识别出程序中的I / O操作。
# 2. 就必须使用Gevent模块的monkey模块,把当前程序所有的I / O操作给我单独做上标记
# 3.使用monkey做标记仅用两步即可:
第一步(导入monkey模块): from gevent import monkey
第二步(声明做标记) : monkey.patch_all()
9、通过gevent自己实现单线程下的多socket并发
import gevent
from gevent import socket,monkey #下面使用的socket是Gevent的socket,实际测试monkey没用
# monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0',port))
s.listen(5)
while True:
cli,addr = s.accept()
gevent.spawn(handle_request,cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print('recv:',data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as e:
print(e)
finally:
conn.close()
if __name__=='__main__':
server(8001)
import socket
HOST = 'localhost' # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"),encoding="utf8").strip()
if len(msg) == 0:continue
s.sendall(msg)
data = s.recv(1024)
print('Received', repr(data))
s.close()
10、协程本质原理
11、epoll处理 I/O 请求原理
12、select处理协程
13、select、epool、pool
Python进程池和线程池(ThreadPoolExecutor&ProcessPoolExecutor)
1. Executor
2. Future
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task
print(future1.done()) # 判断task1是否结束
time.sleep(3)
print(future2.done()) # 判断task2是否结束
print(future1.result()) # 查看task1返回的结果
print(future2.result()) # 查看task2返回的结果
# 运行结果:
# False # 这个False与下面的True会等待3秒
# True # 后面三个输出都是一起打出来的
# hello
# world
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Start the load operations and mark each future with its URL
# future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} # 这一句相当于下面for循环获取的字典
future_to_url = {}
for url in URLS:
future_to_url[executor.submit(load_url,url,60)] = url # {'future对象':'url'} future对象作为key,url作为value
for future in concurrent.futures.as_completed(future_to_url): # as_completed返回已经有返回结果的future对象
url = future_to_url[future] # 通过future对象获取对应的url
try:
data = future.result() # 获取future对象的返回结果
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
from concurrent.futures import ThreadPoolExecutor
# 创建线程池
executor = ThreadPoolExecutor(10)
def test_function(num1,num2):
return "%s + %s = %s"%(num1,num2,num1+num2)
result_iterators = executor.map(test_function,[1,2,3],[5,6,7])
for result in result_iterators:
print(result)
# 1 + 5 = 6
# 2 + 6 = 8
# 3 + 7 = 10
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_dic = {}
for url, data in zip(URLS, executor.map(load_url, URLS)):
print('%r page is %d bytes' % (url, len(data)))
future_dic[url] = data # {'url':'执行结果'} url作为key,执行结果作为value
# 'http://httpbin.org' page is 13011 bytes
# 'http://example.com/' page is 1270 bytes
# 'https://api.github.com/' page is 2039 bytes
使用线程池、进程池、协程向多个url并发获取页面数据比较
import requests
url_list = [
'https://www.baidu.com',
'http://dig.chouti.com/',
]
for url in url_list:
result = requests.get(url)
print(result.text)
import requests
from concurrent.futures import ProcessPoolExecutor
def fetch_request(url):
result = requests.get(url)
print(result.text)
url_list = [
'https://www.baidu.com',
'https://www.google.com/', #google页面会卡住,知道页面超时后这个进程才结束
'http://dig.chouti.com/', #chouti页面内容会直接返回,不会等待Google页面的返回
]
if __name__ == '__main__':
pool = ProcessPoolExecutor(10) # 创建线程池
for url in url_list:
pool.submit(fetch_request,url) # 去线程池中获取一个进程,进程去执行fetch_request方法
pool.shutdown(False)
import requests
from concurrent.futures import ThreadPoolExecutor
def fetch_request(url):
result = requests.get(url)
print(result.text)
url_list = [
'https://www.baidu.com',
'https://www.google.com/', #google页面会卡住,知道页面超时后这个进程才结束
'http://dig.chouti.com/', #chouti页面内容会直接返回,不会等待Google页面的返回
]
pool = ThreadPoolExecutor(10) # 创建一个线程池,最多开10个线程
for url in url_list:
pool.submit(fetch_request,url) # 去线程池中获取一个线程,线程去执行fetch_request方法
pool.shutdown(True) # 主线程自己关闭,让子线程自己拿任务执行
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_async(url):
response = requests.get(url)
return response.text
def callback(future):
print(future.result())
url_list = ['http://www.github.com', 'http://www.bing.com']
pool = ThreadPoolExecutor(5)
for url in url_list:
v = pool.submit(fetch_async, url)
v.add_done_callback(callback)
pool.shutdown(wait=True)
import gevent
import requests
from gevent import monkey
monkey.patch_all()
# 这些请求谁先回来就先处理谁
def fetch_async(method, url, req_kwargs):
response = requests.request(method=method, url=url, **req_kwargs)
print(response.url, response.content)
# ##### 发送请求 #####
gevent.joinall([
gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
gevent.spawn(fetch_async, method='get', url='https://www.google.com/', req_kwargs={}),
gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])