在深度学习时代(这么说也不为过)的今天,我们做各种视觉任务时候都会想到使用深度学习,但是大家也都知道深度学习的模型如果想要使用的话,设备必须得有,虽然各种各样的量化策略和剪枝策略大大加速了模型的推理能力,但是实时的话在低配电脑还是不可用! 但是实际中有些视觉任务不怎么依赖实时性,我们只需要保证1s处理一帧图片就可以了,或者几十秒处理一帧也可以。那么这种处理策略怎么处理呢?特别对于IP摄像头,它是以数据流的形式传输,因此当其帧率较高时,本地处理程序会处理不过来,导致卡帧(延时)和程序卡死!我们一起来看看吧!
threading模块(线程)
当我们使用Threading模块创建线程时或者自定义线程任务时,最好的方法就是建立一个线程类,并继承于threading.Thead,然后重写run方法,这是最推荐大家使用的方法(最优雅)。当然也可以使用其他方式创建!主要有以下几个常用函数:
代码示例
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n):
super(MyThread, self).__init__() # 重构run函数必须要写
self.n = n
def run(self):
print("task", self.n)
time.sleep()
print('2s')
time.sleep()
print('1s')
time.sleep()
print('0s')
time.sleep()
if __name__ == "__main__":
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
t1.join()
t2.join()
代码示例:
import queue
q=queue.Queue() #如果不设置长度,默认为无限长
# q = queue.LifoQueue(5) # 后进先出,类似于栈结构
print(q.maxsize) #注意没有括号
q.put()
q.put()
q.put()
q.put()
q.put()
q.put()
print(q.get())
print(q.get())
很遗憾,上面的结构不是我想要的,因为在处理视频流中,如果我们的队列满了,我们需要从队头删除旧数据,在队尾插入新数据,并且每次获取队尾的数据,那么我们就需要一个双端队列了,而上面的queue模块显然不满足!!!
笔者在collections模块中找到了一个deque的子模块,其使用很简单,读者可以自行查阅!
deque的方法列表
如果我们碰到了一个实时性要求不是那么高的,或者自己设备太差处理不过来图像时,我们可以考虑使用多线程读取摄像头画面!比如我们现在需要两个线程,一个用于实时读取视频流,另外一个每隔一秒钟处理一个最新的摄像头画面!
核心思路:我们使用双端队列来缓存数据,当缓存数据满时,我们从队头剔除数据,然后在队尾加入新数据,在获取时只读取队尾数据,这样就会一直处理当前帧!一定要注意线程退出时,需要在关闭摄像头的同时清空队列,而另一个线程进行队列是否为空的判断!
注意:由于这个队列是可读可写,当两个线程同时工作,会出现冲突,因此我们需要对公共变量加锁进行保护,使用threading.Lock()方法创建锁,加锁解锁的方法为acquire()和release()两种!
import cv2
import threading
from collections import deque
lock = threading.Lock()
class RealReadThread(threading.Thread):
def __init__(self, input):
super(RealReadThread).__init__()
self._jobq = input
# ip_camera_url = 'rtsp://admin:admin@192.168.1.64/' # rtsp数据流
# 创建一个窗口
self.cap = cv2.VideoCapture()
threading.Thread.__init__(self)
def run(self):
cv2.namedWindow('ip_camera', flags=cv2.WINDOW_NORMAL | cv2.WINDOW_FREERATIO)
if not self.cap.isOpened():
print('请检查IP地址还有端口号,或者查看IP摄像头是否开启,另外记得使用sudo权限运行脚本')
while self.cap.isOpened():
ret, frame = self.cap.read()
lock.acquire()
if len(self._jobq) == :
self._jobq.popleft()
else:
self._jobq.append(frame)
lock.release()
cv2.imshow('ip_camera', frame)
if cv2.waitKey() == ord('q'):
# 退出程序
break
print("实时读取线程退出!!!!")
cv2.destroyWindow('ip_camera')
self._jobq.clear() # 读取进程结束时清空队列
self.cap.release()
class GetThread(threading.Thread):
def __init__(self, input):
super(GetThread).__init__()
self._jobq = input
threading.Thread.__init__(self)
def run(self):
cv2.namedWindow('get', flags=cv2.WINDOW_NORMAL | cv2.WINDOW_FREERATIO)
flag = False
while(True):
if len(self._jobq) != :
lock.acquire()
im_new = self._jobq.pop()
lock.release()
cv2.imshow("get", im_new)
cv2.waitKey()
flag = True
elif flag == True and len(self._jobq) == :
break
print("间隔1s获取图像线程退出!!!!")
if __name__ == "__main__":
q = deque([], )
th1 = RealReadThread(q)
th2 = GetThread(q)
th1.start()
th2.start() # 开启两个线程
th1.join()
th2.join()
显示效果图