原文地址: https://blog.csdn.net/fgf00/article/details/52773459 编辑:智能算法,欢迎关注! 上期我们一起学习了python中的类的相关知识
今天我们继续深入,一起学习python中的线程相关知识。
目录
1 paramiko模块
2 多线程、多进程
3 使用多线程 threading
paramiko:基于ssh用于连接远程服务器并执行相关操作,批量管理
1yum -y install python-pip # linux环境,win安装完python自带pip
2pip install paramiko
1import paramiko
2# 创建ssh对象
3ssh = paramiko.SSHClient()
4# 允许连接不在know_hosts文件中的主机,可以注释掉看下效果
5ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
6# 连接服务器
7ssh.connect(hostname='192.168.8.140',port=22,username='root',password='123456')
8# 执行命令
9stdin, stdout, stderr = ssh.exec_command("top") # 标准输入、标准输出、标准错误
10# 获取命令结果
11stdout_result = stdout.read()
12stderr_result = stderr.read()
13result = stdout_result if stdout_result else stderr_result
14print(result.decode())
15# 关闭连接
16ssh.close()
1# shell scp 传输文件
2scp -rp -P22 fgf.txt root@192.168.8.140:/tmp/
3
4import paramiko
5
6transport = paramiko.Transport(('192.168.8.140',22))
7transport.connect(username='root', password='123456')
8sftp = paramiko.SFTPClient.from_transport(transport)
9# 将文件上传到服务器
10sftp.put('ssh.py', '/tmp/ssh.py')
11# 讲文件下载到本地
12sftp.get('/tmp/ssh.py', 'ssh1.py')
13
14transport.close()
1RSA : 非对称密钥验证
2公钥 : public key
3私钥 : private key
4把自己的公钥放在自己要连的机器上
1# 生成: 权限必须600
2ssh-keygen # 公钥放到对方要登录用户的.ssh下 authorized_keys (一行)
3# 命令拷贝 (以下命令用一个就可以,或者自己拷贝)
4ssh-copy-id "-p22 root@10.0.0.31 "
5ssh-copy-id -i ~/.ssh/id_rsa.pub 192.168.198.132
1import paramiko
2private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
3
4# 创建ssh对象
5ssh = paramiko.SSHClient()
6# 允许连接不在know_hosts文件中的主机
7ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
8# 连接服务器
9ssh.connect(hostname='192.168.8.140',port=22,username='root',pkey=private_key)
10# 执行命令
11stdin, stdout, stderr = ssh.exec_command("ifconfig")
12# 获取命令结果
13stdout_result = stdout.read()
14stderr_result = stderr.read()
15result = stdout_result if stdout_result else stderr_result
16print(result.decode())
17# 关闭连接
18ssh.close()
1import paramiko
2
3private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
4
5transport = paramiko.Transport(('hostname', 22))
6transport.connect(username='fgf', pkey=private_key )
7
8sftp = paramiko.SFTPClient.from_transport(transport)
9# 将location.py 上传至服务器 /tmp/test.py
10sftp.put('/tmp/location.py', '/tmp/test.py')
11# 将remove_path 下载到本地 local_path
12sftp.get('remove_path', 'local_path')
13
14transport.close()
1线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
2一个线程指的是进程中一个单一顺序的控制流,
3一个进程中可以并发多个线程,每条线程并行执行不同的任务
4简单理解:线程 <—> 一堆指令
1每一个程序的内存是独立的,例如:word不能访问QQ
2进程:qq 要以一个整体的形式暴露给操作系统管理,里面包含各种资源的调用(内存管理、网络接口调用等)
1进程不能单独执行,只是资源的集合
2进程要操作CPU,必须要先创建一个线程。
3所有在同一个进程里的线程,是同享同一块内存空间的
1进程中第一个线程是主线程,主线程创建其他线程,其他线程也可以创建线程,线程之间是平等的
2进程有父进程、子进程,独立的内存空间,唯一的进程标识符、pid
1启动线程比启动进程快。运行进程和运行线程速度上是一样的,没有可比性。
2线程共享内存空间,进程的内存是独立的
1父进程生成子进程,相当于克隆一份内存空间。进程直接不能直接访问
2创建新线程很简单,创建新进程需要对其父进程进行一次克隆
3一个线程可以控制和操作同一线程里的其他线程,但是进程只能操作子进程
1同一个进程之间的线程之间可以直接交流
2两个进程想通信必须通过一个中间代理来实现。
1import threading
2import time
3
4def run(n):
5 print("task",n)
6 time.sleep(2)
7
8t1 = threading.Thread(target=run, args=('t1',)) # 一个参数也得加逗号
9t2 = threading.Thread(target=run, args=('t2',))
10t1.start()
11t2.start()
1import threading
2
3class MyThread(threading.Thread):
4 def __init__(self,n):
5 super(MyThread,self).__init__()
6 self.n = n
7 def run(self):
8 print("这种方式函数名必须是run,写死的",self.n)
9
10t1 = MyThread('t1')
11t2 = MyThread('t2')
12
13t1.start()
14t2.start()
上面是一行一行输的启动线程,那么如果一下启动很多线程呢?
1import threading, time
2
3def run(n):
4 print("task",n)
5 time.sleep(2)
6
7start_time = time.time()
8for i in range(16):
9 t = threading.Thread(target=run, args=('t-%s'%i,))
10 t.start()
11print("cost:", time.time()-start_time)
上面的代码打印执行时间才零点几秒,不是应该2秒么?
1注意:
2主线程启动子线程只有继续往下走,不会等待子线程执行结束。
3因为:多线程、是并行执行的
那么,我如果想等所有子线程执行结果怎么办?
1import threading
2import time
3
4def run(n):
5 print("task",n, threading.current_thread()) # 线程号
6 time.sleep(2)
7
8start_time = time.time()
9t_objs = [] # 存线程实例
10for i in range(16):
11 t = threading.Thread(target=run, args=('t-%s'%i,))
12 t.start() # 在这里直接加join()会阻塞,变成串行效果
13 t_objs.append(t) # 为了不阻塞后面线程的启动,不在这里join,先放到列表里
14
15print("当前活动线程:",threading.active_count())
16for t in t_objs:
17 t.join() # t.wait() 等待所有线程执行结束
18
19print("all threads has finished...",threading.current_thread())
20print("cost:", time.time()-start_time)
1import threading
2import time
3
4def run(n):
5 print("task",n)
6 time.sleep(2)
7
8start_time = time.time()
9t_objs = []
10for i in range(16):
11 t = threading.Thread(target=run, args=('t-%s'%i,))
12 t.setDaemon(True) # 设置线程为守护状态,非守护状态线程都退出程序就退出,不等待守护状态线程
13 t.start() # setDaemon 必须在 start 前面
14 t_objs.append(t)
15
16print(threading.active_count())
17print(t_objs)
18print("all threads has finished...",threading.current_thread())
19print("cost:", time.time()-start_time)
1import threading
2
3# 假定这是你的银行存款:
4balance = 0
5
6def change_it(n):
7 # 先存后取,结果应该为0:
8 global balance
9 balance = balance + n
10 balance = balance - n
11
12def run_thread(n):
13 for i in range(10000):
14 change_it(n)
15
16t1 = threading.Thread(target=run_thread, args=(5,))
17t2 = threading.Thread(target=run_thread, args=(8,))
18t1.start()
19t2.start()
20t1.join()
21t2.join()
22print(balance)
注意:这里这个锁和全局解释器锁没有关系,这是用户程序自己的锁,用户态的锁。 通过上图可以看出,全局解释器锁:是出口加锁给OS,线程锁是返回数据修改原数据加锁。
1import threading,time
2
3def run1():
4 print("grab the first part data")
5 lock.acquire()
6 global num
7 num +=1
8 lock.release()
9 return num
10def run2():
11 print("grab the second part data")
12 lock.acquire()
13 global num2
14 num2+=1
15 lock.release()
16 return num2
17def run3():
18 lock.acquire()
19 res = run1()
20 print('--------between run1 and run2-----')
21 res2 = run2()
22 lock.release()
23 print(res,res2)
24
25if __name__ == '__main__':
26
27 num,num2 = 0,0
28 lock = threading.RLock() # 声明递归锁
29 # lock = threading.Lock() # 用互斥锁,会锁死了,弄混锁情况,可以试一下
30 for i in range(10):
31 t = threading.Thread(target=run3)
32 t.start()
33
34 while threading.active_count() != 1:
35 print(threading.active_count())
36 else:
37 print('----all threads done---')
38 print(num,num2)
1import threading
2import time
3
4def run(n):
5 semaphore.acquire()
6 time.sleep(1)
7 print("run the thread: %s\n" %n)
8 semaphore.release()
9
10if __name__ == '__main__':
11 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
12 for i in range(22):
13 t = threading.Thread(target=run, args=(i,))
14 t.start()
15
16 while threading.active_count() != 1:
17 pass
18 else:
19 print('---all threads done---')
1import threading
2import time
3
4event = threading.Event() # 声明事件
5
6def ligter():
7 count = 0
8 event.set() # 设置标志位,先设置绿灯
9 while True:
10 if count in list(range(6,10)): # 改红灯
11 event.clear() # 清空标志位
12 print("\033[41;1mred light is on...\033[0m")
13 elif count > 10: # 改绿灯
14 event.set()
15 count =0
16 else:
17 print("\033[42;1mgreen light is on...\033[0m")
18 time.sleep(1)
19 count +=1
20
21def car(name):
22 while True:
23 if event.is_set(): # 判断是不是设定(代表路灯)
24 print("[%s] running..." %name)
25 time.sleep(1)
26 else:
27 print("[%s] sees red light , waiting..." %name)
28 event.wait() # 标志位没设,就卡住,设置了就执行(等待标志位被设定)
29 print("\033[34;1m[%s] green light is on ,start going...\033[0m" %name)
30
31light = threading.Thread(target=ligter,)
32light.start()
33car1 = threading.Thread(target=car,args=("Tesla",))
34car1.start()
队列的几个方法
1class queue.Queue(maxsize=0) # 先入先出 first in first out
2class queue.LifoQueue(maxsize=0) # 后进先出 last in fisrt out
3class queue.PriorityQueue(maxsize=0) # 存储数据时可设置优先级的队列,VIP
1Queue.put(item, block=True, timeout=None) # put 放数据
2Queue.get(block=True, timeout=None) # get 取数据
3# block:是否阻塞 timeout:超时时间
4Queue.qsize() # qsize 队列大小
5……
使用
1>>> import queue
2>>> q = queue.Queue()
3>>> q.put("disk1")
4>>> q.put("disk2")
5>>> q.qsize()
62
7>>> q.get()
8'disk1'
9>>> q.get()
10'disk2'
11>>> q.get()
12# 使用get,如果没数据,就等待状态
13>>> q.get_nowait() # 没有数据报queue.Empty异常
14# 可以通过判断qsize,为零就不取了。
15
16q = queue.Queue(maxsize=3) # 设置最大为3个数据
设置优先级
1import queue
2
3q = queue.PriorityQueue()
4
5q.put((5,"fgf"))
6q.put((7, "ze"))
7q.put((2, "ping")) # 数字越小的,优先级越高
8print(q.get()) # (2, 'ping')
9print(q.get()) # (5, 'fgf')
10print(q.get()) # (7, 'ze')
下面来学习一个最基本的生产者消费者模型的例子:
1import threading,time
2import queue
3
4q = queue.Queue(maxsize=10)
5
6def Producer(name):
7 count = 1
8 while True:
9 q.put("骨头%s" % count)
10 print("生产了骨头",count)
11 count +=1
12 time.sleep(0.1)
13
14def Consumer(name):
15 #while q.qsize()>0:
16 while True:
17 print("[%s] 取到[%s] 并且吃了它..." %(name, q.get()))
18 time.sleep(1)
19
20p = threading.Thread(target=Producer,args=("fgf",))
21c = threading.Thread(target=Consumer,args=("dog A",))
22c1 = threading.Thread(target=Consumer,args=("dog B",))
23
24p.start()
25c.start()
26c1.start()
声明:本文系网络转载,版权归原作者所有。如涉及版权,请联系删除!