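I'm trying to find an efficient, numerically stable algorithm for computing a rolling variance (for example, the variance over a 20-period rolling window). I'm aware of the Welford algorithm, which efficiently computes a running variance for a stream of numbers (it needs only one pass), but I'm not sure whether it can be adapted to a rolling window. I'd also like the solution to avoid the accuracy problems that John D. Cook discusses at the top of this article. A solution in any language is fine.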
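Posted on 2011-07-12 20:33:27
I've been working on the same problem.
The mean is easy to compute iteratively, but you need to keep the complete history of values in a circular buffer.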
next_index = (index + 1) % window_size; // oldest x value is at next_index, wrapping if necessary.
new_mean = mean + (x_new - xs[next_index])/window_size;
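I have adapted Welford's algorithm, and it works for all the values I have tested.
var_sum = var_sum + (x_new - mean) * (x_new - new_mean) - (xs[next_index] - mean) * (xs[next_index] - new_mean);
mean = new_mean; // carry the updated mean into the next step
xs[next_index] = x_new;
index = next_index;
To get the current variance, just divide var_sum by the window size: variance = var_sum / window_size;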
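The update steps above can be collected into a small Python sketch. The class name `RollingVariance` and the method `update` are made up for illustration and are not part of the answer. The sketch also assumes the buffer starts out filled with zeros, so until `window_size` values have been pushed the result describes a window padded with zeros.

```python
class RollingVariance:
    """Rolling mean and population variance over a fixed-size window."""

    def __init__(self, window_size):
        self.window_size = window_size
        # Assumption: circular buffer pre-filled with zeros, which is
        # consistent with mean == 0 and var_sum == 0 at the start.
        self.xs = [0.0] * window_size
        self.index = window_size - 1   # slot written most recently
        self.mean = 0.0
        self.var_sum = 0.0             # sum of squared deviations from the mean

    def update(self, x_new):
        # The oldest value sits right after the newest one, wrapping around.
        next_index = (self.index + 1) % self.window_size
        x_old = self.xs[next_index]
        new_mean = self.mean + (x_new - x_old) / self.window_size
        self.var_sum += ((x_new - self.mean) * (x_new - new_mean)
                         - (x_old - self.mean) * (x_old - new_mean))
        self.mean = new_mean
        self.xs[next_index] = x_new
        self.index = next_index
        # Population variance: divide by the window size, as in the answer.
        return self.var_sum / self.window_size
```

Dividing by `window_size - 1` instead would give the sample variance. Which of the two is wanted depends on the application.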
Posted on 2014-04-25 02:32:40
If you prefer code to words (heavily based on DanS's post): http://calcandstuff.blogspot.se/2014/02/rolling-variance-calculation.html
public IEnumerable<double> RollingSampleVariance(IEnumerable<double> data, int sampleSize)
{
    double mean = 0;
    double accVar = 0;
    int n = 0;
    var queue = new Queue<double>(sampleSize);
    foreach (var observation in data)
    {
        queue.Enqueue(observation);
        if (n < sampleSize)
        {
            // Calculating first variance (window still filling: plain Welford update)
            n++;
            double delta = observation - mean;
            mean += delta / n;
            accVar += delta * (observation - mean);
        }
        else
        {
            // Adjusting variance (window full: swap the oldest value for the new one)
            double then = queue.Dequeue();
            double prevMean = mean;
            mean += (observation - then) / sampleSize;
            accVar += (observation - prevMean) * (observation - mean) - (then - prevMean) * (then - mean);
        }
        // Only yield once the window holds sampleSize observations
        if (n == sampleSize)
            yield return accVar / (sampleSize - 1);
    }
}
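For readers who want to see why the adjustment step is exact in real arithmetic, here is a short derivation. The symbols are introduced here for illustration only: $N$ is `sampleSize`, $x_{\text{new}}$ is `observation`, $x_{\text{old}}$ is `then`, $\mu$ is `prevMean`, $\mu'$ is the updated `mean`, and $S$, $S'$ are `accVar` before and after the step.

```latex
\begin{aligned}
S &= \sum_{i=1}^{N} x_i^2 - N\mu^2, \qquad
\mu' = \mu + \frac{x_{\text{new}} - x_{\text{old}}}{N} \\
S' - S &= x_{\text{new}}^2 - x_{\text{old}}^2 - N\,(\mu'^2 - \mu^2) \\
&= (x_{\text{new}} - x_{\text{old}})(x_{\text{new}} + x_{\text{old}})
   - N(\mu' - \mu)(\mu' + \mu) \\
&= (x_{\text{new}} - x_{\text{old}})(x_{\text{new}} + x_{\text{old}} - \mu - \mu') \\
&= (x_{\text{new}} - \mu)(x_{\text{new}} - \mu')
   - (x_{\text{old}} - \mu)(x_{\text{old}} - \mu')
\end{aligned}
```

The fourth line uses $N(\mu' - \mu) = x_{\text{new}} - x_{\text{old}}$. The last line is the form used in the `else` branch. The line before it is an equivalent single-product form. In floating point the two forms can round differently, so this shows equivalence, not identical numerical behaviour.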
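Posted on 2017-08-30 06:26:04
I know this question is old, but in case anyone else is interested, here is some Python code. It was inspired by johndcook's blog post, @Joachim's and @DanS's code, and @Jaime's comments. The code below may still show small inaccuracies for small data window sizes. Enjoy.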
from __future__ import division
import collections
import math


class RunningStats:
    def __init__(self, WIN_SIZE=20):
        self.n = 0
        self.mean = 0.0
        self.run_var = 0.0
        self.WIN_SIZE = WIN_SIZE
        self.windows = collections.deque(maxlen=WIN_SIZE)

    def clear(self):
        # Reset the accumulators as well as the stored values
        self.n = 0
        self.mean = 0.0
        self.run_var = 0.0
        self.windows.clear()

    def push(self, x):
        if self.n < self.WIN_SIZE:
            # Calculating first variance (window still filling)
            self.windows.append(x)
            self.n += 1
            delta = x - self.mean
            self.mean += delta / self.n
            self.run_var += delta * (x - self.mean)
        else:
            # Adjusting variance: take the oldest value out before appending,
            # otherwise the maxlen deque silently discards it first
            x_removed = self.windows.popleft()
            self.windows.append(x)
            old_m = self.mean
            self.mean += (x - x_removed) / self.WIN_SIZE
            self.run_var += (x + x_removed - old_m - self.mean) * (x - x_removed)

    def get_mean(self):
        return self.mean if self.n else 0.0

    def get_var(self):
        # Sample variance of the values currently in the window
        return self.run_var / (self.n - 1) if self.n > 1 else 0.0

    def get_std(self):
        # Clamp at zero: rounding can leave run_var marginally negative
        return math.sqrt(max(self.get_var(), 0.0))

    def get_all(self):
        return list(self.windows)

    def __str__(self):
        return "Current window values: {}".format(list(self.windows))
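One way to put a number on the "small inaccuracies" mentioned above is to compare any rolling implementation against a slow two-pass reference. The sketch below is not from the answer, and every function name in it is hypothetical. It uses `statistics.variance` as the reference and, as a deliberately poor candidate, the textbook sum-of-squares formula that John D. Cook's article warns about.

```python
import random
import statistics


def reference_rolling_variance(data, window):
    """Two-pass sample variance of every full window (slow but simple)."""
    return [statistics.variance(data[i - window:i])
            for i in range(window, len(data) + 1)]


def naive_rolling_variance(data, window):
    """Running sums of x and x**2: fast, but prone to cancellation."""
    s = sq = 0.0
    for i, x in enumerate(data):
        s += x
        sq += x * x
        if i >= window:
            old = data[i - window]  # value that just left the window
            s -= old
            sq -= old * old
        if i >= window - 1:
            yield (sq - s * s / window) / (window - 1)


def max_relative_error(candidate, data, window):
    """Largest relative deviation of candidate(data, window) from the reference."""
    expected = reference_rolling_variance(data, window)
    actual = list(candidate(data, window))
    assert len(actual) == len(expected)
    return max((abs(a - e) / abs(e)
                for a, e in zip(actual, expected) if e != 0),
               default=0.0)


if __name__ == "__main__":
    random.seed(0)
    # Small fluctuations on top of a large offset stress the cancellation.
    data = [1e9 + random.random() for _ in range(1000)]
    print(max_relative_error(naive_rolling_variance, data, 20))
```

Any of the rolling implementations in this thread can be passed as `candidate` once wrapped as a function that takes `(data, window)` and yields one sample variance per full window. On offset data like this the naive formula can lose essentially all of its significant digits, which makes it a useful baseline for comparison.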
https://stackoverflow.com/questions/5147378