今天,将介绍一个开源数据可靠性工具,它很可能是市场上最简单的解决方案,任何数据团队都可以将其集成到自己的管道中,并在不到一天的时间内利用它!
一、什么是Soda Core?
Soda Core是一个免费的开源命令行工具。它利用用户定义的输入来准备 SQL 查询,对数据源中的数据集运行检查,以查找无效、丢失或意外的数据。当检查失败时,它们会显示您在检查中定义为“不良”的数据。有了这些信息,您的数据工程团队就可以诊断“不良”数据进入数据管道的位置,采取有效措施确定问题的优先级并予以解决。
1、数据源
使用Soda Core扫描多种数据源。
Soda Checks Language (SodaCL) 是一种基于 YAML 的、针对特定领域的数据可靠性语言。与 Soda Core(Soda 的开源命令行工具)结合使用,您可以使用 SodaCL 编写数据质量检查,然后使用 Soda Core 扫描数据源中的数据并执行这些检查。
3、Soda Check
Soda Check是 Soda Core 在扫描数据源中的数据集时执行的测试。当您使用 Soda Core 对源中的数据运行扫描时,可以在扫描命令中引用配置和检查 YAML 文件。
4、Soda Scan
Soda Scan 执行您在检查 YAML 文件中定义的检查,并返回每个检查的结果:通过、失败或错误。(您可以通过设置警报配置来配置检查以发出警告而不是失败。)
在这个项目中,我将使用我本地的Postgres数据库作为主要数据源来连接和扫描。为了让 Soda Core连接到 Postgres 数据库,我必须安装相关的 Python 包。
pip install soda-core-postgres
如果您的本地计算机上没有 Postgres 数据库,您可以从此链接安装它。另外,对于示例数据库,我将使用DVD Rental 数据库。示例数据库有下表;
DVD Rental 数据库中有 15 个表:
安装 Postgres、构建 DVD Rental数据库并安装 Soda Core 后,必须在 Soda Core 和 Postgres 之间建立连接。为此,必须在configuration.yml文件中填写相关信息。
data_source dvdrental:
type: postgres
connection:
host: localhost
port: '5432'
username: postgres
password: ${POSTGRES_PASSWORD}
database: dvd-rental
schema: public
正如之前提到的,Soda Core 是一个命令行工具。它需要 configuration.yml 连接到相关的数据源和 checks.yml 来评估给定的检查。
下面的代码将通过应用相关的 configuration.yml 和 checks.yml 文件来运行 DVD Rental 数据源的扫描。一开始,我将把 checks.yml 文件保留为空以查看输出。
soda scan -d dvdrental -c configuration.yml checks.yml
正如我们在上面看到的,扫描成功运行,没有应用任何预期的检查。这样,我们就可以确保我们的 Soda Core 在给定的配置下正常工作。
在这个例子中,我们将为我们的“actor”表添加一个检查来评估它的空性。为了添加检查,我们需要开始编辑checks.yml文件。下面我们编写第一个检查 SodaCL;
checks for actor:
- row_count > 0
由于我们在文件中添加了 check 语句,因此我们需要向 scan 命令传递一个参数来读取 checks.yml 文件。在我们的终端上,我们执行以下命令来应用检查;
soda scan -d dvdrental -c configuration.yml checks.yml
如上所示,我们的检查已成功通过。但是Soda如何转换用SodaCL编写的check语句呢?如果我们想查看 Soda 生成的 SQL 查询,我们需要在扫描中添加“-V”参数。
soda scan -d dvdrental -c configuration.yml -V checks.yml
如上所示,我们可以看到 Postgres 连接参数和 Soda 生成的 SQL 查询。
在Soda,我们可以在 checks.yml 文件中定义按列检查。这些检查可以包含不同的场景。下面我将创建各种检查来控制按列丢失行、重复行、最大数量和架构检查。
# Checks for basic validations
checks for payment:
# table empytiness check
- row_count > 0
# missing row check on the primary and foreign key columns
- missing_count(payment_id) = 0
- missing_count(customer_id) = 0
- missing_count(staff_id) = 0
- missing_count(rental_id) = 0
# duplicate row check on the primary key column
- duplicate_count(payment_id) = 0
# max amount check
- max(amount) < 12
- schema:
name: Confirm that required columns are present
fail:
when required column missing:
[payment_id, customer_id, staff_id, rental_id]
Soda 可以使用引用检查来验证同一数据源中的数据集之间的列内容是否匹配。另外,我将演示在同一个 check.yml 文件中,我们可以一次检查多个表。
# Checks for basic validations
checks for customer:
# table empytiness check
- row_count > 0
checks for payment:
# all customers in the payment table should exist in the customer table
- values in (customer_id) must exist in customer (customer_id)
在 Soda,我们可以使用交叉检查来比较相同或不同数据源内的数据集之间的行数。当我们想要检查 ETL 管道是否意外删除表中的行时,此功能非常方便。
# Checks for basic validations
checks for payment:
# Check row count between datasets in different data sources
- row_count same as payment_raw
Soda 支持对时间列进行各种新鲜度检查。下面我将检查 payment_date 列的更新时间是否不超过一天。此检查将失败,因为 DVD Rental数据库自 2007 年以来未更新。
# freshness check
checks for payment:
# checking whether the payment_date is not older than 1 day
- freshness(payment_date) < 1d
Soda 允许我们定义适合我们要求的任何逻辑的检查。在这些检查中,我们可以使用 Soda 辅助的功能,甚至可以编写 SQL 查询来定义我们的需求。下面我将创建一个定义日期范围的平均金额,以演示用作支票的平均金额;
# Checks for user-defined checks
checks for payment:
- avg_amount >= 10:
avg_amount query: |
select avg(amount)
from payment
where cast(payment_date as date) between '2007-02-15' and '2007-02-18'
使用Soda,我们可以在配置文件中定义多个表并一起检查它们。为了定义多个表,我们需要使用“for every dataset T”命令,如下所示;
for each dataset T:
datasets:
- payment
- rental
- store
checks:
- row_count > 0
到目前为止,我们使用 CLI 在终端上手动运行检查。在现实生活中,不可能每次都在终端上手动运行检查。在这方面,我们需要使用 Python 以编程方式运行我们的检查。
Soda Python 库支持编程检查,我们不需要一直使用 CLI。下面我创建了一个 Python 脚本来读取配置并检查文件并执行它们。为了得到错误,我将使用 freshness.yml 文件。
from soda.scan import Scan # importing the Soda library
scan = Scan() # loading the function
scan.set_data_source_name("dvdrental") # initialising the datasource name
# Loading the configuration file
scan.add_configuration_yaml_file(
file_path="~/configuration.yml"
)
# Adding scan date variable to label the scan date
scan.add_variables({"date": "2023-04-11"})
# Loading the check yaml file
scan.add_sodacl_yaml_file("freshness_checks.yml")
# Executing the scan
scan.execute()
# Setting logs to verbose mode
scan.set_verbose(True)
# Inspect the scan result
scan.get_scan_results()
Soda Core 的简单性令人惊叹!该工具支持用简单的类似英语的命令来编写数据验证步骤。对专用Python库的支持、与编排工具的集成以及与十多个最常见数据源的数据源连接,使其成为开源数据可靠性的最佳解决方案之一。
原文作者:Seckin Dinc