如何构建自己的数据监视器以识别数据管道中的新鲜度和分布异常
在本系列文章中,我们将逐步介绍如何从头开始创建自己的数据可观察性监视器,并将其映射到数据运行状况的五个关键支柱。
从空值和重复的行,到建模错误和架构更改,数据可能由于多种原因而中断。数据测试通常是我们防范不良数据的第一道防线,但是如果数据在其生命周期中中断,会发生什么呢?
我们称这种现象为数据停机时间,它是指数据丢失,错误或不准确的时间段。数据停机提示我们提出以下问题:
为了在数据中断时触发警报并防止数据停机,数据团队可以利用我们在软件工程领域的朋友们的可靠策略:监视和可观察性。
我们将数据可观察性定义为组织回答这些问题并评估其数据生态系统的健康状况的能力。反映数据健康状况的关键变量,数据可观察性的五个支柱是:
用这种概念性的方式谈论数据可观察性是一回事,但是完整的处理应该拉开帷幕-数据可观察性实际上在代码中看起来是什么样?
很难完全回答这个问题,因为细节将取决于数据仓库,数据湖,BI工具,首选语言和框架等的选择。即使这样,使用SQLite和Jupyter之类的轻量级工具解决这些问题还是有用的。
在本文中,我们将通过一个示例数据生态系统逐步介绍如何在SQL中创建我们自己的数据质量监视器,并探讨实际中数据可观察性如何。
让我们来看看。
实践中的数据可观察性
欢迎您使用Jupyter Notebook和SQL自己尝试这些练习。
我们的样本数据生态系统使用有关宜居系外行星的模拟天文数据。出于此练习的目的,我使用Python生成了数据集,对我在生产环境中遇到的真实事件进行了建模。
我使用的是SQLite 3.32.3,它应该使数据库可以从命令提示符或SQL文件进行最少的设置访问。
0. Setup
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
from datetime import datetime, timedelta
from matplotlib import pyplot as plt
import sys
sys.path.append("..")
from data.utils.exercise_1 import all_days, show_reports
import sqlite3
conn = sqlite3.connect("../data/dbs/Ex1.db")
c = conn.cursor()
1. Introduction
c.execute("PRAGMA table_info(EXOPLANETS);")
c.fetchall()
[(0, '_id', 'TEXT', 0, None, 0),
(1, 'distance', 'REAL', 0, None, 0),
(2, 'g', 'REAL', 0, None, 0),
(3, 'orbital_period', 'REAL', 0, None, 0),
(4, 'avg_temp', 'REAL', 0, None, 0),
(5, 'date_added', 'TEXT', 0, None, 0)]c.execute("PRAGMA table_info(EXOPLANETS);")c.fetchall()[(0, '_id', 'TEXT', 0, None, 0),(1, 'distance', 'REAL', 0, None, 0),(2, 'g', 'REAL', 0, None, 0),(3, 'orbital_period', 'REAL', 0, None, 0),(4, 'avg_temp', 'REAL', 0, None, 0),(5, 'date_added', 'TEXT', 0, None, 0)]
数据库条目EXOPLANETS
包含以下信息:
0 . _id
:对应于该行星的UUID。
1 distance
.:距地球的距离,以光年为单位。
2 . g
:表面重力为g的倍数,重力常数。
3 . orbital_period
:单个轨道周期的长度,以天为单位。
4 . avg_temp
:平均表面温度,以开氏度为单位。
5 date_added
.:系统发现行星并将其自动添加到数据库的日期。
需要注意的是一个或多个distance
,g
,orbital_period
,并avg_temp
可能是NULL
一个给定的星球为丢失或错误数据的结果。
pd.read_sql_query("SELECT * FROM EXOPLANETS LIMIT 10", conn)
_id | distance | g | orbital_period | avg_temp | date_added | |
---|---|---|---|---|---|---|
c168b188-ef0c-4d6a-8cb2-f473d4154bdb | 34.6273036348341 | 476.480044083599 | 2020-01-01 | |||
e7b56e84-41f4-4e62-b078-01b076cea369 | 110.196919810563 | 2.52507362359066 | 839.8378167897 | 2020-01-01 | ||
a27030a0-e4b4-4bd7-8d24-5435ed86b395 | 26.6957950454452 | 10.2764970016067 | 301.018816321399 | 2020-01-01 | ||
54f9cf85-eae9-4f29-b665-855357a14375 | 54.8883521129783 | 173.788967912197 | 328.644125249613 | 2020-01-01 | ||
4d06ec88-f5c8-4d03-91ef-7493a12cd89e | 153.264217159834 | 0.922874568459221 | 200.712661803056 | 2020-01-01 |
请注意,此练习是追溯性的-我们正在查看历史数据。在生产数据环境中,数据可观察性是实时的,并应用于数据生命周期的每个阶段,因此与此处所做的实现所涉及的实现略有不同。
出于本练习的目的,我们将构建数据可观察性算法以实现新鲜度和分发,但是在以后的文章中,我们将介绍其余五个支柱,甚至更多。
一、新鲜度
我们监控的数据可观察性的第一支柱是新鲜度,它可以为我们提供关键数据资产上次更新时间的有力指标。如果按小时定期更新的报告突然看起来很陈旧,则这种类型的异常现象应为我们提供一个强有力的迹象,表明存在问题。
首先,请注意该DATE_ADDED
列。当添加单个记录时,SQL不会存储元数据。因此,为了可视化这种追溯设置中的新鲜度,我们需要自己跟踪这些信息。
通过按DATE_ADDED
列分组可以使我们深入了解EXOPLANETS
每天的更新方式。例如,我们可以查询每天添加的新ID的数量:
SQL = """SELECT DATE_ADDED, COUNT(*) AS ROWS_ADDEDFROM EXOPLANETSGROUP BY DATE_ADDED"""
rows_added = pd.read_sql_query(SQL, conn)rows_added = rows_added \ .rename(columns={clmn: clmn.lower() for clmn in rows_added.columns})rows_added = rows_added.set_index("date_added")rows_added = rows_added.reindex(all_days)plt.figure(figsize=(20, 10))plt.bar(all_days, height=rows_added["rows_added"])plt.show()
date_added | ROWS_ADDED | |
---|---|---|
0 | 2020-01-01 | 84 |
1 | 2020-01-02 | 92 |
2 | 2020-01-03 | 101 |
3 | 2020-01-04 | 102 |
4 | 2020-01-05 | 100 |
... | ... | ... |
170 | 2020-07-14 | 104 |
171 | 2020-07-15 | 110 |
172 | 2020-07-16 | 103 |
173 | 2020-07-17 | 89 |
174 | 2020-07-18 | 104 |
175 rows × 2 columns
根据我们的数据集的图形表示,看起来好像EXOPLANETS
每天持续更新约100个新条目,尽管有几天没有数据输入的空白。
回想一下,我们很想问一个问题:“我的数据是否是最新的?” 因此,了解表更新中的这些差距对于了解我们数据的可靠性至关重要。
此查询通过引入的指标来实现新鲜度的操作DAYS_SINCE_LAST_UPDATE
。(注意:由于本教程使用SQLite3,因此在MySQL和其他环境中,用于计算时间差的SQL语法将有所不同)
WITH UPDATES AS(
SELECT DATE_ADDED,
COUNT(*) AS ROWS_ADDED
FROM EXOPLANETS
GROUP
BY DATE_ADDED)SELECT DATE_ADDED, JULIANDAY(DATE_ADDED) - JULIANDAY(LAG(DATE_ADDED) OVER(
ORDER
BY DATE_ADDED )) AS DAYS_SINCE_LAST_UPDATEFROM UPDATES;
date_added days_since_last_update
0 2020-01-01 NaN
1 2020-01-02 1.0
2 2020-01-03 1.0
3 2020-01-04 1.0
4 2020-01-05 1.0
.. ... ...
170 2020-07-14 1.0
171 2020-07-15 1.0
172 2020-07-16 1.0
173 2020-07-17 1.0
174 2020-07-18 1.00
结果表显示“日期X,最近的数据EXOPLANETS
是Y天。” 这是DATE_ADDED
表中列
中未明确提供的信息-但是应用数据可观察性为我们提供了发现这些信息的工具。
现在,我们拥有检测新鲜度异常所需的数据。剩下要做的就是为Y设置阈值 参数-多少天了?参数将查询转换为检测器,因为它决定了什么算作异常,什么则不算。
WITH UPDATES AS(
SELECT DATE_ADDED,
COUNT(*) AS ROWS_ADDED
FROM EXOPLANETS
GROUP
BY DATE_ADDED),NUM_DAYS_UPDATES AS (
SELECT DATE_ADDED, JULIANDAY(DATE_ADDED) - JULIANDAY(LAG(DATE_ADDED)
OVER(
ORDER
BY DATE_ADDED ) ) AS DAYS_SINCE_LAST_UPDATE
FROM UPDATES)SELECT *FROM NUM_DAYS_UPDATESWHERE DAYS_SINCE_LAST_UPDATE > 1;
返回给我们的数据代表发生新鲜事件的日期。
2020–05–14,表中的最新数据为8天!这样的中断可能表示我们的数据管道中断,并且很高兴知道我们是否将这些数据用于任何有价值的事情(并且如果我们在生产环境中使用它,很可能就是这样)。
特别注意查询的最后一行:DAYS_SINCE_LAST_UPDATE > 1;
。
在这里,1是一个模型参数-这个数字没有什么“正确的”,尽管更改它会影响我们认为是事件的日期。数字越小,我们将发现的异常现象就越多(高召回率),但是机会是,其中一些“异常现象”不会反映实际的停机情况。数字越大,我们捕获的所有异常反映真实异常(高精度)的可能性就越大,但是我们可能会漏掉一些异常。
就本示例而言,我们可以将1更改为7,从而仅在2020-02-08和2020-05-14遇到两次最严重的停机。这里的任何选择都将反映特定的用例和目标,并且是在将规模的数据可观察性应用于生产环境时一次又一次出现的重要平衡。
下面,我们利用相同的新鲜度检测器,但将其DAYS_SINCE_LAST_UPDATE > 3;
用作阈值。现在,两个较小的中断都未被发现。
请注意两次未被发现的中断-这些间隔必须少于3天。
现在,我们可视化相同的新鲜度检测器,但DAYS_SINCE_LAST_UPDATE > 7;
现在将其用作阈值。现在,除了两个最大的中断之外,其他所有中断都未被发现
就像行星一样,最佳模型参数位于被认为过低和过高的值之间的“戈尔德洛克区”或“最佳位置”。这些数据可观察性概念(以及更多!)将在以后的文章中进行讨论。
二、分布
接下来,我们要评估数据的现场级别,分布状况。分布告诉我们数据的所有期望值,以及每个值出现的频率。最简单的问题之一是“我的数据NULL
多久一次”?在许多情况下,某种程度的不完整数据是可以接受的-但是,如果10%的无效率变成90%,我们将要知道。
SELECT
DATE_ADDED,
CAST(
SUM(
CASE
WHEN DISTANCE IS NULL THEN 1
ELSE 0
END
) AS FLOAT) / COUNT(*) AS DISTANCE_NULL_RATE,
CAST(
SUM(
CASE
WHEN G IS NULL THEN 1
ELSE 0
END
) AS FLOAT) / COUNT(*) AS G_NULL_RATE,
CAST(
SUM(
CASE
WHEN ORBITAL_PERIOD IS NULL THEN 1
ELSE 0
END
) AS FLOAT) / COUNT(*) AS ORBITAL_PERIOD_NULL_RATE,
CAST(
SUM(
CASE
WHEN AVG_TEMP IS NULL THEN 1
ELSE 0
END
) AS FLOAT) / COUNT(*) AS AVG_TEMP_NULL_RATE
FROM
EXOPLANETS
GROUP BY
DATE_ADDED;
date_added distance_null_rate g_null_rate orbital_period_null_rate avg_temp_null_rate
0 2020-01-01 0.083333 0.178571 0.214286 0.380952
1 2020-01-02 0.000000 0.152174 0.326087 0.402174
2 2020-01-03 0.059406 0.188119 0.237624 0.336634
3 2020-01-04 0.049020 0.117647 0.264706 0.490196
4 2020-01-05 0.040000 0.180000 0.280000 0.300000
.. ... ... ... ...
170 2020-07-14 0.067308 0.125000 0.269231 0.394231
171 2020-07-15 0.063636 0.118182 0.245455 0.490909
172 2020-07-16 0.058252 0.145631 0.262136 0.466019
173 2020-07-17 0.101124 0.089888 0.247191 0.393258
174 2020-07-18 0.067308 0.201923 0.317308 0.3365380
该查询返回大量数据!这是怎么回事?
常规公式CAST(SUM(CASE WHEN SOME_METRIC IS NULL THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*)
(按DATE_ADDED
列分组)告诉我们的每日新数据批处理中的NULL
值的费率。通过查看原始输出很难感觉到,但是视觉可以帮助阐明这种异常:SOME_METRIC``EXOPLANETS
视觉效果清楚地表明,我们应该检测到零速率的“尖峰”事件。现在,我们仅关注最后一个指标AVG_TEMP
。我们可以用一个简单的阈值最基本地检测出空尖峰:
WITH NULL_RATES AS(
SELECT
DATE_ADDED,
CAST(
SUM(
CASE
WHEN AVG_TEMP IS NULL THEN 1
ELSE 0
END
) AS FLOAT) / COUNT(*) AS AVG_TEMP_NULL_RATE
FROM
EXOPLANETS
GROUP BY
DATE_ADDED
)
SELECT
*
FROM
NULL_RATES
WHERE
AVG_TEMP_NULL_RATE > 0.9;
date_added avg_temp_null_rate
0 2020-03-09 0.967391
1 2020-06-02 0.929412
2 2020-06-03 0.977011
3 2020-06-04 0.989691
4 2020-06-07 0.987805
5 2020-06-08 0.961905
随着检测算法的发展,这种方法是一种钝器。有时,我们的数据中的模式对于这样的阈值就足够简单了。但是,在其他情况下,数据将是嘈杂的或具有其他复杂性(例如季节性),这需要我们更改方法。
例如,检测2020-06-02、2020-06-03和2020-0604似乎是多余的。我们可以过滤掉其他警报之后立即发生的日期:
WITH NULL_RATES AS(
SELECT
DATE_ADDED,
CAST(
SUM(
CASE
WHEN AVG_TEMP IS NULL THEN 1
ELSE 0
END
) AS FLOAT
) / COUNT(*) AS AVG_TEMP_NULL_RATE
FROM
EXOPLANETS
GROUP BY
DATE_ADDED
),
ALL_DATES AS (
SELECT
*,
JULIANDAY(DATE_ADDED) - JULIANDAY(LAG(DATE_ADDED)
OVER(
ORDER BY DATE_ADDED
)
) AS DAYS_SINCE_LAST_ALERT
FROM
NULL_RATES
WHERE
AVG_TEMP_NULL_RATE > 0.9
)
SELECT
DATE_ADDED,
AVG_TEMP_NULL_RATE
FROM
ALL_DATES
WHERE
DAYS_SINCE_LAST_ALERT IS NULL OR DAYS_SINCE_LAST_ALERT > 1;
date_added avg_temp_null_rate
0 2020-03-09 0.967391
1 2020-06-02 0.929412
2 2020-06-07 0.987805
请注意,在这两个查询中,键参数均为0.9
。我们实际上是在说:“任何高于90%的空缺率都是一个问题,我需要知道这一点。”
在这种情况下,我们可以(并且应该)通过将滚动平均的概念应用于更智能的参数来变得更加智能:
WITH NULL_RATES AS(
SELECT
DATE_ADDED,
CAST(SUM(CASE WHEN AVG_TEMP IS NULL THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS AVG_TEMP_NULL_RATE
FROM
EXOPLANETS
GROUP BY
DATE_ADDED
),
NULL_WITH_AVG AS(
SELECT
*,
AVG(AVG_TEMP_NULL_RATE) OVER (
ORDER BY DATE_ADDED ASC
ROWS BETWEEN 14 PRECEDING AND CURRENT ROW) AS TWO_WEEK_ROLLING_AVG
FROM
NULL_RATES
GROUP BY
DATE_ADDED
)
SELECT
*
FROM
NULL_WITH_AVG
WHERE
AVG_TEMP_NULL_RATE - TWO_WEEK_ROLLING_AVG > 0.3;
date_added avg_temp_null_rate two_week_rolling_avg
0 2020-03-09 0.967391 0.436078
1 2020-06-02 0.929412 0.441300
2 2020-06-03 0.977011 0.479132
3 2020-06-04 0.989691 0.515566
4 2020-06-07 0.987805 0.554753
5 2020-06-08 0.961905 0.594967
一项澄清:请注意,在第28行,我们使用数量进行过滤AVG_TEMP_NULL_RATE — TWO_WEEK_ROLLING_AVG
。在其他情况下,我们可能想要取ABS()
这个误差量的,但不是在这里-原因是,如果NULL
“峰值”率比以前的平均值有所提高,则“峰值”会更加令人震惊。每当NULL
频率突然下降时进行监视可能都不值得,而检测NULL
速率上升的值很明显。
当然,有越来越多的用于异常检测的度量标准(例如Z分数和自回归建模)不在本文范围之内。本教程仅提供用于SQL中的现场健康监控的基本框架。我希望它能为您提供有关您自己的数据的想法!
三、下一步
这篇简短的教程旨在表明“数据可观察性”并不像名称中所暗示的那样神秘,并且通过整体方法来理解数据的健康状况,可以确保管道的每个阶段都具有高度的数据信任度和可靠性。
实际上,只要保留一些关键信息(例如记录时间戳和历史表元数据),就可以使用简单的SQL“检测器”来实现数据可观察性的核心原理。还值得注意的是,对于随您的生产环境而增长的端到端数据可观察性系统,必须使用关键的ML支持的参数调整。
请继续关注本系列的后续文章,重点关注监视分布和架构中的异常,沿袭和元数据在数据可观察性中的作用,以及如何一起大规模监视这些支柱以获取更可靠的数据。
文丨Soundhearer
图丨来源于网络