ETL,即提取(Extract)、转换(Transform)、加载(Load),是一种将数据迁移并准备好以供后续使用(如数据分析或机器学习建模)的流程。对于数据科学家来说,ETL是获取所需数据的关键活动。
为了简化ETL流程,有许多工具可供选择,其中之一就是DuckDB。DuckDB是一款开源的OLAP SQL数据库管理系统,专为高效地处理内存中的数据分析工作负载而设计。无论你处理的数据规模如何,DuckDB都是数据科学家的卓越工具。
对于数据科学家来说,构建ETL流水线至关重要,因此彻底理解整个流程非常必要。本文将带你学习如何利用DuckDB构建一个ETL流水线。
准备工作
首先,我们需要搭建所有必要组件,以模拟真实数据科学项目中的ETL流水线。本文中演示的所有代码也可在GitHub仓库中获取。
我们首先需要用于演示的数据集。在本例中,我们采用来自Kaggle的数据科学家薪资数据。作为数据仓库,我们将使用DuckDB驱动的云数据仓库——Motherduck。注册一个免费账号,然后通过数据科学薪资数据文件创建表,将其存放在my_db数据库中。
完成上述步骤后,你就可以查询数据集,其展示效果如下图所示。
基于DuckDB的数据科学ETL流水线
数据库准备好后,获取用于访问云数据库的访问令牌。
接着,打开你的IDE(如Visual Studio Code),进行流水线环境配置。第一步是创建虚拟环境,代码如下:
python -m venv duckdb_venv
你可以自定义虚拟环境名称。激活虚拟环境后,我们将安装所有必需库。新建一个requirements.txt文件,内容如下:
duckdb
pandas
pyarrow
python-dotenv
准备好后,使用以下命令安装项目所需库:
pip install -r requirements.txt
库安装成功后,我们将通过.env文件配置环境变量。创建该文件,并将从Motherduck获得的MOTHERDUCK_TOKEN写入其中。
至此,准备工作完成,接下来开始配置DuckDB的ETL流水线。
用DuckDB实现ETL流水线
DuckDB的操作方式与SQL非常相似,但连接更加简便。我们将利用DuckDB的内存特性,在Python环境中运行查询处理数据,并将结果重新加载回Motherduck云数据库。
首先,新建一个Python文件用于存放ETL流水线代码。本文以etl_duckdb.py为例,你也可以选择其他名称。
文件初始部分,建立DuckDB与云数据库的连接:
import os
import duckdb
from dotenv import load_dotenv
load_dotenv()
MD_TOKEN = os.getenv("MOTHERDUCK_TOKEN")
con = duckdb.connect(f'md:?motherduck_token={MD_TOKEN}')
随后,我们创建一个名为analytics的schema以存储提取的数据:
con.sql("CREATE SCHEMA IF NOT EXISTS analytics;")
DuckDB中的操作与SQL语句如出一辙,熟悉SQL的用户会觉得搭建流水线非常容易。
接下来,我们将原始数据提取到另一个表,以演示DuckDB可以将数据提取并加载到新表中:
con.sql("""
CREATE OR REPLACE TABLE raw_salaries AS
SELECT
work_year,
experience_level,
employment_type,
job_title,
salary,
salary_currency,
salary_in_usd,
employee_residence,
remote_ratio,
company_location,
company_size
FROM my_db.ds_salaries;
""")
数据准备完毕后,即可进行各种转换,并将结果加载用于后续分析。
例如,我们可以根据工作年限和经验水平计算平均薪资,并将其加载到avg_salary_year_exp表:
con.sql("""
CREATE OR REPLACE TABLE analytics.avg_salary_year_exp AS
SELECT
work_year,
experience_level,
ROUND(AVG(salary_in_usd), 2) AS avg_usd_salary
FROM raw_salaries
GROUP BY work_year, experience_level
ORDER BY work_year, experience_level;
""")
我们可以用如下代码检查已加载的转换结果:
con.sql("SELECT * FROM analytics.avg_salary_year_exp LIMIT 5").show()
输出结果如下:
┌───────────┬──────────────────┬────────────────┐
│ work_year │ experience_level │ avg_usd_salary │
│ int64 │ varchar │ double │
├───────────┼──────────────────┼────────────────┤
│ 2020 │ EN │ 57511.61 │
│ 2020 │ EX │ 139944.33 │
│ 2020 │ MI │ 87564.72 │
│ 2020 │ SE │ 137240.5 │
│ 2021 │ EN │ 54905.25 │
└───────────┴──────────────────┴────────────────┘
利用DuckDB,我们可以高效地完成ETL操作。
DuckDB在操作灵活性方面表现优异,我们还可以结合Pandas进行ETL操作。
例如,将前述的平均薪资数据转换为DataFrame对象,便于进一步处理:
df_avg = con.sql("SELECT * FROM analytics.avg_salary_year_exp").df()
df_avg["avg_salary_k"] = df_avg["avg_usd_salary"] / 1_000
可以用如下代码查看DataFrame转换结果:
print(df_avg.head())
输出如下:
work_year experience_level avg_usd_salary avg_salary_k
0 2020 EN 57511.61 57.51161
1 2020 EX 139944.33 139.94433
2 2020 MI 87564.72 87.56472
3 2020 SE 137240.50 137.24050
4 2021 EN 54905.25 54.90525
接下来,可以将DataFrame注册到DuckDB中,作为一张表:
con.register("pandas_avg_salary", df_avg)
此时,Pandas DataFrame已准备好进行进一步处理。例如,可以基于特定条件转换数据并重新加载到云数据库:
con.sql("""
CREATE OR REPLACE TABLE analytics.avg_salary_year_exp_pandas AS
SELECT
work_year,
experience_level,
avg_salary_k
FROM pandas_avg_salary
WHERE avg_salary_k > 100
ORDER BY avg_salary_k DESC
""")
查看结果:
con.sql("SELECT * FROM analytics.avg_salary_year_exp_pandas LIMIT 5").show()
输出如下:
┌───────────┬──────────────────┬──────────────┐
│ work_year │ experience_level │ avg_salary_k │
│ int64 │ varchar │ double │
├───────────┼──────────────────┼──────────────┤
│ 2023 │ EX │ 203.70568 │
│ 2022 │ EX │ 188.26029 │
│ 2021 │ EX │ 186.128 │
│ 2023 │ SE │ 159.56893 │
│ 2022 │ SE │ 147.65969 │
└───────────┴──────────────────┴──────────────┘
至此,你已完成一个简单的数据科学ETL流水线的开发。你可以根据项目需求,结合自动化与调度(如CRON定时任务)进行拓展。
结论
ETL(提取、转换、加载)是一种用于迁移和准备数据以供进一步使用的流程。对于数据科学家来说,ETL适用于所有依赖数据的工作,例如数据分析或机器学习建模。
本文介绍了如何使用DuckDB为数据科学工作构建ETL流水线,展示了如何从云数据库提取数据,通过SQL查询和Pandas DataFrame进行转换,并将结果加载回云数据库。
希望本文对你有所帮助!
领取专属 10元无门槛券
私享最新 技术干货