首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >WriteToText在DirectRunner中工作,但在DataflowRunner中与TypeError一起失败

WriteToText在DirectRunner中工作,但在DataflowRunner中与TypeError一起失败
EN

Stack Overflow用户
提问于 2017-02-12 07:38:44
回答 1查看 1.3K关注 0票数 4

我可以使用DirectRunner运行这段代码,它运行得很好。对于DataflowRunner,它通过以下方式崩溃:

TypeError: process()在运行‘Write_text/Write/WriteImpl/WriteBundles’时只接受4个参数(3个给定)

我的apache是按照说明中的指示从主人克隆和构建的。它构建为apache-beam-sdk==0.6.0.dev0。不过,我对这个版本表示怀疑,因为(我想)最近我看到了没有版本更改的代码更改(NewDoFn消失了,但版本没有改变)。

我不确定这是否是问题的根源,但似乎安装的sdk和数据流容器不匹配。我得到另一个不匹配类型错误,其中DirectRunner直接将element传递给我的DoFn.process(),而DataflowRunner则传递context

我试图将其与最简单的代码隔离开来:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import uuid
import apache_beam.utils.pipeline_options
import apache_beam as beam

runner = 'DataflowRunner'
# runner = 'DirectRunner'

options = beam.utils.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'a' + str(uuid.uuid4())
gcloud_options.project = 'your-project'
gcloud_options.staging_location = 'gs://your-bucket/beam/staging'
gcloud_options.temp_location = 'gs://your-bucket/beam/temp'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = runner

p = beam.Pipeline(options=options)
(p
 | 'some_strings' >> beam.Create(tuple('asdfqwert'))
 | 'write_text' >> beam.io.WriteToText('strings', file_name_suffix='.txt')
 )
p.run().wait_until_finish()

全部产出:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
No handlers could be found for logger "oauth2client.contrib.multistore_file"
/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:136: UserWarning: Using fallback coder for typehint: Any.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
Collecting google-cloud-dataflow==0.5.1
  Using cached google-cloud-dataflow-0.5.1.tar.gz
  Saved /var/folders/v3/61xx4nnn6p36n5m9fp4qdwtr0000gn/T/tmpuCWoeh/google-cloud-dataflow-0.5.1.tar.gz
Successfully downloaded google-cloud-dataflow
Traceback (most recent call last):
  File "reproduce_bug.py", line 28, in <module>
    p.run().wait_until_finish()
  File "/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/runners/dataflow_runner.py", line 706, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
(70278eb56b40fd94): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 514, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 899, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:26452)
    op.start()
  File "dataflow_worker/executor.py", line 191, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7575)
    def start(self):
  File "dataflow_worker/executor.py", line 196, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7480)
    with self.spec.source.reader() as reader:
  File "dataflow_worker/executor.py", line 206, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7425)
    self.output(windowed_value)
  File "dataflow_worker/executor.py", line 136, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:5749)
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "dataflow_worker/executor.py", line 83, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:3884)
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/executor.py", line 505, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:15525)
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 163, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:4862)
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 270, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7749)
    self.reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:8108)
    raise type(exn), args, sys.exc_info()[2]
  File "apache_beam/runners/common.py", line 268, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7660)
    self.old_dofn_process(element)
  File "apache_beam/runners/common.py", line 173, in apache_beam.runners.common.DoFnRunner.old_dofn_process (apache_beam/runners/common.c:5182)
    self._process_outputs(element, self.dofn_process(self.context))
  File "apache_beam/runners/common.py", line 152, in apache_beam.runners.common.DoFnRunner.__init__.lambda3 (apache_beam/runners/common.c:3640)
    self.dofn_process = lambda context: fn.process(context, *args)
TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-02-13 15:08:20

您的环境似乎安装了0.5.1版本(请参阅堆栈跟踪的顶部),但是您正在使用python的头进行构建。

您可以创建一个具有正确版本的SDK的新的virtualenv环境。

  • 如果要针对Python运行,则需要在运行管道时设置sdk_location标志。
  • 如果要针对发布的版本运行,请使用pip install google-cloud-dataflow安装SDK,并正常运行管道。(最好在虚拟环境中使用virtualenv)

请注意,最好使用已发布的版本。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42189550

复制
相关文章
带有-i选项的sed命令在Linux上执行成功,但在MacOS上失败
就地编辑文件(如果提供了后缀,则进行备份),可见参数后缀 SUFFIX 是可选的,即带或者不带这个参数都可以执行。
程序熵
2023/09/25
3670
带有-i选项的sed命令在Linux上执行成功,但在MacOS上失败
await 只在 async 函数中工作
关于 promise 的一种更优雅的写法 async/await 中,await 只会出现在 async 函数中,我们使用 async/await 时,几乎不需要 .then,因为 await 为我们处理等待;但是在代码的顶层,当我们在 async 函数的外部时,我们在语法上是不能使用 await 的,所以通常添加 .then/catch 去处理最终结果或者 error。
前端开发博客
2020/11/04
1.5K0
await 只在 async 函数中工作
在IT开发中工作种类的分类
可能很多同学在学习python之前都听说过什么:前端程序员,后端程序员,安全工程师,运维,爬虫,全栈程序员等等各种各样的头衔名称,搞得大家都不知道该怎么选择了。我当初学编程之前也有过类似的经历,所以这里我尽可能给大家解释明白。
python自学网
2021/11/27
9550
在IT开发中工作种类的分类
一起聊聊工作中的功能安全测试
Tech    导读   本文旨在站在测试开发工程师的角度将功能安全测试归入日常测试中,简单剖析了功能安全测试与功能测试的异同点以及SDL中各环节的职责所在,同时分析了针对不同的安全场景如何进行功能安全测试用例的设计。通过本文,读者可以对SDL有一个简单明了的理解,针对SDL中各个环节,产研测的职责和关注点是什么都能有一个明确的概念。通过阅读本文,可以重点关注如何结合实际功能安全点设计符合需求的功能安全测试用例。 00前言    测试开发工程师一直想将安全测试真正融入测试工作中,在测试工作
京东技术
2022/03/04
1.2K0
[脑书笔记]《刻意练习》(中):在工作和生活中应用刻意练习!
美国在越战初期的空战水平是击落9架敌机,要损失10架战机;但是在越战后期可以达到结果12.5架敌机,才损失1架战机。帮助美军完成这个巨大改变的就是“王牌飞行计划”:每次训练结束就立刻召开战斗报告会,教练们针对训练中的问题给出有效的反馈,飞行员渐渐把学到的东西内化于心。 这种训练方法达到了惊人的效果,后来在海湾战争时期美军可以做到击落33架敌机才损失一架战机(我相信这里除了技能因素以外还有飞机等级的差异,但是不可否认这种惊人的数据肯定有飞行员的因素)
rocket
2018/09/14
8980
[脑书笔记]《刻意练习》(中):在工作和生活中应用刻意练习!
SignalR 在IE中无法工作 - Internet Explorer
运行基于SignalR的超线程上载器的代码,发现SignalR 在IE 9上居然没法工作了,提示如下: 提示很明显,需要json2.js的支持。 使用Nuget 搜索json2.js 并安装: 在引用
张善友
2018/01/29
3.3K0
SignalR 在IE中无法工作 - Internet Explorer
RPM索引在Artifactory中是如何工作
RPM是用于保存和管理RPM软件包的仓库。我们在RHEL和Centos系统上常用的Yum安装就是安装的RPM软件包,而Yum的源就是一个RPM软件包的仓库。JFrog Artifactory是成熟的RPM和YUM存储库管理器。JFrog的官方Wiki页面提供有关Artifactory RPM存储库的详细信息。
JFrog杰蛙科技
2020/07/03
2K0
RPM索引在Artifactory中是如何工作
在mysql中order by是怎样工作的?
还有一种就是通过rowId 排序(这种情况是当一行数据过大的时候) 直接上 流程图 :
袁新栋-jeff.yuan
2020/08/26
2.4K0
在mysql中order by是怎样工作的?
产品原型在工作推动中重要作用
从早期接触互联网产品相关工作,最初在需求传递的整个流程中,我们将想法页面上形成草图,表现形式上看到的是一系列由图片占位符、文字、线框、按钮等元件组成作为一个静态页面呈现。随着需求传递过程中沟通成本的提升,需求细节的严谨性、在与UI/UE的沟通中,需要对表现层之下按钮、布局、交互和元素的位置,进行优化设计布局,以达到这些元素在页面上的最佳体验效果,让用户在需要的时候,用户如何到达某个页面能快速找到所需位置,并且在他们在当前页面完成任务后,快速给出反馈和需要去的下一个页面这是产品需要思考的路径。
奔跑的小鹿
2022/12/20
5500
产品原型在工作推动中重要作用
CSReid库在NetCore工作场景中的使用
过去 .net 最有名望的 ServiceStack.Redis 早已沦为商业用途,在 .NETCore 中使用只能充值。后来居上的 StackExchange.Redis 虽然能用,但是之前出现的各种Timeout错误也是让人很无语,所以也不作为使用的首选。经过网上的一些整理和推荐,发现了一款开源库CSReidsCore。
happlyfox
2021/04/30
2K0
Kubernetes在IT工作搜索中占主导地位
根据就业板Dice的最新报告,Kubernetes是IT公司在2018年要求的主导技术技能。
CNCF
2019/12/05
9000
Kubernetes在IT工作搜索中占主导地位
在ONLYOFFICE12.5工作区中如何与他人共享文件夹
如何更高效的共享文件夹,这其实是很多企业日常办公中的痛点,不管是同事之间,还是上下游客户,多数都需要相互传输共享文件夹,小文件还好说,但是大文件就很难受了,基本都不能很方便的进行共享,接下来我让我们看看在ONLYOFFICE12.5工作区中如何与他人共享文件夹。
用户10264843
2023/03/28
1.4K0
transactionscope mysql_c# – 嵌套的TransactionScope在测试中失败
MySQL Connector和使用EntityFramework 4.3对MysqL 5.5.19数据库运行一些自动化测试.
全栈程序员站长
2022/09/15
2.1K0
在Hadoop系统中运行WordCount案例失败解决方法
报错提示: mapreduce.shuffle set in yarn.nodemanager.aux-services is invalid 请在yarn-site.xml中添加 <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.mapredu
指剑
2022/07/15
8780
详解HashMap在JAVA中的怎么工作的?
最简单形式的 hash,是一种在对任何变量/对象的属性应用任何公式/算法后, 为其分配唯一代码的方法。
挨踢小子部落阁
2023/03/16
6520
详解HashMap在JAVA中的怎么工作的?
尴尬:在Excel中为指定数据插入饼图失败
本来是非常非常简单的一个需求,即便不会,随便百度下也都有说明。 可自己却在一次紧急工作中因此耽误了时间,需求是需要插入一个饼图但因操作错误一直无法正确显示饼图数据,非常尴尬,干脆记录下这一刻。
Alfred Zhao
2022/06/16
1.7K0
在 JavaScript 中对象的深拷贝(及其工作原理)[每日前端夜话0x8F]
当你不想改变原始对象时,就需要克隆对象。例如,如果你有一个接受对象并改变它的函数,可能不想改变其原始对象。
疯狂的技术宅
2019/07/10
2.3K0
Shell在日常工作中的应用实践
Tech 导读 本文将从测试开发工作痛点出发,重在探讨Shell在日常工作中的实战应用,由浅入深,层层递进,将用户命令转化成计算机内核所能够理解的指令,逐步实现与操作系统的完美交互。另外,为了应对高频使用场景,Shell通过函数化封装来实现工具调用,避免陷入战术上勤奋的误区。
京东技术
2023/08/22
2300
Shell在日常工作中的应用实践
node-rdkafka在docker build中失败的解决方法
> @ start / > node app.js /node_modules/bindings/bindings.js:88 throw e ^ Error: /node_modules/node-rdkafka/build/Release/node-librdkafka.node: invalid ELF header at Object.Module._extensions..node (module.js:664:18) at Module.lo
bdcn
2018/09/12
3.3K0
MySQL中group by 与 order by 一起使用排序问题
没有得到我们需要的结果,这是因为group by 和 order by 一起使用时,会先使用group by 分组,并取出分组后的第一条数据,所以后面的order by 排序时根据取出来的第一条数据来排序的,但是第一条数据不一定是分组里面的最大数据。
星哥玩云
2022/08/18
1.8K0
MySQL中group by 与 order by 一起使用排序问题

相似问题

BatchElements批处理在DirectRunner和DataflowRunner中的工作方式不同(GCP/Dataflow)

12

Google :使用for解析时DirectRunner与DataFlowRunner的不同行为

18

为什么DirectRunner不执行管道中的步骤,而DataflowRunner却能工作

10

Apache束管道使用DirectRunner运行,但在初始读取步骤中使用DataflowRunner (SDK线束sdk-0-0断开连接)失败。

217

让Dataflowrunner与--experiments=upload_graph一起工作

243
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文