首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何将数据处理速度提升1000+倍

如何将数据处理速度提升1000+倍

作者头像
气象学家
发布2020-02-17 17:24:21
2.8K0
发布2020-02-17 17:24:21
举报
文章被收录于专栏:气象学家气象学家

以下文章来源于气象杂货铺 ,作者bugsuse

利用Python进行数据处理时经常使用的是pandas和numpy,这两个工具的功能都很强大,尤其是pandas,更是Python中数据处理方面最强大的工具之一。

但是如果不能有效利用pandas和numpy中的各种函数和方法,反而会降低数据处理的效率。

以下就以PyGotham 2019的一个演讲介绍如何大幅提升数据处理的速度。notebook和数据见文末链接,代码较多,建议下载notebook和数据测试。

import pandas as pd
import numpy as np
import re
import time

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('max_columns', 15)
pd.set_option('chained_assignment', None)

读取数据

此CSV示例数据是用于练习如何向量化.apply函数时使用。数据已经进行了清洗和预处理。

df = pd.read_csv('sample_data_pygotham2019.csv', 
                 parse_dates=['Date Created', 'Original Record: Date Created'])

注:parse_dates 参数可将给定的列进行解析为日期格式。

数据检查
df.shape
df.head(5)
df.dtypes
Internal ID                               int64
ID                                        int64
Category                                 object
Lead Source                              object
Lead Source Category                     object
Current Status                           object
Status at Time of Lead                   object
Original Record: Date Created    datetime64[ns]
Date Created                     datetime64[ns]
Inactive                                 object
Specialty                                object
Providers                               float64
duplicate_leads                            bool
bad_leads                                  bool
dtype: object

条件表达式的向量化

常规条件处理都是使用if...else...语句,将函数应用到.apply方法。

def set_lead_status(row):
    if row['Current Status'] == '- None -':
        return row['Status at Time of Lead']
    else:
        return row['Current Status']
%%timeit
test = df.apply(set_lead_status, axis=1)
8.15 s ± 722 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

但是这种方法的执行速度非常慢,如果涉及数据量更大,那么无疑非常消耗时间。

np.where

np.where给定一个条件表达式,当条件表达式为真或假时返回对应的值。

%%timeit
# Pandas Series Vectorized baby!!

# you can pass the output directly into a pandas Series

df['normalized_status'] = np.where(
    df['Status at Time of Lead'] == '- None -',   # <-- condition
    df['Current Status'],                         # <-- return if true
    df['Status at Time of Lead']                  # <-- return if false
)
20.8 ms ± 784 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.where` is {round((8.15 * 1000) / 20.8, 1)}x faster than `.apply`")
`np.where` is 391.8x faster than `.apply`

直接使用numpy数组比pandas.Series的速度要快。在上述情况下,只需要处理numpy数组而无需处理pandas.Series的所有信息,因此要更快一些。

 %%timeit
# NumPy Vectorized baby!!

df['normalized_status'] = np.where(
    df['Status at Time of Lead'].values == '- None -',
    df['Current Status'].values, 
    df['Status at Time of Lead'].values
)
9.59 ms ± 310 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
print(f"`np.where` w/ numpy vectorization is {round((8.15 * 1000) / 9.59, 1)}x faster than `.apply`")
`np.where` w/ numpy vectorization is 849.8x faster than `.apply`

当使用 raw=True选项时,会显著改善.apply的速度。此选项可将numpy数组传递给自定义函数,从而代替pd.Series对象。

但按照上述方法执行之后,耗时依然较长:

%%timeit test = df.apply(works_but_slow, axis=1, raw=True) 
8.55 s ± 160 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

np.vectorize

np.vectorize可以将python函数转换为numpy ufunc,可以处理向量化方法。可以向量化函数,而不需要应用到数据。

# Here is our previous function that I tried to vectorize but couldn't due to the ValueError. 
def works_fast_maybe(col1, col2):
    if col1 == '- None -':
        return col2
    else:
        return col1
# with the np.vectorize method --> returns a vectorized callable
vectfunc = np.vectorize(works_fast_maybe) #otypes=[np.float],cache=False)
%%timeit  test3 = list(vectfunc(df['Status at Time of Lead'], df['Current Status']))
96 ms ± 4.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

有人认为使用索引设置速度更快,但其实这不是向量化方式。

def can_I_go_any_faster(status_at_time, current_status):
    # this works fine if you're setting static values
    df['test'] = 'test'# status_at_time
    df.loc[status_at_time == '- None -', 'test'] = current_status  # <-- ValueError, indexes don't match!
    df.loc[status_at_time != '- None -', 'test'] = status_at_time
%%timeit test4 = can_I_go_any_faster(df['Status at Time of Lead'], df['Current Status'])
40.5 ms ± 443 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

以下方法和上面的方法相差不大。

def can_I_go_any_faster2(status_at_time, current_status):
    # this works fine if you're setting static values
    df['test'] = 'test'# status_at_time
    df.loc[status_at_time == '- None -', 'test'] = 'statys1_isNone' 
    df.loc[status_at_time != '- None -', 'test'] = 'statys2_notNone'
%%timeit test5 = can_I_go_any_faster2(df['Status at Time of Lead'], df['Current Status'])
37.8 ms ± 568 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

多条件处理

主要类别

list1 = ['LEAD-3 Flame No Contact', 'LEAD-Campaign', 'LEAD-Claim', 'LEAD-Contact Program', 
         'LEAD-General Pool', 'LEAD-No Contact', 'LEAD-Nurture', 'LEAD-Unqualified', 'PROSPECT-LOST']

list2 = ['- None -', 'CLIENT-Closed-Sold', 'CLIENT-Handoff', 'CLIENT-Implementation', 'CLIENT-Implementation (EMR)',
         'CLIENT-Live', 'CLIENT-Partner', 'CLIENT-Referring Consultant', 'CLIENT-Transferred', 'LEAD-Engaged', 
         'LEAD-Long-Term Opportunity', 'PROSPECT-CURRENT', 'PROSPECT-LONG TERM', 'PROSPECT-NO DECISION']

# apply version
def lead_category(row):
    if row['Original Record: Date Created'] == row['Date Created']:
        return 'New Lead'
    elif row['normalized_status'].startswith('CLI'):
        return 'Client Lead'
    elif row['normalized_status'] in list1:
        return 'MTouch Lead'
    elif row['normalized_status'] in list2:
        return 'EMTouch Lead'
    return 'NA'
%%timeit df['lead_category0'] = df.apply(lead_category, axis=1)
6.91 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

当然也可以使用np.where实现上述功能,但是实现代码很难读。

%%timeit
df['lead_category'] = \
    np.where(df['Original Record: Date Created'].values == df['Date Created'].values, 'New Lead', 
            np.where(df['normalized_status'].str.startswith('CLI').values, 'Client Lead', 
                    np.where(df['normalized_status'].isin(list1).values, 'MTouch Lead', 
                            np.where(df['normalized_status'].isin(list2).values, 'EMTouch Lead', 
                                     'NA') 
                                  )
                         )
                )
96.7 ms ± 2.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

np.select

对多个条件选择或嵌套条件而言,np.select的实现方法更简单甚至速度更快。

%%timeit
conditions = [
    df['Original Record: Date Created'].values == df['Date Created'].values,
    df['normalized_status'].str.startswith('CLI').values,
    df['normalized_status'].isin(list1).values,
    df['normalized_status'].isin(list2).values
]

choices = [
    'New Lead', 
    'Client Lead', 
    'MTouch Lead',
    'EMTouch Lead'
]


df['lead_category1'] = np.select(conditions, choices, default='NA')  # Order of operations matter!
82.4 ms ± 865 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

np.selectnp.where 的输出结果是相同的。

# Their output logic is the same
(df.lead_category == df.lead_category1).all()
True
print(f"`np.select` w/ numpy vectorization is {round((6.9 * 1000) / 82.5, 2)} faster than nested .apply()")
`np.select` w/ numpy vectorization is 83.64 faster than nested .apply()
print(f"`np.select` is {round(96.7 / 83.64, 2)} faster than nested `np.where`")
`np.select` is 1.16 faster than nested `np.where`

向量化多条件表达式

# This is something you might think you can't vectorize, but you sure can!
def sub_conditional(row):
    if row['Inactive'] == 'No':
        if row['Providers'] == 0:
            return 'active_no_providers'
        elif row['Providers'] < 5:
            return 'active_small'
        else:
            return 'active_normal'
    elif row['duplicate_leads']:
        return 'is_dup'
    else:
        if row['bad_leads']:
            return 'active_bad'
        else:
            return 'active_good'
%%timeit
# Let's time how long it takes to apply a nested multiple condition func
df['lead_type'] = df.apply(sub_conditional, axis=1)
5.72 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

以下是利用np.select实现的多条件处理方法,速度有明显的提升。

%%timeit

# With np.select, could do .values here for additional speed, but left out to avoid too much text
conditions = [
    ((df['Inactive'] == 'No') & (df['Providers'] == 0)),
    ((df['Inactive'] == 'No') & (df['Providers'] < 5)),
    df['Inactive'] == 'No',
    df['duplicate_leads'],  # <-- you can also just evaluate boolean arrays
    df['bad_leads'],
]

choices = [
    'active_no_providers',
    'active_small',
    'active_normal',
    'is_dup',
    'active_bad',
]

df['lead_type_vec'] = np.select(conditions, choices, default='NA')
61.1 ms ± 1.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` is {round((5.82 * 1000) / 60.4, 2)} faster than nested .apply()")
`np.select` is 96.36 faster than nested .apply()

使用np.select时,直接获取原始数据可以进一步加速:

%%timeit

# With np.select
conditions = [
    ((df['Inactive'].values == 'No') & (df['Providers'].values == 0)),
    ((df['Inactive'].values == 'No') & (df['Providers'].values < 5)),
    df['Inactive'].values == 'No',
    df['duplicate_leads'].values,  # <-- you can also just evaluate boolean arrays
    df['bad_leads'].values,
]

choices = [
    'active_no_providers',
    'active_small',
    'active_normal',
    'is_dup',
    'active_bad',
]

df['lead_type_vec'] = np.select(conditions, choices, default='NA')
35.8 ms ± 257 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` w/ vectorization is {round((5.72 * 1000) / 35.8, 2)} faster than nested .apply()")
`np.select` w/ vectorization is 159.78 faster than nested .apply()

复杂示例

实际应用中可能会遇到各种各样的问题,下面展示一些其他情况示例:

字符串
 # Doing a regex search to find string patterns

def find_paid_nonpaid(s):
    if re.search(r'non.*?paid', s, re.I):
        return 'non-paid'
    elif re.search(r'Buyerzone|^paid\s+', s, re.I):
        return 'paid'
    else:
        return 'unknown'
%%timeit
# our old friend .apply()
df['lead_source_paid_unpaid'] = df['Lead Source'].apply(find_paid_nonpaid)
480 ms ± 12.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

那么如何使用np.vectorize进行向量化呢?

%%timeit

vect_str = np.vectorize(find_paid_nonpaid)

df['lead_source_paid_unpaid1'] = vect_str(df['Lead Source'])
535 ms ± 9.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
# How does a list comprehension do?
df['lead_source_paid_unpaid2'] = ['non-paid' if re.search(r'non.*?paid', s, re.I) 
                                  else 'paid' if re.search(r'Buyerzone|^paid\s+', s, re.I) 
                                  else 'unknown' for s in df['Lead Source']]
524 ms ± 16.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

pandas提供了.str方法应用于字符串操作。

 %%timeit
# How does this compare?
conditions = [
    df['Lead Source'].str.contains(r'non.*?paid', case=False, na=False),
    df['Lead Source'].str.contains(r'Buyerzone|^paid\s+', case=False, na=False),
]

choices = [
    'non-paid',
    'paid'
]

df['lead_source_paid_unpaid1'] = np.select(conditions, choices, default='unknown')
493 ms ± 13.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

如果不搜索字符串,而直接对字符串进行操作,效率如何呢?

%%timeit
df['lowerls'] = df['Lead Source'].apply(lambda x: x.lower())
58 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
df['lowerls1'] = df['Lead Source'].str.lower()
69.9 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
字典查表
channel_dict = {
    'Billing Service': 'BS', 'Consultant': 'PD', 'Educational': 'PD', 
    'Enterprise': 'PD', 'Hospital': 'PD', 'IPA': 'PD', 'MBS': 'RCM', 
    'MSO': 'PD', 'Medical practice': 'PD', 'Other': 'PD', 'Partner': 'PD',
    'PhyBillers': 'BS', 'Practice': 'PD', 'Purchasing Group': 'PD',
    'Reseller': 'BS', 'Vendor': 'PD', '_Other': 'PD', 'RCM': 'RCM'
}

def a_dict_lookup(row):
    if row['Providers'] > 7:
        return 'Upmarket'
    else:
        channel = channel_dict.get(row['Category'])
        return channel
%%timeit
df['dict_lookup'] = df.apply(a_dict_lookup, axis=1)
5.15 s ± 58.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
df['dict_lookup1'] = np.where(
    df['Providers'].values > 7, 
    'Upmarket',
    df['Category'].map(channel_dict)
)
17.5 ms ± 144 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

速度提升了 (5.15 * 1000) / 17.5 = 294.2857142857143 倍!

%%timeit
channel_values = df['Category'].map(channel_dict)
df['dict_lookup1'] = np.where(
    df['Providers'] > 7, 
    'Upmarket',
    channel_values
)
19.6 ms ± 452 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit
# Using np.vectorize to vectorize a dictionary .get() method works, but is slower than .map()
channel_values = np.vectorize(channel_dict.get)(df['Category'].values)
df['dict_lookup2'] = np.where(
    df['Providers'] > 7, 
    'Upmarket',
    channel_values
)
37.7 ms ± 730 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

两种方法得到的结果是完全相同的。

print((df['dict_lookup'] == df['dict_lookup1']).all())
print((df['dict_lookup'] == df['dict_lookup2']).all())
True
True
日期操作
# make a new column called 'Start Date' for dummy testing
# ONly do a fraction so we have some NaN values
df['Start Date'] = df['Date Created'].sample(frac=0.8)
def weeks_to_complete(row) -> float:
    """Calculate the number of weeks between two dates"""
    if pd.isnull(row['Start Date']):
        return (row['Original Record: Date Created'] -  row['Date Created']).days / 7
    else:
        return (row['Date Created'] - row['Start Date']).days / 7
%%timeit
wtc1 = df.apply(weeks_to_complete, axis=1)
12.7 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

一个比较方便的向量化方法是使用pandas.dt获取方法,其有很多便捷的方法/属性。

%%timeit
wtc2 = np.where(
    df['Start Date'].isnull().values,
    (df['Original Record: Date Created'].values - df['Date Created']).dt.days / 7,
    (df['Date Created'].values - df['Start Date']).dt.days / 7
)
18.6 ms ± 250 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

另一个方法是使用ndarray类型,即转换seriesnumpy timedelta数组。此方法更快,但是代码更冗余,涉及到一些基础的内容。

%%timeit
wtc3 = np.where(
    df['Start Date'].isnull().values,
    ((df['Original Record: Date Created'].values - df['Date Created'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7,
    ((df['Date Created'].values - df['Start Date'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7
)
7.91 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

速度提升了 (12.7 * 1000) / 7.91 = 1605.5625790139063 倍!

逻辑表达式所需要的值在其他行

此任务主要是想实现Excel中的函数:

=IF(A2=A1, IF(L2-L1 < 5, 0, 1), 1))

A列表示idL列表示日期。

 def time_looper(df):
    """ Using a plain Python for loop"""
    output = []
    for i, row in enumerate(range(0, len(df))):
        if i > 0:

            # compare the current id to the row above
            if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:

                # compare the current date to the row above
                if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:
                    output.append(0)
                else:
                    output.append(1)
            else:
                output.append(1)
        else:
            output.append(np.nan)
    return output
def time_looper2(df):
    """Using pandas dataframe `.iterrows()` method for iterating over rows"""
    output = []
    for i, row in df.iterrows():
        if i > 0:
            if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:
                if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:
                    output.append(0)
                else:
                    output.append(1)
            else:
                output.append(1)
        else:
            output.append(np.nan)
    return output
df.sort_values(['Internal ID', 'Date Created'], inplace=True)
%%time df['time_col_raw_for'] = time_looper(df)
CPU times: user 2min 1s, sys: 7.17 ms, total: 2min 1s
Wall time: 2min 1s
%%time
df['time_col_iterrows'] = time_looper2(df)
CPU times: user 2min 19s, sys: 67.5 ms, total: 2min 19s
Wall time: 2min 19s

我们按照如下方法进行向量化:

  • 使用pandas.shift函数,将之前的值向下移动,这样就可以对比相同轴上的值
  • 使用np.select向量化条件逻辑检查
%%timeit
previous_id = df['Internal ID'].shift(1).fillna(0).astype(int)
previous_date = df['Original Record: Date Created'].shift(1).fillna(pd.Timestamp('1900'))

conditions = [
    ((df['Internal ID'].values ==  previous_id) & 
     (df['Date Created'] - previous_date).astype('timedelta64[D]') < 5),
    df['Internal ID'].values ==  previous_id
]
choices = [0, 1]
df['time_col1'] = np.select(conditions, choices, default=1)
11.1 ms ± 49.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

速度提升了(((2 * 60) + 19) * 1000) / 11.1 = 12522.522522522522倍!

(df['time_col1'] == df['time_col_iterrows']).all()
False

其他方法

并行函数
from multiprocessing import Pool

def p_apply(df, func, cores=4):
    """Pass in your dataframe and the func to apply to it"""
    df_split = np.array_split(df, cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df
df = p_apply(df, func=some_big_function)

参考链接

https://gitlab.com/cheevahagadog/talks-demos-n-such/blob/master/PyGotham2019/PyGotham-updated.ipynb

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-01-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 气象学家 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 读取数据
    • 数据检查
    • 条件表达式的向量化
    • np.where
    • np.vectorize
    • 多条件处理
    • np.select
    • 向量化多条件表达式
    • 复杂示例
      • 字符串
        • 字典查表
          • 日期操作
            • 逻辑表达式所需要的值在其他行
            • 其他方法
              • 并行函数
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档