本文对应脚本及数据已上传至我的
Github
仓库https://github.com/CNFeffery/DataScienceStudyNotes
在数据分析任务中,从原始数据读入,到最后分析结果出炉,中间绝大部分时间都是在对数据进行一步又一步的加工规整,以流水线(pipeline)的方式完成此过程更有利于梳理分析脉络,也更有利于查错改正。pdpipe
作为专门针对pandas
进行流水线化改造的模块,为熟悉pandas
的数据分析人员书写优雅易读的代码提供一种简洁的思路,本文就将针对pdpipe
的用法进行介绍。
pdpipe
的出现极大地对数据分析过程进行规范,其主要拥有以下特性:
Python
编写,便于二次开发 通过pip install pdpipe
安装完成,接下来我们将在jupyter lab
中以TMDB 5000 Movie Dataset中的tmdb_5000_movies.csv
数据集(图1)为例来介绍pdpipe
的主要功能,这是Kaggle上的公开数据集,记录了一些电影的相关属性信息,你也可以在数据科学学习手札系列文章的Github
仓库对应本篇文章的路径下直接获取该数据集。
图1 TMDB 5000 Movie Dataset数据集
首先在jupyter lab
中读入tmdb_5000_movies.csv
数据集并查看其前3行(图2):
import pandas as pd
import pdpipe
# 读入tmdb_5000_movies.csv数据集并查看前3行
data = pd.read_csv('tmdb_5000_movies.csv');data.head(3)
图2
可以看出,数据集包含了数值、日期、文本以及json等多种类型的数据,现在假设我们需要基于此数据完成以下流程:
1、删除original_title列 2、对title列进行小写化处理 3、丢掉vote_average小于等于7,且original_language不为
en
的行 4、求得genres对应电影类型的数量保存为新列genres_num,并删除原有的genres列 5、丢掉genres_num小于等于5的行
上述操作直接使用pandas
并不会花多少时间,但是想要不创造任何中间临时结果一步到位产生所需的数据框子集,并且保持代码的可读性不是一件太容易的事,但是利用pdpipe
,我们可以非常优雅地实现上述过程:
# 以pdp.PdPipeline传入流程列表的方式创建pipeline
first_pipeline = pdp.PdPipeline([pdp.ColDrop("original_title"),
pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
pdp.RowDrop({'genres_num': lambda x: x <= 5})])
# 将创建的pipeline直接作用于data直接得到所需结果,并打印流程信息
first_pipeline(data, verbose=True).reset_index(drop=True)
得到的结果如图3所示:
图3
我们不仅保证了代码优雅简洁,可读性强,结果的一步到位,还自动打印出整个流水线运作过程的状态说明!令人兴奋的是pdpipe
充分封装了pandas
的核心功能尤其是apply
相关操作,使得常规或非常规的数据分析任务都可以利用pdpipe
中的API结合自定义函数来优雅地完成,小小领略到pdpipe
的妙处之后,下文我们来展开详细介绍。
pdpipe
中的API按照不同分工被划分到若干子模块,下面将针对常用的几类API展开介绍。
basic_stages
中包含了对数据框中的行、列进行丢弃/保留、重命名以及重编码的若干类:
ColDrop: 这个类用于对指定单个或多个列进行丢弃,其主要参数如下:
下面是举例演示(注意单个流水线部件可以直接传入源数据执行apply
方法直接得到结果),我们分别对单列和多列进行删除操作:
# 删除budget列
pdp.ColDrop(columns='budget').apply(data).head(3)
删除后得到的结果如图4:
图4
# 删除budget之外的所有列
del_col = data.columns.tolist()
del_col.remove('budget')
pdp.ColDrop(columns=del_col).apply(data).head(3)
得到的结果中只有budget
列被保留,如图5:
图5
ColRename: 这个类用于对指定列名进行重命名,其主要参数如下:
下面是举例演示:
# 将budget重命名为Budget
pdp.ColRename(rename_map={'budget': 'Budget'}).apply(data).head(3)
结果如图6:
图6
ColReorder: 这个类用于修改列的顺序,其主要参数如下:
下面是举例演示:
# 将budget从第0列挪动为第3列
pdp.ColReorder(positions={'budget': 3}).apply(data).head(3)
结果如图7:
图7
DropNa: 这个类用于丢弃数据中空值元素,其主要参数与
pandas
中的dropna()
保持一致,核心参数如下:
下面是举例演示,首先我们创造一个包含缺失值的数据框:
import numpy as np
# 创造含有缺失值的示例数据
df = pd.DataFrame({'a': [1, 4, 1, 5],
'b': [4, None, np.nan, 7]})
df
图8
# 删除含有缺失值的行
pdp.DropNa(axis=0).apply(df)
结果如图9:
图9
# 删除含有缺失值的列
pdp.DropNa(axis=1).apply(df)
结果如图10:
图10
FreqDrop: 这个类用于删除在指定的一列数据中出现频次小于所给阈值对应的全部行,主要参数如下:
threshold
参数具体作用的列 下面是举例演示,首先我们来查看电影数据集中original_language
列对应的频次分布情况:
# 查看original_language频次分布
pd.value_counts(data['original_language'])
图11
下面我们来过滤删除original_language
列出现频次小于10的行:
# 过滤original_language频次低于10的行,再次查看过滤后的数据original_language频次分布
pd.value_counts(pdp.FreqDrop(threshold=10, column='original_language').apply(data)['original_language'])
图12
RowDrop: 这个类用于删除满足指定限制条件的行,主要参数如下:
下面是举例演示,我们以budget小于100000000,genres不包含Action,release_date缺失以及vote_count小于1000作为组合删除条件,分别查看在三种不同删除策略下的最终得以保留的数据行数:
pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
'genres': lambda x: 'Action' not in x,
'release_date': lambda x: x == np.nan,
'vote_count': lambda x: x <= 1000},
reduce='any').apply(data).shape[0]
pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
'genres': lambda x: 'Action' not in x,
'release_date': lambda x: x == np.nan,
'vote_count': lambda x: x <= 1000},
reduce='all').apply(data).shape[0]
pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
'genres': lambda x: 'Action' not in x,
'release_date': lambda x: x == np.nan,
'vote_count': lambda x: x <= 1000},
reduce='xor').apply(data).shape[0]
对应的结果如下:
图13
col_generation
中包含了从原数据中产生新列的若干功能:
AggByCols: 这个类用于将指定的函数作用到指定的列上以产生新结果(可以是新的列也可以是一个聚合值),即这时函数真正传入的最小计算对象是列,主要参数如下:
drop
参数设置为False时,结果列的列名变为其对应列
+suffix
参数指定的后缀名;当drop
设置为False时,此参数将不起作用(因为新列直接继承了对应旧列的名称)columns
参数一一对应的结果列名称,当你想要自定义结果新列名称时这个参数就变得非常有用,默认为None下面我们来举例演示帮助理解上述各个参数:
pdp.AggByCols(columns='budget',
func=np.log).apply(data).head(3)
对应的结果如图14,可以看到在只传入columns
和func
这两个参数,其他参数均为默认值时,对budget
列做对数化处理后的新列直接覆盖了原有的budget
列:
图14
设置drop
参数为False,并将suffix
参数设置为'_log':
# 设置drop参数为False,并将suffix参数设置为'_log'
pdp.AggByCols(columns='budget',
func=np.log,
drop=False,
suffix='_log').apply(data).head(3)
图15
可以看到这时原有列得以保留,新的列以旧列名+后缀名的方式被添加到旧列之后,下面我们修改result_columns
参数以自定义结果列名:
# 设置drop参数为False,并将suffix参数设置为'_log'
pdp.AggByCols(columns='budget',
func=np.log,
result_columns='budget(log)').apply(data).head(3)
图16
pdp.AggByCols(columns=['budget', 'revenue'],
func=np.log,
drop=False,
suffix='_log').apply(data).head(3)
图17
pdp.AggByCols(columns='budget',
func=np.mean, # 这里传入的函数是聚合类型的
drop=False,
suffix='_mean').apply(data).loc[:, ['budget', 'budget_mean']]
这时为了保持整个数据框形状的完整,计算得到的聚合值填充到新列的每一个位置上:
图18
ApplyByCols: 这个类用于实现
pandas
中对列的apply
操作,不同于AggByCols
中函数直接处理的是列,ApplyByCols
中函数直接处理的是对应列中的每个元素。主要参数如下:
drop
参数设置为False时,结果列的列名变为其对应列
+suffix
参数指定的后缀名;当drop
设置为False时,此参数将不起作用(因为新列直接继承了对应旧列的名称)columns
参数一一对应的结果列名称,当你想要自定义结果新列名称时这个参数就变得非常有用,默认为None下面我们来举例演示帮助理解上述各个参数:
spoken_languages
涉及语言数量
下面的示例对每部电影中涉及的语言语种数量进行计算:pdp.ApplyByCols(columns='spoken_languages',
func=lambda x: [item['name'] for item in eval(x)].__len__(),
drop=False,
result_columns='spoken_languages_num').apply(data)[['spoken_languages', 'spoken_languages_num']]
对应的结果:
图19
ApplyToRows: 这个类用于实现
pandas
中对行的apply
操作,传入的计算函数直接处理每一行,主要参数如下:
ApplyToRows
作用的对象是一整行,因此只能形成一列返回值),默认为'new_col'下面我们来举例演示帮助理解上述各个参数:
pdp.ApplyToRows(func=lambda row: f"{row['original_title']}: {round(((row['revenue'] / row['budget'] -1)*100), 2)}%" if row['budget'] != 0
else f"{row['original_title']}: 因成本为0故不进行计算",
colname='movie_desc',
follow_column='budget',
func_desc='输出对应电影的盈利百分比').apply(data).head(3)
对应的结果:
图20
Bin: 这个类用于对连续型数据进行分箱,主要参数如下:
下面我们以计算电影盈利率小于0,大于0小于100%以及大于100%作为三个分箱区间,首先我们用到上文介绍过的RowDrop
丢掉那些成本或利润为0的行,再用ApplyToRows
来计算盈利率,最终使用Bin
进行分箱:
pipeline = pdp.PdPipeline([pdp.RowDrop(conditions={'budget': lambda x: x == 0,
'revenue': lambda x: x == 0},
reduce='any'),
pdp.ApplyToRows(func=lambda row: row['revenue'] / row['budget'] - 1,
colname='rate of return',
follow_column='budget'),
pdp.Bin(bin_map={'rate of return': [0, 1]}, drop=False)])
pipeline(data).head(3)
对应的结果:
图21
OneHotEncode: 这个类用于为类别型变量创建哑变量(即独热处理),效果等价于
pandas
中的get_dummies,主要参数如下:
columns
参数设置为None时,这个参数传入的列名列表中指定的列将不进行哑变量处理,默认为None,即不对任何列进行排除drop_dirst
设置为False时,原始变量有几个类别就对应几个哑变量被创造;当设置为指定类别值时(譬如设置drop_first = '男性'
),这个值对应的类别将不进行哑变量生成下面我们伪造包含哑变量的数据框:
# 伪造的数据框
df = pd.DataFrame({
'a': ['x', 'y', 'z'],
'b': ['i', 'j', 'q']
})
df
图22
默认参数下执行OneHotEncode
:
pdp.OneHotEncode().apply(df)
图23
设置drop_first
为False:
pdp.OneHotEncode(drop_first=False).apply(df)
图23
text_stages
中包含了对数据框中文本型变量进行处理的若干类,下文只介绍其中我认为最有用的:
RegexReplace: 这个类用于对文本型列进行基于正则表达式的内容替换,其主要参数如下:
columns
参数一一对应的结果列名称,当你想要自定义结果新列名称时这个参数就变得非常有用,默认为None,即直接替换原始列下面是举例演示:
pdp.RegexReplace(columns='original_language',
pattern='en|cn',
replace='英文/中文').apply(data)['original_language'].unique()
结果如图24:
图24
上文中我们主要演示了单一pipeline部件工作时的细节,接下来我们来了解pdpipe
中组装pipeline的几种方式:
这是我们在2.1中举例说明使用到的创建pipeline的方法,直接传入由按顺序的pipeline组件组成的列表便可生成所需pipeline,而除了直接将其视为函数直接传入原始数据和一些辅助参数(如verbose控制是否打印过程)之外,还可以用类似scikit-learn
中的fit_transform
方法:
# 调用pipeline的fit_transform方法作用于data直接得到所需结果,并打印流程信息
first_pipeline.fit_transform(data, verbose=1)
图25
与PdpPipeline
相似,make_pdpipeline
不可以传入pipeline组件形成的列表,只能把每个pipeline组件当成位置参数按顺序传入:
# 以make_pdpipeline将pipeline组件作为位置参数传入的方式创建pipeline
first_pipeline1 = pdp.make_pdpipeline(pdp.ColDrop("original_title"),
pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
pdp.RowDrop({'genres_num': lambda x: x <= 5}))
# 以pdp.PdPipeline传入流程列表的方式创建pipeline
first_pipeline2 = pdp.PdPipeline([pdp.ColDrop("original_title"),
pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
pdp.RowDrop({'genres_num': lambda x: x <= 5})])
# 比较两种不同方式创建的pipeline产生结果是否相同
first_pipeline1.fit_transform(data) == first_pipeline2(data)
比较结果如图26,两种方式殊途同归:
图26
以上就是本文全部内容,如有笔误望指出!
参考资料: https://pdpipe.github.io/pdpipe/doc/pdpipe/ https://tirthajyoti.github.io/Notebooks/Pandas-pipeline-with-pdpipe