腾讯云
开发者社区
文档
建议反馈
控制台
首页
学习
活动
专区
工具
TVP
最新优惠活动
文章/答案/技术大牛
搜索
搜索
关闭
发布
登录/注册
精选内容/技术社群/优惠产品,
尽在小程序
立即前往
文章
问答
(9999+)
视频
沙龙
1
回答
在
DoFn
中
包装
WriteToText
python-3.x
、
google-cloud-platform
、
google-cloud-dataflow
、
apache-beam
我正在尝试将
WriteToText
包装
在
DoFn
中
,以便在编写文件的方式上允许一些自定义/灵活性。具体地说,我希望根据参数/输入(基于值提供程序参数)编写不同的文件。Header5,Header6,Header7,Header8" yield
WriteToText
shard_name_template='',he
浏览 21
提问于2021-02-11
得票数 0
1
回答
数据流管道
中
WriteToText
文件的
WriteToText
python
、
google-cloud-platform
、
google-cloud-storage
、
google-cloud-dataflow
、
apache-beam
我有一个分支管道,包含多个ParDo转换,这些转换被合并并写入到GCS桶
中
的文本文件记录
中
。Not found: gs://MYBUCKET/JOBNAME.00000-of-00001.avro [while running '
WriteToText
/
WriteToText
/Write/WriteImpl/WriteBundles/WriteBundles'] 似乎找不到它一直
在
写的日志文件。
在<
浏览 5
提问于2020-12-01
得票数 0
回答已采纳
1
回答
在
我的python代码
中
,有没有一种方法可以
在
某个管道之后使用apache光束创建一个空文件呢?
python
、
apache-beam
、
dataflow
"'+"Hi, This msg is from Whirlpool DL" + '"' for column in row))if __name__ == '__main__': run() 在此代码
中
,
浏览 8
提问于2021-02-03
得票数 0
1
回答
如何在
中
获得窗口时间戳的结束
python
、
google-cloud-dataflow
、
apache-beam
我正在从批处理日志数据
中
创建每5秒长20秒的滑动时间窗口:如何访问Python
中
每个窗口的时间戳信息
浏览 0
提问于2017-09-15
得票数 5
回答已采纳
2
回答
从单个PCollection写入多个文件(Beam)
python
、
apache-beam
、
dataflow
我正在处理多个文件输入,并希望将它们输入到Dataflow管道
中
。但是,我要保留产出,使之与投入的数量相匹配。//bucket/<file1_dir>', 'gs://bucket/<file2_dir>', 'gs://bucket/<file3_dir>'] from a
浏览 32
提问于2022-07-13
得票数 0
回答已采纳
2
回答
如何在Python
中
创建从发布/订阅到GCS的数据流管道
python
、
google-cloud-dataflow
、
apache-beam
、
google-cloud-pubsub
因此,基本上我希望Dataflow
在
固定的时间内(例如15分钟)积累一些消息,然后
在
该时间段过去后将这些数据作为文本文件写入GCS。>> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()res.wait_until
浏览 6
提问于2019-02-18
得票数 7
回答已采纳
2
回答
在
Beam
中
读写序列化的协议
python
、
protocol-buffers
、
apache-beam
、
apache-beam-io
message PhoneNumber { string country = 2;我有下面的python代码,它实现了一个简单的光束管道来将文本写入序列化的协议
中
。apache_beam.options.pipeline_options import PipelineOptions class ToProtoFn(beam.
DoFn
| beam.Create(["123-456-789,us", "345-567-789,ca"])
浏览 1
提问于2018-01-22
得票数 4
1
回答
在
pardo类名中使用args
python
、
apache-beam
是否可以
在
pardo(classname(args))中提供参数。因为我试过了,但是它说进程函数需要3个args,而给定的是2个。我也试过init func。没有希望。请帮帮忙。import apache_beam as beam def process(self,elements):return [elements.split(',')] def process(se
浏览 1
提问于2019-11-18
得票数 0
回答已采纳
1
回答
对大型BigQuery响应进行分块,并使用Apache光束和数据流将这些分块保存在CSV文件
中
python
、
google-cloud-dataflow
、
apache-beam
我必须将它分成1000条记录,并将这些分块保存在单独的CSV文件
中
。我知道如何从BQ读取和写入CSV,但不能理解如何使用波束变换来分块文件,或者是否有任何其他方法。此外,ParDo不会打印我
在
以下代码
中
传递的元素。apache_beam as beam class Printer(beam.
DoFn
浏览 0
提问于2021-05-08
得票数 0
1
回答
如何处理Apache / Google
中
多个ParDo转换上的本地文件操作
python
、
google-cloud-platform
、
google-cloud-dataflow
、
apache-beam
提取过程从单个GCS桶位置下载文件,然后
在
转换完成后删除它们,以使存储处于控制之下。在这个粗略的实现
中
,每个分支下载并删除文件,需要进行大量的双重处理。
在
我的实现
中
,我有8个分支,因此每个文件被下载和删除8次。是否可以将GCS桶安装在每个工作人员身上,而不是从远程下载文件?def finish_bundle(self):
浏览 3
提问于2020-11-29
得票数 1
回答已采纳
1
回答
Python Apache光束多个输出和处理
python
、
apache-beam
我正在尝试使用以下流程
在
Google Dataflow上运行作业: 本质上是采用单个数据源,根据字典
中
的某些值进行过滤,并为每个过滤条件创建单独的输出。| 'Dump_json_'+filename >> beam.Map(json.dumps) | "Save_"+filename >> beam.io.
WriteToText
(output_fp+filename,num_shards=0,shard_name_template="&q
浏览 11
提问于2018-08-29
得票数 2
2
回答
如何处理Apache管道
中
的大内存数据以在上运行
google-cloud-dataflow
、
apache-beam
内存
中
变量word_to_id的大小为50 is。这会导致将管道提交给Dataflow Runner时出错。
浏览 2
提问于2020-06-05
得票数 1
回答已采纳
1
回答
Apache :使用ParDo类返回条件语句
python
、
python-3.x
、
google-cloud-platform
、
apache-beam
我想检查一下,如果我们
在
apache beam管道
中
读取的CSV文件
在
执行任何转换之前满足了我期望它在Ex
中
的格式:字段检查、类型检查、空值检查等等。
在
管道外对每个文件执行这些检查将消除并行性的概念,所以我只想知道是否有可能在管道
中
执行它。代码可能是什么样子的示例: def process(input_colle
浏览 1
提问于2022-04-16
得票数 0
回答已采纳
1
回答
Apache
中
多个csv文件的连接
python
、
pandas
、
parallel-processing
、
google-cloud-dataflow
、
apache-beam
我正在尝试使用csv读取几个fileio.MatchFiles文件,将它们转换为pd.DataFrame,然后将它们连接到一个csv文件
中
。为此,我创建了两个ParDo类,将文件隐藏到DataFrame
中
,然后将它们合并到merged csv
中
。整个片段如下所示: def process(self, element):
浏览 4
提问于2021-12-29
得票数 1
2
回答
WriteToText
只写临时文件
python
、
apache-beam
、
google-cloud-pubsub
我是Apache的新手,我试图用Python编写我的第一个管道,以便将Google Pub/Sub订阅
中
的数据输出到平面文件,以便以后使用;理想情况下,我希望每隔半小时将这些数据分批到一个文件
中
。我有以下代码作为我的管道
中
的最终转换:然而,所创建的所有文件都位于一个以“beam-temp-TestNew管道”为前缀的目录
中
,并分批成1
浏览 0
提问于2019-07-09
得票数 1
回答已采纳
1
回答
Apache Beam DirectRunner支持不同ParDo/
DoFn
的多线程处理
java
、
kotlin
、
apache-beam
我有5个用ParDos
包装
的DoFns,它们一个接一个地被应用。当管道运行()时,首先
DoFn
处理它的所有工作,然后是第二个,然后是第三个。我希望
在
第一个
DoFn
发出输出时,第二个
DoFn
就开始工作,这样处理将是并行的,因为目前在任何给定的时间最多有一个Thread Synchronization for
DoFn
in Apache Beam
在
工作(相关:Thread Synchronization for
DoFn
in Apache Beam)。
浏览 34
提问于2021-01-04
得票数 0
2
回答
如何使用Apache Beam Python将输出写入动态路径
python
、
google-cloud-platform
、
google-cloud-storage
、
apache-beam
、
dataflow
我的场景如下所示:import apache_beam as beamfrom apache_beam.pvalue importTaggedOutputimport time def process(self, eleme
浏览 0
提问于2020-08-27
得票数 2
2
回答
Apache维表加载,有例子吗?
python
、
google-cloud-dataflow
、
apache-beam
我正在考虑将文件加载到一维表
中
。我的解决办法是: 想问一下是否有人实现了这一点?你能给我举个例子吗?
浏览 2
提问于2017-08-05
得票数 1
回答已采纳
1
回答
使用创建大型CSV数据
csv
、
google-cloud-dataflow
2500 , 2017-03-18备注 ItemId可以用任意随机数
在
0001
浏览 3
提问于2017-03-29
得票数 1
1
回答
未定义全局名称“bigquery”
python
、
google-bigquery
、
google-cloud-dataflow
loggingimport json from apache_beam.io import ReadFromText,
WriteToText
apache_beam.options.pipeline_options import StandardOptions这是返回错误的类: class CheckExistance(beam.
DoFn
顺便说一句,我只有
在
将它部署到google的数据流作业时
浏览 0
提问于2018-09-26
得票数 3
回答已采纳
点击加载更多
扫码
添加站长 进交流群
领取专属
10元无门槛券
手把手带您无忧上云
相关
资讯
【工业包装#运输包装】蜂窝纸板在包装中的应用案例
ULP睿池周转箱 物流包装新趋势:循环包装租赁在供应链中的兴起
在燕窝行业中,品鉴即食燕窝所采用的铝碗包装背后优势!
在药品包装检测时,送检过程中需要特别注意哪些事项呢?
ULP循环包装箱美固笼租赁:仓储笼在电商物流中的效益
热门
标签
更多标签
云服务器
ICP备案
实时音视频
对象存储
即时通信 IM
活动推荐
运营活动
广告
关闭
领券