我正在使用Mydia从视频中提取随机帧。因为我有很多视频,所以我想在保持可重复性的同时并行化这个工作流程。mydia.Videos
接受随机种子,这对于确保可重复性很重要。现在,我需要处理并行化部分。
给定n
视频和随机种子r
,我如何确保提取的每个视频的帧是相同的,而不考虑工作人员的数量?我对算法组件特别感兴趣,不一定是代码。
我最初的想法是使用multiprocessing.Pool
。但是,如果进程的完成时间是不确定的,则在对帧进行采样时将存在竞争条件;即,如果proc 1花费的时间比proc 0长,则来自Videos
类的采样帧将与proc 0花费的时间长于proc 1。
发布于 2018-10-09 07:25:22
我的解决方案有点非正统,因为它是特定于库的。Mydia允许传递要提取的帧,而不是强制Videos
客户端直接采样。这为我提供了在父进程中预先计算要采样的帧的机会。通过这样做,我可以通过用这些帧实例化一个新的Videos
来“模拟”子流程中的随机性。例如:
class MySampler:
def __init__(self, input_directory: Path, total_frames: int, num_frames: int, fps: int):
self.input_directory = Path(input_directory)
self.frames_per_video = [
self.__get_frame_numbers_for_each_video(total_frames, num_frames, fps)
for _ in self.input_directory.glob("*.mp4")
]
@staticmethod
def get_reader(num_frames: int, frames: List[int]):
# ignores the inputs and returns samples the frames that its constructed with
return Videos(target_size=(512, 512), num_frames=num_frames, mode=lambda *_: frames)
然后我可以简单地将其并行化:
def sample_frames(self, number_of_workers: int):
pool = Pool(processes=number_of_workers)
videos = list(self.input_directory.glob("*.mp4"))
pool.starmap_async(self.read_video, zip(self.frames_per_video, videos))
pool.close()
pool.join()
其中read_video
是调用get_reader
并执行读取的方法。
https://stackoverflow.com/questions/52709049
复制相似问题