以下文章来源于气象杂货铺 ,作者bugsuse
利用Python进行数据处理时经常使用的是pandas和numpy,这两个工具的功能都很强大,尤其是pandas,更是Python中数据处理方面最强大的工具之一。
但是如果不能有效利用pandas和numpy中的各种函数和方法,反而会降低数据处理的效率。
以下就以PyGotham 2019的一个演讲介绍如何大幅提升数据处理的速度。notebook和数据见文末链接,代码较多,建议下载notebook和数据测试。
import pandas as pd
import numpy as np
import re
import time
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
pd.set_option('max_columns', 15)
pd.set_option('chained_assignment', None)
此CSV示例数据是用于练习如何向量化.apply
函数时使用。数据已经进行了清洗和预处理。
df = pd.read_csv('sample_data_pygotham2019.csv',
parse_dates=['Date Created', 'Original Record: Date Created'])
注:parse_dates
参数可将给定的列进行解析为日期格式。
df.shape
df.head(5)
df.dtypes
Internal ID int64
ID int64
Category object
Lead Source object
Lead Source Category object
Current Status object
Status at Time of Lead object
Original Record: Date Created datetime64[ns]
Date Created datetime64[ns]
Inactive object
Specialty object
Providers float64
duplicate_leads bool
bad_leads bool
dtype: object
常规条件处理都是使用if...else...
语句,将函数应用到.apply
方法。
def set_lead_status(row):
if row['Current Status'] == '- None -':
return row['Status at Time of Lead']
else:
return row['Current Status']
%%timeit
test = df.apply(set_lead_status, axis=1)
8.15 s ± 722 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
但是这种方法的执行速度非常慢,如果涉及数据量更大,那么无疑非常消耗时间。
np.where
给定一个条件表达式,当条件表达式为真或假时返回对应的值。
%%timeit
# Pandas Series Vectorized baby!!
# you can pass the output directly into a pandas Series
df['normalized_status'] = np.where(
df['Status at Time of Lead'] == '- None -', # <-- condition
df['Current Status'], # <-- return if true
df['Status at Time of Lead'] # <-- return if false
)
20.8 ms ± 784 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.where` is {round((8.15 * 1000) / 20.8, 1)}x faster than `.apply`")
`np.where` is 391.8x faster than `.apply`
直接使用numpy
数组比pandas.Series
的速度要快。在上述情况下,只需要处理numpy
数组而无需处理pandas.Series
的所有信息,因此要更快一些。
%%timeit
# NumPy Vectorized baby!!
df['normalized_status'] = np.where(
df['Status at Time of Lead'].values == '- None -',
df['Current Status'].values,
df['Status at Time of Lead'].values
)
9.59 ms ± 310 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
print(f"`np.where` w/ numpy vectorization is {round((8.15 * 1000) / 9.59, 1)}x faster than `.apply`")
`np.where` w/ numpy vectorization is 849.8x faster than `.apply`
当使用 raw=True
选项时,会显著改善.apply
的速度。此选项可将numpy数组传递给自定义函数,从而代替pd.Series
对象。
但按照上述方法执行之后,耗时依然较长:
%%timeit test = df.apply(works_but_slow, axis=1, raw=True)
8.55 s ± 160 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
np.vectorize
可以将python函数转换为numpy ufunc
,可以处理向量化方法。可以向量化函数,而不需要应用到数据。
# Here is our previous function that I tried to vectorize but couldn't due to the ValueError.
def works_fast_maybe(col1, col2):
if col1 == '- None -':
return col2
else:
return col1
# with the np.vectorize method --> returns a vectorized callable
vectfunc = np.vectorize(works_fast_maybe) #otypes=[np.float],cache=False)
%%timeit test3 = list(vectfunc(df['Status at Time of Lead'], df['Current Status']))
96 ms ± 4.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
有人认为使用索引设置速度更快,但其实这不是向量化方式。
def can_I_go_any_faster(status_at_time, current_status):
# this works fine if you're setting static values
df['test'] = 'test'# status_at_time
df.loc[status_at_time == '- None -', 'test'] = current_status # <-- ValueError, indexes don't match!
df.loc[status_at_time != '- None -', 'test'] = status_at_time
%%timeit test4 = can_I_go_any_faster(df['Status at Time of Lead'], df['Current Status'])
40.5 ms ± 443 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
以下方法和上面的方法相差不大。
def can_I_go_any_faster2(status_at_time, current_status):
# this works fine if you're setting static values
df['test'] = 'test'# status_at_time
df.loc[status_at_time == '- None -', 'test'] = 'statys1_isNone'
df.loc[status_at_time != '- None -', 'test'] = 'statys2_notNone'
%%timeit test5 = can_I_go_any_faster2(df['Status at Time of Lead'], df['Current Status'])
37.8 ms ± 568 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
主要类别
list1 = ['LEAD-3 Flame No Contact', 'LEAD-Campaign', 'LEAD-Claim', 'LEAD-Contact Program',
'LEAD-General Pool', 'LEAD-No Contact', 'LEAD-Nurture', 'LEAD-Unqualified', 'PROSPECT-LOST']
list2 = ['- None -', 'CLIENT-Closed-Sold', 'CLIENT-Handoff', 'CLIENT-Implementation', 'CLIENT-Implementation (EMR)',
'CLIENT-Live', 'CLIENT-Partner', 'CLIENT-Referring Consultant', 'CLIENT-Transferred', 'LEAD-Engaged',
'LEAD-Long-Term Opportunity', 'PROSPECT-CURRENT', 'PROSPECT-LONG TERM', 'PROSPECT-NO DECISION']
# apply version
def lead_category(row):
if row['Original Record: Date Created'] == row['Date Created']:
return 'New Lead'
elif row['normalized_status'].startswith('CLI'):
return 'Client Lead'
elif row['normalized_status'] in list1:
return 'MTouch Lead'
elif row['normalized_status'] in list2:
return 'EMTouch Lead'
return 'NA'
%%timeit df['lead_category0'] = df.apply(lead_category, axis=1)
6.91 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
当然也可以使用np.where
实现上述功能,但是实现代码很难读。
%%timeit
df['lead_category'] = \
np.where(df['Original Record: Date Created'].values == df['Date Created'].values, 'New Lead',
np.where(df['normalized_status'].str.startswith('CLI').values, 'Client Lead',
np.where(df['normalized_status'].isin(list1).values, 'MTouch Lead',
np.where(df['normalized_status'].isin(list2).values, 'EMTouch Lead',
'NA')
)
)
)
96.7 ms ± 2.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
对多个条件选择或嵌套条件而言,np.select
的实现方法更简单甚至速度更快。
%%timeit
conditions = [
df['Original Record: Date Created'].values == df['Date Created'].values,
df['normalized_status'].str.startswith('CLI').values,
df['normalized_status'].isin(list1).values,
df['normalized_status'].isin(list2).values
]
choices = [
'New Lead',
'Client Lead',
'MTouch Lead',
'EMTouch Lead'
]
df['lead_category1'] = np.select(conditions, choices, default='NA') # Order of operations matter!
82.4 ms ± 865 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
np.select
和 np.where
的输出结果是相同的。
# Their output logic is the same
(df.lead_category == df.lead_category1).all()
True
print(f"`np.select` w/ numpy vectorization is {round((6.9 * 1000) / 82.5, 2)} faster than nested .apply()")
`np.select` w/ numpy vectorization is 83.64 faster than nested .apply()
print(f"`np.select` is {round(96.7 / 83.64, 2)} faster than nested `np.where`")
`np.select` is 1.16 faster than nested `np.where`
# This is something you might think you can't vectorize, but you sure can!
def sub_conditional(row):
if row['Inactive'] == 'No':
if row['Providers'] == 0:
return 'active_no_providers'
elif row['Providers'] < 5:
return 'active_small'
else:
return 'active_normal'
elif row['duplicate_leads']:
return 'is_dup'
else:
if row['bad_leads']:
return 'active_bad'
else:
return 'active_good'
%%timeit
# Let's time how long it takes to apply a nested multiple condition func
df['lead_type'] = df.apply(sub_conditional, axis=1)
5.72 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
以下是利用np.select
实现的多条件处理方法,速度有明显的提升。
%%timeit
# With np.select, could do .values here for additional speed, but left out to avoid too much text
conditions = [
((df['Inactive'] == 'No') & (df['Providers'] == 0)),
((df['Inactive'] == 'No') & (df['Providers'] < 5)),
df['Inactive'] == 'No',
df['duplicate_leads'], # <-- you can also just evaluate boolean arrays
df['bad_leads'],
]
choices = [
'active_no_providers',
'active_small',
'active_normal',
'is_dup',
'active_bad',
]
df['lead_type_vec'] = np.select(conditions, choices, default='NA')
61.1 ms ± 1.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` is {round((5.82 * 1000) / 60.4, 2)} faster than nested .apply()")
`np.select` is 96.36 faster than nested .apply()
使用np.select
时,直接获取原始数据可以进一步加速:
%%timeit
# With np.select
conditions = [
((df['Inactive'].values == 'No') & (df['Providers'].values == 0)),
((df['Inactive'].values == 'No') & (df['Providers'].values < 5)),
df['Inactive'].values == 'No',
df['duplicate_leads'].values, # <-- you can also just evaluate boolean arrays
df['bad_leads'].values,
]
choices = [
'active_no_providers',
'active_small',
'active_normal',
'is_dup',
'active_bad',
]
df['lead_type_vec'] = np.select(conditions, choices, default='NA')
35.8 ms ± 257 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` w/ vectorization is {round((5.72 * 1000) / 35.8, 2)} faster than nested .apply()")
`np.select` w/ vectorization is 159.78 faster than nested .apply()
实际应用中可能会遇到各种各样的问题,下面展示一些其他情况示例:
# Doing a regex search to find string patterns
def find_paid_nonpaid(s):
if re.search(r'non.*?paid', s, re.I):
return 'non-paid'
elif re.search(r'Buyerzone|^paid\s+', s, re.I):
return 'paid'
else:
return 'unknown'
%%timeit
# our old friend .apply()
df['lead_source_paid_unpaid'] = df['Lead Source'].apply(find_paid_nonpaid)
480 ms ± 12.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
那么如何使用np.vectorize
进行向量化呢?
%%timeit
vect_str = np.vectorize(find_paid_nonpaid)
df['lead_source_paid_unpaid1'] = vect_str(df['Lead Source'])
535 ms ± 9.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
# How does a list comprehension do?
df['lead_source_paid_unpaid2'] = ['non-paid' if re.search(r'non.*?paid', s, re.I)
else 'paid' if re.search(r'Buyerzone|^paid\s+', s, re.I)
else 'unknown' for s in df['Lead Source']]
524 ms ± 16.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
pandas
提供了.str
方法应用于字符串操作。
%%timeit
# How does this compare?
conditions = [
df['Lead Source'].str.contains(r'non.*?paid', case=False, na=False),
df['Lead Source'].str.contains(r'Buyerzone|^paid\s+', case=False, na=False),
]
choices = [
'non-paid',
'paid'
]
df['lead_source_paid_unpaid1'] = np.select(conditions, choices, default='unknown')
493 ms ± 13.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
如果不搜索字符串,而直接对字符串进行操作,效率如何呢?
%%timeit
df['lowerls'] = df['Lead Source'].apply(lambda x: x.lower())
58 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
df['lowerls1'] = df['Lead Source'].str.lower()
69.9 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
channel_dict = {
'Billing Service': 'BS', 'Consultant': 'PD', 'Educational': 'PD',
'Enterprise': 'PD', 'Hospital': 'PD', 'IPA': 'PD', 'MBS': 'RCM',
'MSO': 'PD', 'Medical practice': 'PD', 'Other': 'PD', 'Partner': 'PD',
'PhyBillers': 'BS', 'Practice': 'PD', 'Purchasing Group': 'PD',
'Reseller': 'BS', 'Vendor': 'PD', '_Other': 'PD', 'RCM': 'RCM'
}
def a_dict_lookup(row):
if row['Providers'] > 7:
return 'Upmarket'
else:
channel = channel_dict.get(row['Category'])
return channel
%%timeit
df['dict_lookup'] = df.apply(a_dict_lookup, axis=1)
5.15 s ± 58.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
df['dict_lookup1'] = np.where(
df['Providers'].values > 7,
'Upmarket',
df['Category'].map(channel_dict)
)
17.5 ms ± 144 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
速度提升了 (5.15 * 1000) / 17.5 = 294.2857142857143 倍!
%%timeit
channel_values = df['Category'].map(channel_dict)
df['dict_lookup1'] = np.where(
df['Providers'] > 7,
'Upmarket',
channel_values
)
19.6 ms ± 452 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit
# Using np.vectorize to vectorize a dictionary .get() method works, but is slower than .map()
channel_values = np.vectorize(channel_dict.get)(df['Category'].values)
df['dict_lookup2'] = np.where(
df['Providers'] > 7,
'Upmarket',
channel_values
)
37.7 ms ± 730 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
两种方法得到的结果是完全相同的。
print((df['dict_lookup'] == df['dict_lookup1']).all())
print((df['dict_lookup'] == df['dict_lookup2']).all())
True
True
# make a new column called 'Start Date' for dummy testing
# ONly do a fraction so we have some NaN values
df['Start Date'] = df['Date Created'].sample(frac=0.8)
def weeks_to_complete(row) -> float:
"""Calculate the number of weeks between two dates"""
if pd.isnull(row['Start Date']):
return (row['Original Record: Date Created'] - row['Date Created']).days / 7
else:
return (row['Date Created'] - row['Start Date']).days / 7
%%timeit
wtc1 = df.apply(weeks_to_complete, axis=1)
12.7 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
一个比较方便的向量化方法是使用pandas
的.dt
获取方法,其有很多便捷的方法/属性。
%%timeit
wtc2 = np.where(
df['Start Date'].isnull().values,
(df['Original Record: Date Created'].values - df['Date Created']).dt.days / 7,
(df['Date Created'].values - df['Start Date']).dt.days / 7
)
18.6 ms ± 250 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
另一个方法是使用ndarray
类型,即转换series
为numpy timedelta
数组。此方法更快,但是代码更冗余,涉及到一些基础的内容。
%%timeit
wtc3 = np.where(
df['Start Date'].isnull().values,
((df['Original Record: Date Created'].values - df['Date Created'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7,
((df['Date Created'].values - df['Start Date'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7
)
7.91 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
速度提升了 (12.7 * 1000) / 7.91 = 1605.5625790139063 倍!
此任务主要是想实现Excel
中的函数:
=IF(A2=A1, IF(L2-L1 < 5, 0, 1), 1))
A
列表示id
,L
列表示日期。
def time_looper(df):
""" Using a plain Python for loop"""
output = []
for i, row in enumerate(range(0, len(df))):
if i > 0:
# compare the current id to the row above
if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:
# compare the current date to the row above
if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:
output.append(0)
else:
output.append(1)
else:
output.append(1)
else:
output.append(np.nan)
return output
def time_looper2(df):
"""Using pandas dataframe `.iterrows()` method for iterating over rows"""
output = []
for i, row in df.iterrows():
if i > 0:
if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:
if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:
output.append(0)
else:
output.append(1)
else:
output.append(1)
else:
output.append(np.nan)
return output
df.sort_values(['Internal ID', 'Date Created'], inplace=True)
%%time df['time_col_raw_for'] = time_looper(df)
CPU times: user 2min 1s, sys: 7.17 ms, total: 2min 1s
Wall time: 2min 1s
%%time
df['time_col_iterrows'] = time_looper2(df)
CPU times: user 2min 19s, sys: 67.5 ms, total: 2min 19s
Wall time: 2min 19s
我们按照如下方法进行向量化:
pandas.shift
函数,将之前的值向下移动,这样就可以对比相同轴上的值np.select
向量化条件逻辑检查%%timeit
previous_id = df['Internal ID'].shift(1).fillna(0).astype(int)
previous_date = df['Original Record: Date Created'].shift(1).fillna(pd.Timestamp('1900'))
conditions = [
((df['Internal ID'].values == previous_id) &
(df['Date Created'] - previous_date).astype('timedelta64[D]') < 5),
df['Internal ID'].values == previous_id
]
choices = [0, 1]
df['time_col1'] = np.select(conditions, choices, default=1)
11.1 ms ± 49.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
速度提升了(((2 * 60) + 19) * 1000) / 11.1 = 12522.522522522522倍!
(df['time_col1'] == df['time_col_iterrows']).all()
False
from multiprocessing import Pool
def p_apply(df, func, cores=4):
"""Pass in your dataframe and the func to apply to it"""
df_split = np.array_split(df, cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
df = p_apply(df, func=some_big_function)
参考链接
https://gitlab.com/cheevahagadog/talks-demos-n-such/blob/master/PyGotham2019/PyGotham-updated.ipynb