我有一个.csv文件,我试图在管道中使用apache_beam.io.ReadFromText()读取该文件(beam是apache_beam的别名): reader = beam.io.ReadFromText(csv_path, skip_header_lines=1) 我有一个行读取类,如下所示: class RowReader(beam.DoFn):
def pro
我正在开发一个从Google Cloud Storage (GCS)目录中读取大约500万个文件的管道。我将其配置为在Google Cloud Dataflow上运行。问题是,当我启动管道时,它需要几个小时来“计算”所有文件的大小: INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input在撰写本文时,该作业在Dataflow控制台中仍然不可用,这使
我有一个csv文件,我知道如何使用pandas实现这一点,基本上将csv作为一个df ->组按字段‘aaa’、‘bbb’读取数据,然后构造一个新的'id‘。我的问题是如何在ApacheBeam中实现相同的功能,我以前从未使用过它,我试图使用Beam读取这个csv文件和分组多个记录,但是我对熊猫使用的相同功能不支持Beam,下面是我的当前代码:
importapache_beam a
我刚接触Apache光束和Dataflow。我正在尝试获取大约20000条记录的大数据集。我必须将它分成1000条记录,并将这些分块保存在单独的CSV文件中。我知道如何从BQ读取和写入CSV,但不能理解如何使用波束变换来分块文件,或者是否有任何其他方法。
我的尝试:我从简单的代码开始,将我从BQ读取的数据传递给ParDo函数。此外,ParDo不会打印我在以下代码中传递的元素。import apache_beam as
通过调用函数ReadableFile.readFullyAsUTF8String,尝试使用数据流作业中的apache光束FileIO读取大小为10+ GB的CSV文件。而且,它失败了,错误如下。读取大小超过INTEGER.MAX_VALUE的文件似乎失败。敬请指教。ByteArrayOutputStream.java:93) at