前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >怎么还蹦出来个 “ 数据管道 ”

怎么还蹦出来个 “ 数据管道 ”

作者头像
数据森麟
发布2020-02-27 13:27:57
3930
发布2020-02-27 13:27:57
举报
文章被收录于专栏:数据森麟数据森麟
作者:厅长大人

来源:Python知识大全

问题

你想以数据管道 (类似 Unix 管道) 的方式迭代处理数据。比如,你有个大量的数据 需要处理,但是不能将它们一次性放入内存中。

解决方案

生成器函数是一个实现管道机制的好办法。

假定你要处理一个非常大的 日志文件目录:

代码语言:javascript
复制
foo/
access-log-012007.gz
access-log-022007.gz
access-log-032007.gz
...
access-log-012008
bar/
access-log-092007.bz2
...
access-log-022008

假设每个日志文件包含这样的数据:

代码语言:javascript
复制
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器。

就像这样:

代码语言:javascript
复制
import os
import fnmatch
import gzip
import bz2
import re


def gen_find(filepat, top):
      '''
      查找目录树中与外壳通配符模式匹配的所有文件名
      '''
      for path, dirlist, filelist in os.walk(top):
          for name in fnmatch.filter(filelist, filepat):
              yield os.path.join(path,name)
              
              
def gen_opener(filenames):
      '''
      每次打开一个文件名序列,生成一个file对象。在进行下一次迭代时,文件将立即关闭。
      '''
      for filename in filenames:
          if filename.endswith('.gz'):
              f = gzip.open(filename, 'rt')
          elif filename.endswith('.bz2'):
              f = bz2.open(filename, 'rt')
          else:
              f = open(filename, 'rt')
          yield f
          f.close()
          
def gen_concatenate(iterators):
    '''
    将一个迭代器序列链接到一个单独的序列中
    '''
    for it in iterators:
        yield from it
        
        
def gen_grep(pattern, lines):
    '''
    在一个行序列中查找regex模式
    '''
    pat = re.compile(pattern)
        for line in lines:
            if pat.search(line):
                  yield line

现在你可以很容易的将这些函数连起来创建一个处理管道。比如,为了查找包含单 词 Python 的所有日志行.

你可以这样做:

代码语言:javascript
复制
lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
for line in pylines:
    print(line)

如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。比如,下面这个版本计算出传输的字节数并计算其总和。

代码语言:javascript
复制
lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
bytes = (int(x) for x in bytecolumn if x != '-')
print('Total', sum(bytes))

结论

以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时轮询等。

为了理解上述代码,重点是要明白yield 语句作为数据的生产者而 for 循环语句 作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个单独的数据元 素传递给迭代处理管道的下一阶段。

在例子最后部分sum() 函数是最终的程序驱动者,每次从生成器管道中提取出一个元素。这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就 很容易编写和维护它们了。

事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小 的内存。

在调用 gen_concatenate() 函数的时候你可能会有些不太明白。这个函数的目的 是将输入序列拼接成一个很长的行序列。itertools.chain() 函数同样有类似的功能, 但是它需要将所有可迭代对象最为参数传入。

在上面这个例子中,你可能会写类似这样 的语句 lines = itertools.chain(*files) ,这将导致 gen_opener() 生成器被提前 全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭 代步骤时文件就关闭了,因此 chain() 在这里不能这样使用,,当然上面的方案可以避免这种情况。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据森麟 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档