python︱大规模数据存储与读取、并行计算:Dask库简述

数据结构与pandas非常相似,比较容易理解。

github:https://github.com/dask

dask的内容很多,挑一些我比较看好的内容着重点一下。 .

一、数据读取与存储

先来看看dask能读入哪些内容:

1、csv

dask并不能读入excel,这个注意

# pandas
import pandas as pd                    
df = pd.read_csv('2015-01-01.csv')      
df.groupby(df.user_id).value.mean()     

#dask
 import dask.dataframe as dd
 df = dd.read_csv('2015-*-*.csv')
 df.groupby(df.user_id).value.mean().compute()

非常相似,除了.compute() .

2、Dask Array读取hdf5

import numpy as np                       import dask.array as da
f = h5py.File('myfile.hdf5')             f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data'])           x = da.from_array(f['/big-data'],
                                                           chunks=(1000, 1000))
x - x.mean(axis=1)                       x - x.mean(axis=1).compute()

左是Pandas,右边是dask .

3、Dask Bag

import dask.bag as db
b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()

读取大规模json文件,几亿都很easy

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

读取txt

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()

变为dataframe格式的内容 .

4、Dask Delayed 并行计算

from dask import delayed
L = []
for fn in filenames:                  # Use for loops to build up computation
    data = delayed(load)(fn)          # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables

result = delayed(summarize)(L)
result.compute()

.

5、concurrent.futures自定义任务

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

.

二、Delayed 并行计算模块

一个先行例子,本来的案例:

def inc(x):
    return x + 1

def double(x):
    return x + 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)

再来看看用delay加速的:

from dask import delayed

output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)

total = delayed(sum)(output)

还可以将计算流程可视化:

total.visualize()  # see image to the right

.

三、和SKLearn结合的并行算法

广义回归GLM:https://github.com/dask/dask-glm tensorflow深度学习库:Dask-Tensorflow

以XGBoost为例,官方:https://github.com/dask/dask-xgboost 来看一个案例code .

1、加载数据

import dask.dataframe as dd

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/20*.csv', usecols=cols,
                  storage_options={'anon': True})

df = df.sample(frac=0.2) # we blow out ram otherwise

is_delayed = (df.DepDelay.fillna(16) > 15)

df['CRSDepTime'] = df['CRSDepTime'].clip(upper=2399)
del df['DepDelay']

df, is_delayed = persist(df, is_delayed)
progress(df, is_delayed)

2、One hot encode编码

df2 = dd.get_dummies(df.categorize()).persist()

.

3、准备训练集和测试集 + 训练

data_train, data_test = df2.random_split([0.9, 0.1], 
                                         random_state=1234)
labels_train, labels_test = is_delayed.random_split([0.9, 0.1], 
                                                    random_state=1234)

训练

import dask_xgboost as dxgb

params = {'objective': 'binary:logistic', 'nround': 1000, 
          'max_depth': 16, 'eta': 0.01, 'subsample': 0.5, 
          'min_child_weight': 1}

bst = dxgb.train(client, params, data_train, labels_train)
bst

.

4、预测

# Use normal XGBoost model with normal Pandas
import xgboost as xgb
dtest = xgb.DMatrix(data_test.head())
bst.predict(dtest)
predictions = dxgb.predict(client, bst, data_test).persist()
predictions.head()

.

5、模型评估

from sklearn.metrics import roc_auc_score, roc_curve
print(roc_auc_score(labels_test.compute(), 
                    predictions.compute()))
import matplotlib.pyplot as plt
%matplotlib inline

fpr, tpr, _ = roc_curve(labels_test.compute(), predictions.compute())
# Taken from http://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#sphx-glr-auto-examples-model-selection-plot-roc-py
plt.figure(figsize=(8, 8))
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve')
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

.

四、计算流程可视化部分——Dask.array

来源:https://gist.github.com/mrocklin/b61f795004ec0a70e43de350e453e97e

import numpy as np
import dask.array as da
x = da.ones(15, chunks=(5,))
x.visualize('dask.svg')
(x + 1).sum().visualize('dask.svg')

来一个二维模块的:

x = da.ones((15, 15), chunks=(5, 5))
x.visualize('dask.svg')
(x.dot(x.T + 1) - x.mean(axis=0)).std().visualize('dask.svg')

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我和未来有约会

Silverlight第三方控件专题

这里我收集整理了目前网上silverlight第三方控件的专题,若果有所遗漏请告知我一下。 名称 简介 截图 telerik 商 RadC...

4425
来自专栏c#开发者

为什么nhibernate 不能保存on-to-many的结构

下面是主类文件 Code namespace EasyTalk.Module {     /// <summary>     /// SiteAddre...

2585
来自专栏码匠的流水账

聊聊spring cloud netflix的HystrixCommands

本文主要研究一下spring cloud netflix的HystrixCommands。

1042
来自专栏ASP.NETCore

ASP.NET Core 整合Autofac和Castle实现自动AOP拦截

除了ASP.NETCore自带的IOC容器外,我们还可以使用其他成熟的DI框架,如Autofac,StructureMap等(笔者只用过Unity,Ninjec...

754
来自专栏张善友的专栏

LINQ via C# 系列文章

LINQ via C# Recently I am giving a series of talk on LINQ. the name “LINQ via C...

3025
来自专栏hbbliyong

WPF Trigger for IsSelected in a DataTemplate for ListBox items

<DataTemplate DataType="{x:Type vm:HeaderSlugViewModel}"> <vw:HeaderSlug...

4224
来自专栏Ceph对象存储方案

Luminous版本PG 分布调优

Luminous版本开始新增的balancer模块在PG分布优化方面效果非常明显,操作也非常简便,强烈推荐各位在集群上线之前进行这一操作,能够极大的提升整个集群...

3685
来自专栏陈仁松博客

ASP.NET Core 'Microsoft.Win32.Registry' 错误修复

今天在发布Asp.net Core应用到Azure的时候出现错误InvalidOperationException: Cannot find compilati...

5248
来自专栏张善友的专栏

Miguel de Icaza 细说 Mix 07大会上的Silverlight和DLR

Mono之父Miguel de Icaza 详细报道微软Mix 07大会上的Silverlight和DLR ,上面还谈到了Mono and Silverligh...

3007
来自专栏芋道源码1024

熔断器 Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker

本文主要基于 Hystrix 1.5.X 版本 1. 概述 2. HystrixCircuitBreaker 3. HystrixCircuitBreaker....

5827

扫码关注云+社区