首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在apache beam数据流中将csv转换为字典

如何在apache beam数据流中将csv转换为字典
EN

Stack Overflow用户
提问于 2016-12-16 02:30:29
回答 2查看 10.8K关注 0票数 9

我想读取一个csv文件,并将其写入到BigQuery使用阿帕奇光束数据流。为此,我需要将数据以字典的形式呈现给BigQuery。如何使用apache beam转换数据以实现此目的?

我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列的表。我知道如何在BigQuery中创建数据,这很简单,我不知道的是如何将csv转换成字典。下面的代码是不正确的,但应该给出了我想要做什么的想法。

代码语言:javascript
运行
复制
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   beam.io.BigQuerySink(
   output_table,
   schema='month:INTEGER, tornado_count:INTEGER',
   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-12-16 03:25:32

编辑:从2.12.0版本开始,Beam提供了新的fileio转换,允许您从CSV读取,而不必重新实现源。您可以这样做:

代码语言:javascript
运行
复制
def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here
  # a DictReader, or a normal csv.reader.
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with Pipeline(...) as p:
  content_pc = (p
                | beam.io.fileio.MatchFiles("/my/file/name")
                | beam.io.fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))

我最近为Apache光束写了一个测试。你可以在the Github repository上看看。

旧的答案依赖于重新实现源代码。这不再是主要的推荐方法:)

其想法是拥有一个返回解析的CSV行的源。您可以通过将FileBasedSource类子类化以包含CSV解析来实现这一点。具体地说,read_records函数看起来就像这样:

代码语言:javascript
运行
复制
class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    for rec in reader:
      yield rec
票数 26
EN

Stack Overflow用户

发布于 2017-12-19 22:11:26

作为Pablo的帖子的补充,我想分享我自己对他的样本所做的一点改变。(+1为你!)

已更改:reader = csv.reader(self._file)更改为reader = csv.DictReader(self._file)

csv.DictReader使用CSV文件的第一行作为Dict密钥。其他行用于使用每行的值填充每行的字典。它会根据列的顺序自动将正确的值放入正确的键中。

一个小细节是,Dict中的每个值都存储为字符串。这可能会与您的BigQuery模式冲突,如果您使用的是例如。某些字段的整数。所以你需要在之后注意正确的造型。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41170997

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档