本文将介绍如何使用长短期记忆(Long Short-Term Memory,LSTM)网络来预测降雨时间序列。LSTM是一种递归神经网络(Recurrent Neural Network,RNN),专门用于处理序列数据中的长期依赖关系。
每年的降雨量数据可能是相当不稳定的。与温度不同,温度通常在四季中表现出明显的趋势,而雨量作为一个时间序列可能是相当不稳定的。LSTM网络能够捕捉和记忆长序列中的信息,因此非常适用于降雨时间序列数据。
与传统的前馈神经网络不同,LSTM网络具有可以存储和更新信息的记忆单元。这使得它们能够学习输入序列中的模式和依赖关系。
LSTM单元的关键组成部分是输入门、遗忘门和输出门。这些门控制信息进出单元的流动,使LSTM能够选择性地保留或丢弃信息。此外,细胞状态作为长期记忆,跨时间步保留相关信息。
LSTM网络在降雨时间序列预测中具有以下优势:
这里是一个多波段的全球降水数据(ERA5)
from utils import plot
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import numpy as np
import xarray as xr
file_name='D:/Onedrive/Acdemic/DL_grace/data/train/prcp.tif'
ds=xr.open_dataset(file_name)
data = ds['band_data'][7]
fig = plt.figure()
proj = ccrs.Robinson() #ccrs.Robinson()ccrs.Mollweide()Mollweide()
ax = fig.add_subplot(111, projection=proj)
levels = np.linspace(0, 0.25, num=9)
plot.one_map_flat(data, ax, levels=levels, cmap="RdBu", mask_ocean=False, add_coastlines=True, add_land=True, colorbar=True, plotfunc="pcolormesh")
上述是用一个函数one_map_flat
绘制的,如果想学习,可以参考之前的推文:一键绘制Nature风格地图
import torch
import torch as t
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torchnet import meter
import xarray as xr
import rioxarray as rxr
检查GPU是否可用
torch.cuda.is_available()
如果你没有深度学习环境,请参考之前的推文:从零搭建深度学习环境
从零搭建深度学习环境Tensorflow+PyTorch(附深度学习入门三大名著)
不会深度学习基本原理?可以参考之前的资源汇总:
我把数据科学/深度学习资源做了个汇总...(PDF电子书+网课)
将数据转换为 PyTorch 张量,并标准化
precipitation_data = rxr.open_rasterio('D:/Onedrive/Acdemic/DL_grace/data/train/prcp.tif').values
# 将数据转换为 PyTorch 张量
precipitation_data = torch.tensor(precipitation_data, dtype=torch.float32)
precipitation_mean = torch.mean(precipitation_data, 0)
precipitation_std = torch.std(precipitation_data, 0)
precipitation = (precipitation_data - precipitation_mean) / precipitation_std
precipitation_re = precipitation.reshape(183,-1).transpose(0,1)
这里是全球陆表降水,因此删除那些海洋为空的栅格
# 创建二维矩阵
import random
matrix = torch.mean(torch.stack([torch.mean(precipitation_re, 1)], 1), 1).flatten()
# 将矩阵中值为NaN的元素置为0
matrix[torch.isnan(matrix)] = 0
# 获取所有不为NaN的元素的索引
non_negative_indices = torch.nonzero(matrix)
precipitation_re = precipitation_re[non_negative_indices.flatten(), :]
定义全局参数:
class Config(object):
t0 = 155 #155
t1 = 12
t = t0 + t1
train_num = 8000 #8
validation_num = 1000 #1
test_num = 1000 #1
in_channels = 1
batch_size = 500 #500 NSE 0.75
lr = .0005 # learning rate
epochs = 100
定义数据集加载等等:
import torch
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import Dataset
class time_series_decoder_paper(Dataset):
"""synthetic time series dataset from section 5.1"""
def __init__(self,t0=120,N=4500,dx=None,dy=None,transform=None):
"""
Args:
t0: previous t0 data points to predict from
N: number of data points
transform: any transformations to be applied to time series
"""
self.t0 = t0
self.N = N
self.dx = dx
self.dy = dy
self.transform = None
# time points
#self.x = torch.cat(N*[torch.arange(0,t0+24).type(torch.float).unsqueeze(0)])
self.x = dx
self.fx = dy
# self.fx = torch.cat([A1.unsqueeze(1)*torch.sin(np.pi*self.x[0,0:12]/6)+72 ,
# A2.unsqueeze(1)*torch.sin(np.pi*self.x[0,12:24]/6)+72 ,
# A3.unsqueeze(1)*torch.sin(np.pi*self.x[0,24:t0]/6)+72,
# A4.unsqueeze(1)*torch.sin(np.pi*self.x[0,t0:t0+24]/12)+72],1)
# add noise
# self.fx = self.fx + torch.randn(self.fx.shape)
self.masks = self._generate_square_subsequent_mask(t0)
# print out shapes to confirm desired output
print("x: ",self.x.shape,
"fx: ",self.fx.shape)
def __len__(self):
return len(self.fx)
def __getitem__(self,idx):
if torch.is_tensor(idx):
idx = idx.tolist()
sample = (self.x[idx,:,:], #self.x[idx,:]
self.fx[idx,:],
self.masks)
if self.transform:
sample=self.transform(sample)
return sample
def _generate_square_subsequent_mask(self,t0):
mask = torch.zeros(Config.t,Config.t)
for i in range(0,Config.t0):
mask[i,Config.t0:] = 1
for i in range(Config.t0,Config.t):
mask[i,i+1:] = 1
mask = mask.float().masked_fill(mask == 1, float('-inf'))#.masked_fill(mask == 1, float(0.0))
return mask
定义上下文序列标记
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F
class CausalConv1d(torch.nn.Conv1d):
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride=1,
dilation=1,
groups=1,
bias=True):
super(CausalConv1d, self).__init__(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=0,
dilation=dilation,
groups=groups,
bias=bias)
self.__padding = (kernel_size - 1) * dilation
def forward(self, input):
return super(CausalConv1d, self).forward(F.pad(input, (self.__padding, 0)))
class context_embedding(torch.nn.Module):
def __init__(self,in_channels=Config.in_channels,embedding_size=256,k=5):
super(context_embedding,self).__init__()
self.causal_convolution = CausalConv1d(in_channels,embedding_size,kernel_size=k)
def forward(self,x):
x = self.causal_convolution(x)
return torch.tanh(x)
定义LSTM网络
class LSTM_Time_Series(torch.nn.Module):
def __init__(self,input_size=2,embedding_size=256,kernel_width=9,hidden_size=512):
super(LSTM_Time_Series,self).__init__()
self.input_embedding = context_embedding(input_size,embedding_size,kernel_width)
self.lstm = torch.nn.LSTM(embedding_size,hidden_size,batch_first=True)
self.fc1 = torch.nn.Linear(hidden_size,1)
def forward(self,x,y):
"""
x: the time covariate
y: the observed target
"""
# concatenate observed points and time covariate
# (B,input size + covariate size,sequence length)
# z = torch.cat((y.unsqueeze(1),x.unsqueeze(1)),1)
z_obs = torch.cat((y.unsqueeze(1),x.unsqueeze(1)),1)
if isLSTM:
z_obs = torch.cat((y, x),1)
# input_embedding returns shape (B,embedding size,sequence length)
z_obs_embedding = self.input_embedding(z_obs)
# permute axes (B,sequence length, embedding size)
z_obs_embedding = self.input_embedding(z_obs).permute(0,2,1)
# all hidden states from lstm
# (B,sequence length,num_directions * hidden size)
lstm_out,_ = self.lstm(z_obs_embedding)
# input to nn.Linear: (N,*,Hin)
# output (N,*,Hout)
return self.fc1(lstm_out)
在所有数据中随机选择数据进行训练,验证,预测。
这里为8:1:1
from torch.utils.data import DataLoader
import random
random.seed(0)
random_indices = random.sample(range(non_negative_indices.shape[0]), Config.train_num)
random_indices1 = random.sample(range(non_negative_indices.shape[0]), Config.validation_num)
random_indices2 = random.sample(range(non_negative_indices.shape[0]), Config.test_num)
dx = torch.stack([torch.cat(Config.train_num*[torch.arange(0,Config.t).type(torch.float).unsqueeze(0)]).cuda()], 1)
dx1 = torch.stack([torch.cat(Config.validation_num*[torch.arange(0,Config.t).type(torch.float).unsqueeze(0)]).cuda()], 1)
dx2 = torch.stack([torch.cat(Config.test_num*[torch.arange(0,Config.t).type(torch.float).unsqueeze(0)]).cuda()], 1)
train_dataset = time_series_decoder_paper(t0=Config.t0,N=Config.train_num,dx=dx ,dy=precipitation_re[np.array([random_indices]).flatten(),0:Config.t].unsqueeze(1))
validation_dataset = time_series_decoder_paper(t0=Config.t0,N=Config.validation_num,dx=dx1,dy=precipitation_re[np.array([random_indices1]).flatten(),0:Config.t].unsqueeze(1))
test_dataset = time_series_decoder_paper(t0=Config.t0,N=Config.test_num,dx=dx2,dy=precipitation_re[np.array([random_indices2]).flatten(),0:Config.t].unsqueeze(1))
将数据加载到GPU
criterion = torch.nn.MSELoss()
train_dl = DataLoader(train_dataset,batch_size=Config.batch_size,shuffle=True, generator=torch.Generator(device='cpu'))
validation_dl = DataLoader(validation_dataset,batch_size=Config.batch_size, generator=torch.Generator(device='cpu'))
test_dl = DataLoader(test_dataset,batch_size=Config.batch_size, generator=torch.Generator(device='cpu'))
定义损失函数
criterion_LSTM = torch.nn.MSELoss()
将模型加载到GPU
LSTM = LSTM_Time_Series().cuda()
定义train_epoch
等训练、验证、测试的函数
def Dp(y_pred,y_true,q):
return max([q*(y_pred-y_true),(q-1)*(y_pred-y_true)])
def Rp_num_den(y_preds,y_trues,q):
numerator = np.sum([Dp(y_pred,y_true,q) for y_pred,y_true in zip(y_preds,y_trues)])
denominator = np.sum([np.abs(y_true) for y_true in y_trues])
return numerator,denominator
def train_epoch(LSTM,train_dl,t0=Config.t0):
LSTM.train()
train_loss = 0
n = 0
for step,(x,y,_) in enumerate(train_dl):
x = x.cuda()
y = y.cuda()
optimizer.zero_grad()
output = LSTM(x,y)
loss = criterion(output.squeeze()[:,(Config.t0-1):(Config.t0+Config.t1-1)],y.cuda()[:,0,Config.t0:])
loss.backward()
optimizer.step()
train_loss += (loss.detach().cpu().item() * x.shape[0])
n += x.shape[0]
return train_loss/n
def eval_epoch(LSTM,validation_dl,t0=Config.t0):
LSTM.eval()
eval_loss = 0
n = 0
with torch.no_grad():
for step,(x,y,_) in enumerate(train_dl):
x = x.cuda()
y = y.cuda()
output = LSTM(x,y)
loss = criterion(output.squeeze()[:,(Config.t0-1):(Config.t0+Config.t1-1)],y.cuda()[:,0,Config.t0:])
eval_loss += (loss.detach().cpu().item() * x.shape[0])
n += x.shape[0]
return eval_loss/n
def test_epoch(LSTM,test_dl,t0=Config.t0):
with torch.no_grad():
predictions = []
observations = []
LSTM.eval()
for step,(x,y,_) in enumerate(train_dl):
x = x.cuda()
y = y.cuda()
output = LSTM(x,y)
for p,o in zip(output.squeeze()[:,(Config.t0-1):(Config.t0+Config.t1-1)].cpu().numpy().tolist(),y.cuda()[:,0,Config.t0:].cpu().numpy().tolist()):
predictions.append(p)
observations.append(o)
num = 0
den = 0
for y_preds,y_trues in zip(predictions,observations):
num_i,den_i = Rp_num_den(y_preds,y_trues,.5)
num+=num_i
den+=den_i
Rp = (2*num)/den
return Rp
开始训练
train_epoch_loss = []
eval_epoch_loss = []
Rp_best = 30
isLSTM = True
optimizer = torch.optim.Adam(LSTM.parameters(), lr=Config.lr)
for e,epoch in enumerate(range(Config.epochs)):
train_loss = []
eval_loss = []
l_train = train_epoch(LSTM,train_dl)
train_loss.append(l_train)
l_eval = eval_epoch(LSTM,validation_dl)
eval_loss.append(l_eval)
Rp = test_epoch(LSTM,test_dl)
if Rp_best > Rp:
Rp_best = Rp
with torch.no_grad():
print("Epoch {}: Train loss={} \t Eval loss = {} \t Rp={}".format(e,np.mean(train_loss),np.mean(eval_loss),Rp))
train_epoch_loss.append(np.mean(train_loss))
eval_epoch_loss.append(np.mean(eval_loss))
结果如图,这里是100个epoch
image-20230807223945852
展示结果,在时间序列上,预测效果还是不错:
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
n_plots = 5
t0=120
with torch.no_grad():
LSTM.eval()
for step,(x,y,_) in enumerate(test_dl):
x = x.cuda()
y = y.cuda()
output = LSTM(x,y)
if step > n_plots:
break
with torch.no_grad():
plt.figure(figsize=(10,10))
plt.plot(x[1, 0].cpu().detach().squeeze().numpy(),y[1].cpu().detach().squeeze().numpy(),'g--',linewidth=3)
plt.plot(x[1, 0, Config.t0:].cpu().detach().squeeze().numpy(),output[1,(Config.t0-1):(Config.t0+Config.t1-1),0].cpu().detach().squeeze().numpy(),'b--',linewidth=3)
plt.xlabel("x",fontsize=20)
plt.legend(["$[0,t_0+24)_{obs}$","$[t_0,t_0+24)_{predicted}$"])
plt.show()
接下来在测试集上进行全部验证:
matrix = torch.empty(0).cuda()
obsmat = torch.empty(0).cuda()
with torch.no_grad():
LSTM.eval()
predictions = []
observations = []
for step,(x,y,attention_masks) in enumerate(test_dl):
# if step == 8:
# break
output = LSTM(x.cuda(),y.cuda())
matrix = torch.cat((matrix, output.cuda()))
obsmat = torch.cat((obsmat, y.cuda()))
pre = matrix.cpu().detach().numpy()
obs = obsmat.cpu().detach().numpy()
# libraries
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# data
df = pd.DataFrame({
'obs': obs[:, 0, Config.t0:Config.t].flatten(),
'pre': pre[:, Config.t0:Config.t, 0].flatten()
})
df
可视化结果:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from matplotlib import rcParams
from statistics import mean
from sklearn.metrics import explained_variance_score,r2_score,median_absolute_error,mean_squared_error,mean_absolute_error
from scipy.stats import pearsonr
# 加载数据(PS:原始数据太多,采样10000)
# 默认是读取csv/xlsx的列成DataFrame
config = {"font.family":'Times New Roman',"font.size": 16,"mathtext.fontset":'stix'}
#df = df.sample(5000)
# 用于计算指标
x = df['obs']; y = df['pre']
rcParams.update(config)
BIAS = mean(x - y)
MSE = mean_squared_error(x, y)
RMSE = np.power(MSE, 0.5)
R2 = pearsonr(x, y).statistic
adjR2 = 1-((1-r2_score(x,y))*(len(x)-1))/(len(x)-Config.in_channels-1)
MAE = mean_absolute_error(x, y)
EV = explained_variance_score(x, y)
NSE = 1 - (RMSE ** 2 / np.var(x))
# 计算散点密度
xy = np.vstack([x, y])
z = stats.gaussian_kde(xy)(xy)
idx = z.argsort()
x, y, z = x.iloc[idx], y.iloc[idx], z[idx]
# 拟合(若换MK,自行操作)最小二乘
def slope(xs, ys):
m = (((mean(xs) * mean(ys)) - mean(xs * ys)) / ((mean(xs) * mean(xs)) - mean(xs * xs)))
b = mean(ys) - m * mean(xs)
return m, b
k, b = slope(x, y)
regression_line = []
for a in x:
regression_line.append((k * a) + b)
# 绘图,可自行调整颜色等等
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
fig,ax=plt.subplots(figsize=(8,6),dpi=300)
scatter=ax.scatter(x, y, marker='o', c=z*100, edgecolors=None ,s=15, label='LST',cmap='Spectral_r')
cbar=plt.colorbar(scatter,shrink=1,orientation='vertical',extend='both',pad=0.015,aspect=30,label='frequency')
plt.plot([-30,30],[-30,30],'black',lw=1.5) # 画的1:1线,线的颜色为black,线宽为0.8
plt.plot(x,regression_line,'red',lw=1.5) # 预测与实测数据之间的回归线
plt.axis([-30,30,-30,30]) # 设置线的范围
plt.xlabel('OBS',family = 'Times New Roman')
plt.ylabel('PRE',family = 'Times New Roman')
plt.xticks(fontproperties='Times New Roman')
plt.yticks(fontproperties='Times New Roman')
plt.text(-1.8,1.75, '$N=%.f$' % len(y), family = 'Times New Roman') # text的位置需要根据x,y的大小范围进行调整。
plt.text(-1.8,1.50, '$R^2=%.3f$' % R2, family = 'Times New Roman')
plt.text(-1.8,1.25, '$NSE=%.3f$' % NSE, family = 'Times New Roman')
plt.text(-1.8,1, '$RMSE=%.3f$' % RMSE, family = 'Times New Roman')
plt.xlim(-2,2) # 设置x坐标轴的显示范围
plt.ylim(-2,2) # 设置y坐标轴的显示范围
plt.show()
如果想学习上图的制作方法,可以参考之前的推文:python绘制散点密度图
粗略的结果还是不错的,模型没有进行任何的调参。尝试修改lr,batch size,损失函数,epochs等等进行深度调参,会使结果更好。