authors:: Hansheng Ren, Bixiong Xu, Yujing Wang, Chao Yi, Congrui Huang, Xiaoyu Kou, Tony Xing, Mao Yang, Jie Tong, Qi Zhang
container:: Proceedings of the 25th ACM SIGKDD International Conference on Knowledge Discovery & Data Mining
year:: 2019
DOI:: 10.1145/3292500.3330680
rating:: ⭐
share:: true
comment:: 将图像领域的光谱残差分析引入到时间序列检测,通过无监督的方法进行异常检测,异常为人工添加。
「异常检测」旨在发现数据中的意外事件或罕见项目。它在许多工业应用中非常流行,是数据挖掘中的一个重要研究领域。
为了解决时间序列异常检测的问题,作者提出了一种基于 谱残差(SR) 以及 卷积神经网络(CNN) 的新算法。首次尝试将 SR
模型从视觉显著性检测领域借用到时间序列异常检测中。此外,作者创新性地将 SR
和 CNN
结合起来,以提高 SR
模型的性能。
主要用于微软一个时间序列异常检测服务,帮助客户连续监测时间序列,并及时提醒潜在的事件。
SR
模型和 CNN
模型,以提高时间序列异常检测的精度。Spectral Residual(光谱残差)是一种基于快速傅里叶变换的方法,是一种无监督方法。
自然图像的统计特性具有变换不变性:即将图像从原来的空间坐标变换到频率坐标系中,图像在空间中具有的统计特性在频域中仍然保留,这种不变性恰好保证了采用能量谱来刻画自然图像空间相关性的可靠性。
从信息论的角度出发,一张图像信息可以如下表示:
表示新奇的部分,
表示冗余部分。因此在进行检测时,应该首先去掉冗余部分。
SR
模型所做的事情也就是去除冗余部分,保留新奇部分。
在自然图像统计的不变因子中,尺度不变性是最著名和研究最广泛的性质,它也被称为 1/f 定律。它表明,自然图像集合的平均傅里叶光谱的振幅
服从一个分布:
基于上述事实,作者首先通过对图像进行傅里叶变换,将其从空间域转到频域,对幅值取对数后得到
谱,由于
曲线满足局部线性条件,所以用局部平均滤波器对其进行平滑,获得平均频谱。
其中
分别表示傅里叶变化和傅里叶反变化;
表示原图像;
分别表示振幅和相位;
大小的滤波器。
微软主要提出和比较了 SR
和 SR+CNN
方法在时序数据异常检测上的效果,其中 SR
算法唯一的差别是输入变成了时序数据。
如下图所示,得到了 saliency map
之后,很容易利用一个简单的规则来注释异常点。可以采用一个简单的阈值 τ\tauτ 来注释异常点。
另外也可以采用动态阈值方法,详见:【论文阅读】Dynamic Error Thresholds
一般来讲都是采用「滑动窗口」进行异常点检测的,往往我们需要检测的点都是位于一段序列的末端,而 SR
算法当检测的点位于序列中央的时候效果会比较好,因此在进行 SR
计算之前需要对序列进行简单的预测进而延长序列,下面是论文中采用的延长算法:
由于 SR
方法是通过简单的手动设置阈值进行分类的,因此可以使用 CNN
这种更加强大的分类器进行分类。但是 CNN
分类的话需要有明确的标签,因此可以通过异常注入的方法来制造标签:
在一个实验中,作者收集了具有合成异常的生产时间序列作为训练数据。其优点是,该检测器可以自适应时间序列分布的变化,而不需要手动标记的数据。
SR-CNN
网络由两个一维卷积层(滤波器大小等于滑动窗口大小 ω\omegaω)和两个完全连接的层组成。第一个卷积层的信道大小等于 ω\omegaω;而第二个卷积层的信道大小增加了一倍。在 sigmoid
输出之前堆叠了两个完全连接的层。采用交叉熵作为损失函数,在训练过程中采用 SGD
优化器。
import numpy as np
class SpectralResidual:
def __init__(self, series_window_size, mag_window_size, score_window_size) -> None:
self.EPS = 1e-8
self.series_window_size = series_window_size
self.mag_window_size = mag_window_size
self.score_window_size = score_window_size
def detect(self, values):
result = np.array([])
for i in range(0, len(values), self.series_window_size):
if i + self.series_window_size > len(values):
seg = values[i:len(values)]
else:
seg = values[i:i + self.series_window_size]
anomaly = self.__detect_core(seg)
result = np.concatenate((result, anomaly), axis=0)
return result
def __detect_core(self, values):
extended_series = SpectralResidual.extend_series(values)
mags = self.spectral_residual_transform(extended_series)
anomaly_scores = self.generate_spectral_score(mags)
return anomaly_scores[:len(values)]
def generate_spectral_score(self, mags):
ave_mag = average_filter(mags, n=self.score_window_size)
safeDivisors = np.clip(ave_mag, self.EPS, ave_mag.max())
raw_scores = np.abs(mags - ave_mag) / safeDivisors
scores = np.clip(raw_scores / 10.0, 0, 1.0)
return scores
def spectral_residual_transform(self, values):
"""
This method transform a time series into spectral residual series
:param values: list.
a list of float values.
:return: mag: list.
a list of float values as the spectral residual values
"""
trans = np.fft.fft(values)
mag = np.sqrt(trans.real**2 + trans.imag**2)
eps_index = np.where(mag <= self.EPS)[0]
mag[eps_index] = self.EPS
mag_log = np.log(mag)
mag_log[eps_index] = 0
spectral = np.exp(mag_log - average_filter(mag_log, n=self.mag_window_size))
trans.real = trans.real * spectral / mag
trans.imag = trans.imag * spectral / mag
trans.real[eps_index] = 0
trans.imag[eps_index] = 0
wave_r = np.fft.ifft(trans)
mag = np.sqrt(wave_r.real**2 + wave_r.imag**2)
return mag
@staticmethod
def predict_next(values):
"""
Predicts the next value by sum up the slope of the last value with previous values.
Mathematically, g = 1/m * sum_{i=1}^{m} g(x_n, x_{n-i}), x_{n+1} = x_{n-m+1} + g * m,
where g(x_i,x_j) = (x_i - x_j) / (i - j)
:param values: list.
a list of float numbers.
:return : float.
the predicted next value.
"""
if len(values) <= 1:
raise ValueError(f'data should contain at least 2 numbers')
v_last = values[-1]
n = len(values)
slopes = [(v_last - v) / (n - 1 - i) for i, v in enumerate(values[:-1])]
return values[1] + sum(slopes)
@staticmethod
def extend_series(values, extend_num=5, look_ahead=5):
"""
extend the array data by the predicted next value
:param values: list.
a list of float numbers.
:param extend_num: int, default 5.
number of values added to the back of data.
:param look_ahead: int, default 5.
number of previous values used in prediction.
:return: list.
The result array.
"""
if look_ahead < 1:
raise ValueError('look_ahead must be at least 1')
extension = [SpectralResidual.predict_next(values[-look_ahead - 2:-1])
] * extend_num
return np.concatenate((values, extension), axis=0)
def average_filter(values, n=3):
"""
Calculate the sliding window average for the give time series.
Mathematically, res[i] = sum_{j=i-t+1}^{i} values[j] / t, where t = min(n, i+1)
:param values: list.
a list of float numbers
:param n: int, default 3.
window size.
:return res: list.
a list of value after the average_filter process.
"""
if n >= len(values):
n = len(values)
res = np.cumsum(values, dtype=float)
res[n:] = res[n:] - res[:-n]
res[n:] = res[n:] / n
for i in range(1, n):
res[i] /= (i + 1)
return res