毋庸置疑,在做时空序列模型的时候,Moving MNIST数据集,或者说标准的数据集是必要的
这篇文章我们主要介绍MovingMnist数据集,做这个方向的research是逃不过这个数据集的使用的

地址:http://www.cs.toronto.edu/~nitish/unsupervised_video/
这个数据集最早是在 Unsupervised Learning of Video Representations using LSTMs 这篇paper中被用于训练和测试,这也是一篇很有奠基性的paper,一直没来得及写解析,有时间会补上
一个seq有二十帧图片,前十帧为input,后十帧为target,一共有10000个sequence,每帧图片的大小为64×64
(contains 10,000 sequences each of length 20 showing 2 digits moving in a 64 x 64 frame.)


我们的任务大体上就是输入前十张去预测后十张
废话不多说,直接show my code
MovingMnist_dataset.py
import numpy as np
from torch.utils.data import Dataset
import torch
def MNISTdataLoader(path):
    """Load the Moving MNIST array and make it sequence-major.

    The raw ``mnist_test_seq.npy`` file is stored as
    [time steps, num sequences, height, width] = [20, 10000, 64, 64].

    Args:
        path: Path to the ``.npy`` file.

    Returns:
        numpy array of shape [num sequences, time steps, height, width],
        so that axis 0 indexes individual 20-frame sequences.
    """
    data = np.load(path)
    # S B H W -> B S H W (the original comment had this direction reversed).
    return data.transpose(1, 0, 2, 3)
class MovingMNISTdataset(Dataset):
    """Moving MNIST dataset; each item is one full 20-frame sequence.

    Items are float tensors of shape [time steps, 1, H, W] (a channel
    dimension is inserted at axis 1).
    """

    def __init__(self, path):
        self.path = path
        # [num sequences, time steps, 64, 64]
        self.data = MNISTdataLoader(path)

    def __len__(self):
        # Number of sequences = size of axis 0.
        return len(self.data)

    def __getitem__(self, indx):
        # Use locals rather than instance attributes: __getitem__ may be
        # called concurrently by multiple DataLoader workers, and per-item
        # state on `self` would be shared/clobbered.
        sample = self.data[indx, ...]
        # NOTE(review): normalization (sample / 255.0) was deliberately left
        # disabled in the original code; raw pixel values are returned.
        sample = np.expand_dims(sample, axis=1)  # add channel dim: [S, 1, H, W]
        return torch.from_numpy(sample).float()
if __name__ == '__main__':
    # Smoke test: load the dataset and show the shape of the first sequence.
    dataset = MovingMNISTdataset("./mnist_test_seq.npy")
    print(dataset[0].shape)

写了个分train test val的main函数的开头demo,仅供参考
import numpy as np
from torch.utils.data import Dataset
import torch
from torch.utils.data import DataLoader, random_split
from torch.utils.data.sampler import SubsetRandomSampler
# --- Run configuration ---
valid_size = 0.2        # fraction of the training set held out for validation
batch_size = 64
shuffle_dataset = True
random_seed = 1222

# --- Reproducibility ---
# The original only seeded the CUDA RNG, so CPU-side ops (random_split,
# numpy shuffles) were not reproducible.  Seed all three generators.
# torch.cuda.manual_seed_all covers single- and multi-GPU alike and is a
# no-op without CUDA, so no device_count() branching is needed.
torch.manual_seed(random_seed)
np.random.seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# Trade cuDNN autotuning speed for deterministic kernels.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def MNISTdataLoader(path):
    """Load Moving MNIST from ``path`` and transpose to sequence-major order.

    The file on disk is [time steps, num sequences, H, W] = [20, 10000, 64, 64];
    the returned array is [num sequences, time steps, H, W].
    """
    data = np.load(path)
    # Axis swap is S B H W -> B S H W (the original comment stated the
    # opposite direction, which was misleading).
    data_trans = data.transpose(1, 0, 2, 3)
    return data_trans
class MovingMNISTdataset(Dataset):
    """Dataset over Moving MNIST sequences.

    ``__getitem__`` returns one whole sequence as a float tensor of shape
    [time steps, 1, H, W]; input/target splitting is done by the caller.
    """

    def __init__(self, path):
        self.path = path
        # Shape: [num sequences, time steps, 64, 64]
        self.data = MNISTdataLoader(path)

    def __len__(self):
        # One item per sequence.
        return len(self.data)

    def __getitem__(self, indx):
        # Keep per-item values local; writing them to `self` (as the
        # original did) is unsafe under multi-worker DataLoader access.
        seq = self.data[indx, ...]
        # NOTE(review): /255.0 normalization was commented out upstream;
        # preserved as-is — values stay in the raw 0..255 range.
        return torch.from_numpy(np.expand_dims(seq, axis=1)).float()
# --- Build train / validation / test splits ---
mnistdata = MovingMNISTdataset("./data/mnist_test_seq.npy")

# 80/20 split of the whole dataset into train and test.
train_size = int(0.8 * len(mnistdata))
test_size = len(mnistdata) - train_size
train_dataset, test_dataset = random_split(mnistdata, [train_size, test_size])

# Carve a validation subset (valid_size fraction) out of the train indices.
num_train = len(train_dataset)
indices = list(range(num_train))
split = int(np.floor(valid_size * num_train))
if shuffle_dataset:
    # Seed immediately before the (single) shuffle so the split is
    # reproducible.  The original shuffled once *before* seeding and then
    # again after, which made the seeding pointless for the first pass.
    np.random.seed(random_seed)
    np.random.shuffle(indices)
train_idx, valid_idx = indices[split:], indices[:split]

# Both train and validation loaders draw from train_dataset; the samplers
# keep their index sets disjoint.
train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)

train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          sampler=train_sampler)
valid_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          sampler=valid_sampler)
test_loader = DataLoader(test_dataset,
                         batch_size=batch_size)
https://github.com/tychovdo/MovingMNIST

基本上和我写的分布差不多
总体来说比我写的更官方,更具体,看来我自身还是有很大的差距的,直视差距,慢慢努力~
整体代码不难,自己看看基本上就明白了,这里不具体讲了~
MovingMNIST.py
from __future__ import print_function
import torch.utils.data as data
from PIL import Image
import os
import os.path
import errno
import numpy as np
import torch
import codecs
class MovingMNIST(data.Dataset):
    """`MovingMNIST <http://www.cs.toronto.edu/~nitish/unsupervised_video/>`_ Dataset.

    Args:
        root (string): Root directory of dataset where ``processed/training.pt``
            and ``processed/test.pt`` exist.
        train (bool, optional): If True, creates dataset from ``training.pt``,
            otherwise from ``test.pt``.
        split (int, optional): Train/test split size. Number defines how many
            samples belong to the test set.
        download (bool, optional): If true, downloads the dataset from the
            internet and puts it in root directory. If the dataset is already
            downloaded, it is not downloaded again.
        transform (callable, optional): A function/transform applied per frame
            to the input part of each sequence.
        target_transform (callable, optional): A function/transform applied per
            frame to the target part of each sequence.
    """

    urls = [
        'https://github.com/tychovdo/MovingMNIST/raw/master/mnist_test_seq.npy.gz'
    ]
    raw_folder = 'raw'
    processed_folder = 'processed'
    training_file = 'moving_mnist_train.pt'
    test_file = 'moving_mnist_test.pt'

    def __init__(self, root, train=True, split=1000, transform=None, target_transform=None, download=False):
        self.root = os.path.expanduser(root)
        self.transform = transform
        self.target_transform = target_transform
        self.split = split
        self.train = train  # training set or test set

        if download:
            self.download()

        if not self._check_exists():
            raise RuntimeError('Dataset not found.' +
                               ' You can use download=True to download it')

        if self.train:
            self.train_data = torch.load(
                os.path.join(self.root, self.processed_folder, self.training_file))
        else:
            self.test_data = torch.load(
                os.path.join(self.root, self.processed_folder, self.test_file))

    def __getitem__(self, index):
        """
        Args:
            index (int): Index

        Returns:
            tuple: (seq, target) where the sampled 20-frame sequence is split
            into a 10-frame input ``seq`` and a 10-frame ``target`` part.
        """
        def _transform_time(frames, transform):
            # Apply `transform` frame by frame, preserving temporal order.
            # BUGFIX: the original prepended each new frame
            # (cat([t(img), acc])), which reversed the sequence, and it
            # always used self.transform — even for the target branch.
            out = [transform(Image.fromarray(frames[i].numpy(), mode='L'))
                   for i in range(frames.size(0))]
            return torch.cat(out, dim=0)

        if self.train:
            seq, target = self.train_data[index, :10], self.train_data[index, 10:]
        else:
            seq, target = self.test_data[index, :10], self.test_data[index, 10:]

        if self.transform is not None:
            seq = _transform_time(seq, self.transform)
        if self.target_transform is not None:
            target = _transform_time(target, self.target_transform)

        return seq, target

    def __len__(self):
        return len(self.train_data) if self.train else len(self.test_data)

    def _check_exists(self):
        # Both processed files must exist for the dataset to be usable.
        return os.path.exists(os.path.join(self.root, self.processed_folder, self.training_file)) and \
            os.path.exists(os.path.join(self.root, self.processed_folder, self.test_file))

    def download(self):
        """Download the Moving MNIST data if it doesn't exist in processed_folder already."""
        import gzip
        import urllib.request  # stdlib; replaces the six.moves compatibility shim

        if self._check_exists():
            return

        # Create raw/ and processed/ folders, tolerating pre-existing dirs.
        try:
            os.makedirs(os.path.join(self.root, self.raw_folder))
            os.makedirs(os.path.join(self.root, self.processed_folder))
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

        for url in self.urls:
            print('Downloading ' + url)
            data = urllib.request.urlopen(url)
            filename = url.rpartition('/')[2]
            file_path = os.path.join(self.root, self.raw_folder, filename)
            with open(file_path, 'wb') as f:
                f.write(data.read())
            # Decompress next to the .gz, then drop the archive.
            with open(file_path.replace('.gz', ''), 'wb') as out_f, \
                    gzip.GzipFile(file_path) as zip_f:
                out_f.write(zip_f.read())
            os.unlink(file_path)

        # Process and save as torch files.
        print('Processing...')
        raw_path = os.path.join(self.root, self.raw_folder, 'mnist_test_seq.npy')
        # Load once (the original loaded the file twice); swap to
        # [sequence, time, H, W] and hold out the last `split` sequences
        # as the test set.
        full = np.load(raw_path).swapaxes(0, 1)
        training_set = torch.from_numpy(full[:-self.split])
        test_set = torch.from_numpy(full[-self.split:])

        with open(os.path.join(self.root, self.processed_folder, self.training_file), 'wb') as f:
            torch.save(training_set, f)
        with open(os.path.join(self.root, self.processed_folder, self.test_file), 'wb') as f:
            torch.save(test_set, f)
        print('Done!')

    def __repr__(self):
        fmt_str = 'Dataset ' + self.__class__.__name__ + '\n'
        fmt_str += '    Number of datapoints: {}\n'.format(self.__len__())
        tmp = 'train' if self.train is True else 'test'
        fmt_str += '    Train/test: {}\n'.format(tmp)
        fmt_str += '    Root Location: {}\n'.format(self.root)
        tmp = '    Transforms (if any): '
        fmt_str += '{0}{1}\n'.format(tmp, self.transform.__repr__().replace('\n', '\n' + ' ' * len(tmp)))
        tmp = '    Target Transforms (if any): '
        fmt_str += '{0}{1}'.format(tmp, self.target_transform.__repr__().replace('\n', '\n' + ' ' * len(tmp)))
        return fmt_str
demo.py
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets as dset
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.autograd import Variable

from MovingMNIST import MovingMNIST

# Make sure the data root exists.
root = './data'
if not os.path.exists(root):
    os.mkdir(root)

# BUGFIX: the original passed root='.data/mnist' (missing the './'), which
# put the dataset in a hidden '.data' directory instead of the `root`
# created above.
train_set = MovingMNIST(root='./data/mnist', train=True, download=True)
test_set = MovingMNIST(root='./data/mnist', train=False, download=True)

batch_size = 100

train_loader = torch.utils.data.DataLoader(
    dataset=train_set,
    batch_size=batch_size,
    shuffle=True)
test_loader = torch.utils.data.DataLoader(
    dataset=test_set,
    batch_size=batch_size,
    shuffle=False)

print('==>>> total trainning batch number: {}'.format(len(train_loader)))
print('==>>> total testing batch number: {}'.format(len(test_loader)))

# Show the shape of one (input, target) pair and stop.
for seq, seq_target in train_loader:
    print('--- Sample')
    print('Input: ', seq.shape)
    print('Target: ', seq_target.shape)
    break
主要的区别在于这个朋友写的代码直接dataset内部把train test分开了,并且输出的直接就是可以遍历input和target,而我写的相对简单,都是在一起的,外部来分开。
欢迎转发,点赞,star https://github.com/chehongshu/AIwoniuche_Learning/tree/master/Pytorch_MovingMnist