来源:气象水文科研猫
1.Mann-Kendall突变点检测:
# Mann-Kendall突变点检测
# 数据序列y
# 结果序列UF,UB
#--------------------------------------------
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif']=['SimHei'] #用来正常显示中文标签
plt.rcParams['axes.unicode_minus']=False #用来正常显示负号
def Kendall_change_point_detection(inputdata):
inputdata = np.array(inputdata)
n=inputdata.shape[0]
# 正序列计算---------------------------------
# 定义累计量序列Sk,初始值=0
Sk = [0]
# 定义统计量UFk,初始值 =0
UFk = [0]
# 定义Sk序列元素s,初始值 =0
s = 0
Exp_value = [0]
Var_value = [0]
# i从1开始,因为根据统计量UFk公式,i=0时,Sk(0)、E(0)、Var(0)均为0
# 此时UFk无意义,因此公式中,令UFk(0)=0
for i in range(1,n):
for j in range(i):
if inputdata[i] > inputdata[j]:
s = s+1
else:
s = s+0
Sk.append(s)
Exp_value.append((i+1)*(i+2)/4 ) # Sk[i]的均值
Var_value.append((i+1)*i*(2*(i+1)+5)/72 ) # Sk[i]的方差
UFk.append((Sk[i]-Exp_value[i])/np.sqrt(Var_value[i]))
# ------------------------------正序列计算
# 逆序列计算---------------------------------
# 定义逆序累计量序列Sk2,长度与inputdata一致,初始值=0
Sk2 = [0]
# 定义逆序统计量UBk,长度与inputdata一致,初始值=0
UBk = [0]
UBk2 = [0]
# s归0
s2 = 0
Exp_value2 = [0]
Var_value2 = [0]
# 按时间序列逆转样本y
inputdataT = list(reversed(inputdata))
# i从2开始,因为根据统计量UBk公式,i=1时,Sk2(1)、E(1)、Var(1)均为0
# 此时UBk无意义,因此公式中,令UBk(1)=0
for i in range(1,n):
for j in range(i):
if inputdataT[i] > inputdataT[j]:
s2 = s2+1
else:
s2 = s2+0
Sk2.append(s2)
Exp_value2.append((i+1)*(i+2)/4 ) # Sk[i]的均值
Var_value2.append((i+1)*i*(2*(i+1)+5)/72 ) # Sk[i]的方差
UBk.append((Sk2[i]-Exp_value2[i])/np.sqrt(Var_value2[i]))
UBk2.append(-UBk[i])
# 由于对逆序序列的累计量Sk2的构建中,依然用的是累加法,即后者大于前者时s加1,
# 则s的大小表征了一种上升的趋势的大小,而序列逆序以后,应当表现出与原序列相反
# 的趋势表现,因此,用累加法统计Sk2序列,统计量公式(S(i)-E(i))/sqrt(Var(i))
#也不应改变,但统计量UBk应取相反数以表征正确的逆序序列的趋势
# UBk(i)=0-(Sk2(i)-E)/sqrt(Var)
# ------------------------------逆序列计算
# 此时上一步的到UBk表现的是逆序列在逆序时间上的趋势统计量
# 与UFk做图寻找突变点时,2条曲线应具有同样的时间轴,因此
# 再按时间序列逆转结果统计量UBk,得到时间正序的UBkT,
UBkT = list(reversed(UBk2))
diff = np.array(UFk) - np.array(UBkT)
K = list()
# 找出交叉点
for k in range(1,n):
if diff[k-1]*diff[k]<0:
K.append(k)
# 做突变检测图时,使用UFk和UBkT
plt.figure(figsize=(10,5))
plt.plot(range(1,n+1) ,UFk ,label='UFk') # UFk
plt.plot(range(1,n+1) ,UBkT ,label='UBk') # UBk
plt.ylabel('UFk-UBk')
x_lim = plt.xlim()
plt.plot(x_lim,[-1.96,-1.96],'m--',color='r')
plt.plot(x_lim,[ 0 , 0 ],'m--')
plt.plot(x_lim,[+1.96,+1.96],'m--',color='r')
plt.legend(loc=2) # 图例
plt.show()
return K
2.Pettitt突变点检测:
def Pettitt_change_point_detection(inputdata):
inputdata = np.array(inputdata)
n = inputdata.shape[0]
k = range(n)
inputdataT = pd.Series(inputdata)
r = inputdataT.rank()
Uk = [2*np.sum(r[0:x])-x*(n + 1) for x in k]
Uka = list(np.abs(Uk))
U = np.max(Uka)
K = Uka.index(U)
pvalue = 2 * np.exp((-6 * (U**2))/(n**3 + n**2))
if pvalue <= 0.05:
change_point_desc = '显著'
else:
change_point_desc = '不显著'
#Pettitt_result = {'突变点位置':K,'突变程度':change_point_desc}
return K #,Pettitt_result
3.Buishand U test突变点检测:
def Buishand_U_change_point_detection(inputdata):
inputdata = np.array(inputdata)
inputdata_mean = np.mean(inputdata)
n = inputdata.shape[0]
k = range(n)
Sk = [np.sum(inputdata[0:x+1] - inputdata_mean) for x in k]
sigma = np.sqrt(np.sum((inputdata-np.mean(inputdata))**2)/(n-1))
U = np.sum((Sk[0:(n - 2)]/sigma)**2)/(n * (n + 1))
Ska = np.abs(Sk)
S = np.max(Ska)
K = list(Ska).index(S) + 1
Skk = (Sk/sigma)
return K
4.Standard Normal Homogeneity Test (SNHT)突变点检测:
def SNHT_change_point_detection(inputdata):
inputdata = np.array(inputdata)
inputdata_mean = np.mean(inputdata)
n = inputdata.shape[0]
k = range(1,n)
sigma = np.sqrt(np.sum((inputdata-np.mean(inputdata))**2)/(n-1))
Tk = [x*(np.sum((inputdata[0:x]-inputdata_mean)/sigma)/x)**2 + (n-x)*(np.sum((inputdata[x:n]-inputdata_mean)/sigma)/(n-x))**2 for x in k]
T = np.max(Tk)
K = list(Tk).index(T) + 1
return K
5.非平稳时间序列突变检测的启发式分割算法
注:BG代码还在优化中,下期推文重点介绍。
声明:欢迎转载、转发本号原创内容,可留言区留言或者后台联系小编(微信:gavin7675)进行授权。气象学家公众号转载信息旨在传播交流,其内容由作者负责,不代表本号观点。文中部分图片来源于网络,如涉及作品内容、版权和其他问题,请后台联系小编处理。