使用两个摄像头来计算深度图,在没有RGBD的情况下可以使用,不过为了保持效率,需要使用Jetson边缘计算设备。
先输入的是两个IMX219的视频流,捕获的视频流是1080P的,接着传为G流这个后处理器,看图是处理了一下颜色空间。
这个VPI我查一下,是英伟达家的视觉编程接口,总之就是进行了一些转换的过程。
大概是这样的
最后进行这个合成运算和对齐
capture->rectify->SGBM->depth calc.
数据流管道
处理流程
IMX219,Sony家的东西
sudo apt install libnvvpi1 python3-vpi1 vpi1-dev
需要安装的库
https://repo.download.nvidia.com/jetson/
但是这个源码我找不到了,就是找到了deb的库。
英伟达的库好全啊
也自带了一个demo,视差图
当然了,算法很全面
安装后里面有的内容
可以处理的视频流后端,这里是支持jetson的(废话)
后端使用CUDA的结果
写一个,Python是快,但是不能进行内存管理,CSDK可以。U8是八位无符号数。
#include <opencv2/core/version.hpp>
#include <opencv2/imgproc/imgproc.hpp>
#if CV_MAJOR_VERSION >= 3
#include <opencv2/imgcodecs.hpp>
#else
#include <opencv2/highgui/highgui.hpp>
#endif
#include <iostream>
#include <vpi/OpenCVInterop.hpp>
#include <vpi/Image.h>
#include <vpi/Stream.h>
#include <vpi/algo/BoxFilter.h>
#include <vpi/algo/ConvertImageFormat.h>
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Must pass an input image to be blurred" << std::endl;
return 1;
}
cv::Mat cvImage = cv::imread(argv[1]);
if (cvImage.data == NULL)
{
std::cerr << "Can't open input image" << std::endl;
return 2;
}
VPIStream stream;
vpiStreamCreate(0, &stream);
VPIImage image;
vpiImageCreateWrapperOpenCVMat(cvImage, 0, &image);
VPIImage imageGray;
vpiImageCreate(cvImage.cols, cvImage.rows, VPI_IMAGE_FORMAT_U8, 0, &imageGray);
VPIImage blurred;
vpiImageCreate(cvImage.cols, cvImage.rows, VPI_IMAGE_FORMAT_U8, 0, &blurred);
vpiSubmitConvertImageFormat(stream, VPI_BACKEND_CUDA, image, imageGray, NULL);
vpiStreamSync(stream);
VPIImageData outData;
vpiImageLockData(blurred, VPI_LOCK_READ, VPI_IMAGE_BUFFER_HOST_PITCH_LINEAR, &outData);
cv::Mat cvOut;
vpiImageDataExportOpenCVMat(outData, &cvOut);
imwrite("tutorial_blurred.png", cvOut);
vpiImageUnlock(blurred);
vpiStreamDestroy(stream);
vpiImageDestroy(image);
vpiImageDestroy(imageGray);
vpiImageDestroy(blurred);
return 0;
}
CPP的版本,还需要Cmake编译
主要是为了加速运行视觉的算法
算法:表示不可分割的计算操作。 后端:代表负责实际计算的硬件引擎。 Streams:充当提交算法的异步队列,最终在给定后端按顺序执行。流和事件是计算管道的构建块。 缓冲区:存储输入和输出数据。 事件:提供在流和/或应用程序线程上操作的同步原语。 上下文:保存 VPI 和创建对象的状态。
可以运行的硬件
补一个硬件的淘汰路线
最近一个版本是有了Python的支持
真新啊
这个项目一开始需要的是进行摄像头的标定。
导入的库
将棋盘格引入
wget https://www.dropbox.com/sh/j4w6tg9b0ov209g/AAA1HfT7Yy33tmGI1tWaqDKCa\?dl\=1 -O /tmp/calib_files.zip
unzip /tmp/calib_files.zip
需要提前执行一下
接着开始处理
ret, K_l, dist_coeff_l, rvecs, tvecs = cv2.calibrateCamera(obj_pts, img_pts, (arr.shape[1], arr.shape[0]), None,None)
得到校准的参数
参数到手
img = Image.open(FOLDER+"001.png")
arr = np.array(img)
arr_corr = cv2.undistort(arr, K_l, dist_coeff_l, None, K_l)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,10))
ax1.imshow(arr, cmap='gray')
ax2.imshow(arr_corr, cmap='gray')
plt.show()
因为是鱼眼的相机,这里使用标定的参数进行校正。
就是这样
因为是两个相机,记得标定两次,然后保存参数:
np.save("K_l.npy", K_l)
np.save("K_r.npy", K_r)
np.save("dist_coeff_l.npy", dist_coeff_l)
np.save("dist_coeff_r.npy", dist_coeff_r)
重新开个Jupyter,导入库,生成一张新图,把相机的照片准备好
建立一些列表来存放我们的数据,使用ZIP来进行遍历,处理角点
现在就只相当于左右两个棋盘图像对齐操作
将摄像头对齐
使用立体校正
rect_l, rect_r, proj_mat_l, proj_mat_r, Q, roiL, roiR = cv2.stereoRectify(K_l, dist_l, K_r, dist_r, gray_l.shape[::-1], Rot, Trns, 1 ,(0,0))
left_stereo_maps = cv2.initUndistortRectifyMap(K_l, dist_l, rect_l, proj_mat_l,
gray_l.shape[::-1], cv2.CV_16SC2)
right_stereo_maps = cv2.initUndistortRectifyMap(K_r, dist_r, rect_r, proj_mat_r,
gray_l.shape[::-1], cv2.CV_16SC2)
初始化每个校正过的视图
arr_l_rect = cv2.remap(arr_l, left_stereo_maps[0],left_stereo_maps[1], cv2.INTER_LANCZOS4, cv2.BORDER_CONSTANT, 0)
arr_r_rect = cv2.remap(arr_r, right_stereo_maps[0],right_stereo_maps[1], cv2.INTER_LANCZOS4, cv2.BORDER_CONSTANT, 0)
将函数进行进行映射
开始进行校正
看一张就行
效果不错的情况下都安排了
读取所有的图像
对单个图像校准
fig, axs = plt.subplots(2, 2, figsize=(20,10))
# before
axs[0][0].title.set_text('Original L')
axs[0][0].imshow(arr_l)
axs[0][1].title.set_text('Original R')
axs[0][1].imshow(arr_r)
axs[1][0].title.set_text('Rectified L')
axs[1][0].imshow(arr_l_rect)
axs[1][0].title.set_text('Rectified R')
axs[1][1].imshow(arr_r_rect)
结果
开始融合
可以先看下数组形状
X和Y的
组合一下
最终的结果
import time
import queue
import numpy as np
import cv2
from threading import Thread
from camera import Camera
from concurrent.futures import ThreadPoolExecutor
MAX_DISP = 128
WINDOW_SIZE = 10
class Depth(Thread):
def __init__(self):
super().__init__()
print("Reading camera calibration...")
self._map_l, self._map_r = get_calibration()
self._cam_r = CameraThread(0)
self._cam_l = CameraThread(1)
self._disp_arr = None
self._should_run = True
self._dq = queue.deque(maxlen=3)
self._executor = ThreadPoolExecutor(max_workers=4)
self.start()
# Wait for the deque to start filling up
while len(self._dq) < 1:
time.sleep(0.1)
def stop(self):
self._should_run = False
self._cam_l.stop()
self._cam_r.stop()
def disparity(self):
while len(self._dq) == 0:
time.sleep(0.01)
return self._dq.pop()
def enqueue_async(self, vpi_image):
arr = vpi_image.cpu().copy()
self._dq.append(arr)
del vpi_image
def run(self):
import vpi # Don't import vpi in main thread to avoid creating a global context
i = 0
self._warp_l = make_vpi_warpmap(self._map_l)
self._warp_r = make_vpi_warpmap(self._map_r)
ts_history = []
while self._should_run:
i += 1
ts = []
ts.append(time.perf_counter())
with vpi.Backend.CUDA:
ts.append(time.perf_counter())
# confidenceMap = vpi.Image(vpi_l.size, vpi.Format.U16)
# Read Images
arr_l = self._cam_l.image
arr_r = self._cam_r.image
ts.append(time.perf_counter())
# Convert to VPI image
vpi_l = vpi.asimage(arr_l)
vpi_r = vpi.asimage(arr_r)
ts.append(time.perf_counter())
# Rectify
vpi_l = vpi_l.remap(self._warp_l)
vpi_r = vpi_r.remap(self._warp_r)
ts.append(time.perf_counter())
# Resize
vpi_l = vpi_l.rescale(
(480, 270), interp=vpi.Interp.LINEAR, border=vpi.Border.ZERO
)
vpi_r = vpi_r.rescale(
(480, 270), interp=vpi.Interp.LINEAR, border=vpi.Border.ZERO
)
ts.append(time.perf_counter())
# Convert to 16bpp
vpi_l_16bpp = vpi_l.convert(vpi.Format.U16, scale=1)
vpi_r_16bpp = vpi_r.convert(vpi.Format.U16, scale=1)
ts.append(time.perf_counter())
# Disparity
disparity_16bpp = vpi.stereodisp(
vpi_l_16bpp,
vpi_r_16bpp,
out_confmap=None,
backend=vpi.Backend.CUDA,
window=WINDOW_SIZE,
maxdisp=MAX_DISP,
)
ts.append(time.perf_counter())
# Convert again
disparity_8bpp = disparity_16bpp.convert(
vpi.Format.U8, scale=255.0 / (32 * MAX_DISP)
)
ts.append(time.perf_counter())
# CPU mapping
# self._disp_arr = disparity_8bpp
self._executor.submit(self.enqueue_async, disparity_8bpp)
# global frames
# frames.append(self._disp_arr)
ts.append(time.perf_counter())
# Render
# cv2.imshow("Disparity", disp_arr)
# cv2.waitKey(1)
ts.append(time.perf_counter())
ts = np.array(ts)
ts_deltas = np.diff(ts)
ts_history.append(ts_deltas)
debug_str = f"Iter {i}\n"
tasks = [
"Enter context",
"Read images",
"Convert to VPI image",
"Rectify",
"Resize 1080p->270p",
"Convert to 16bpp",
"Disparity",
"Convert 16bpp->8bpp",
".cpu() mapping",
"OpenCV colormap",
"Render",
]
debug_str += f"Sum: {sum(ts_deltas):0.2f}\n\n"
task_acc_time = 1000 * np.array(ts_history).mean(axis=0)
for task, dt in zip(tasks, task_acc_time):
debug_str += f"{task.ljust(20)} {dt:0.2f}\n"
# print(debug_str)
if i % 10 == 0:
vpi.clear_cache()
def disp2depth(disp_arr):
F, B, PIXEL_SIZE = (CAM_PARAMS.F, CAM_PARAMS.B, CAM_PARAMS.PIXEL_SIZE)
disp_arr[disp_arr < 10] = 10
disp_arr_mm = disp_arr * PIXEL_SIZE # convert disparites from px -> mm
depth_arr_mm = F * B / (disp_arr_mm + 1e-10) # calculate depth
return depth_arr_mm
def get_calibration() -> tuple:
fs = cv2.FileStorage(
"calib/rectify_map_imx219_160deg_1080p.yaml", cv2.FILE_STORAGE_READ
)
map_l = (fs.getNode("map_l_x").mat(), fs.getNode("map_l_y").mat())
map_r = (fs.getNode("map_r_x").mat(), fs.getNode("map_r_y").mat())
fs.release()
return map_l, map_r
def make_vpi_warpmap(cv_maps):
import vpi
src_map = cv_maps[0]
idk_what_that_is = cv_maps[1]
map_y, map_x = src_map[:, :, 0], src_map[:, :, 1]
warp = vpi.WarpMap(vpi.WarpGrid((1920, 1080)))
# (H, W, C) -> (C,H,W)
arr_warp = np.asarray(warp)
wx = arr_warp[:, :, 0]
wy = arr_warp[:, :, 1]
wy[:1080, :] = map_x
wx[:1080, :] = map_y
return warp
class CameraThread(Thread):
def __init__(self, sensor_id) -> None:
super().__init__()
self._camera = Camera(sensor_id)
self._should_run = True
self._image = self._camera.read()
self.start()
def run(self):
while self._should_run:
self._image = self._camera.read()
@property
def image(self):
# NOTE: if we care about atomicity of reads, we can add a lock here
return self._image
def stop(self):
self._should_run = False
self._camera.stop()
def calculate_depth(disp_arr):
F, B, PIXEL_SIZE = (CAM_PARAMS.F, CAM_PARAMS.B, CAM_PARAMS.PIXEL_SIZE)
disp_arr_mm = disp_arr * PIXEL_SIZE # disparites in mm
depth_arr_mm = F * B / (disp_arr_mm + 1e-1) # distances
return depth_arr_mm
if __name__ == "__main__":
DISPLAY = True
SAVE = True
frames_d = []
frames_rgb = []
depth = Depth()
t1 = time.perf_counter()
for i in range(100):
disp_arr = depth.disparity()
frames_d.append(disp_arr)
frames_rgb.append(depth._cam_l.image)
print(i)
# print(disp_arr)
# depth_arr = calculate_depth(disp_arr)
# print(f"{i}/20: {disp_arr:0.2f}")
# time.sleep(1)
if DISPLAY:
disp_arr = cv2.applyColorMap(disp_arr, cv2.COLORMAP_TURBO)
cv2.imshow("Depth", disp_arr)
cv2.imshow("Image", cv2.resize(depth._cam_l.image, (480, 270)))
cv2.waitKey(1)
depth.stop()
t2 = time.perf_counter()
print(f"Approx framerate: {len(frames_d)/(t2-t1)} FPS")
# Save frames
for i, (disp_arr, rgb_arr) in enumerate(zip(frames_d, frames_rgb)):
print(f"{i}/{len(frames_d)}", end="\r")
disp_arr = cv2.applyColorMap(disp_arr, cv2.COLORMAP_TURBO)
cv2.imwrite(f"images/depth_{i}.jpg", disp_arr)
cv2.imwrite(f"images/rgb_{i}.jpg", rgb_arr)
写了这么多了,不写了,给个demo。
给出相机的打印件
后面
由远及近
https://docs.nvidia.com/vpi/python/utilities.html
https://github.com/NVIDIA-AI-IOT/jetson-stereo-depth