Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >AttributeError:'_DoFnParam‘对象没有属性'start’[在运行‘写到GCS 146’时‘]

AttributeError:'_DoFnParam‘对象没有属性'start’[在运行‘写到GCS 146’时‘]
EN

Stack Overflow用户
提问于 2021-05-20 09:14:18
回答 1查看 239关注 0票数 0

当我运行Beam程序时,我得到了以下错误。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
2021-05-20T17:04:42.166994441ZError message from worker: generic::unknown: 
Traceback (most recent call last): 
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
File "first.py", line 68, in process AttributeError: '_DoFnParam' object has no attribute 'start' 
During handling of the above exception, another exception occurred: 
Traceback (most recent call last): 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda> lambda: self.create_worker().do_instruction(request), request) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle element.data) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded self.output(decoded_value) 
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output 
File "apache_beam/runners/worker/operations.py", line 358, in apache_beam.runners.worker.operations.Operation.output 
File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process 
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process 
File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented 
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) 
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
File "first.py", line 68, in process AttributeError: '_DoFnParam' object has no attribute 'start' [while running 'Write to GCS-ptransform-146']

代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import argparse
import logging
import random
from datetime import datetime

import apache_beam as beam
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--output_path",
            type=str,
            help="Path of the output GCS file including the prefix.",
        )
class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path
    def process(self, custom_options, output_path, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        output_path = custom_options.output_path.get()
        filename = "-".join([output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body in batch:
                f.write("{}\n".format(message_body).encode("utf-8"))

def run(input_topic, num_shards, window_size):

    global custom_options
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    
    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Write to GCS" >> ParDo(WriteToGCS(custom_options.output_path))
        )

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    parser.add_argument(
            "--input_topic",
            help="The Cloud Pub/Sub topic to read from."
            '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
        )
    parser.add_argument(
            "--num_shards",
            default=5,
            type=int,
            help="Number of shards to use when writing windowed elements to GCS.",
        )
    parser.add_argument(
            "--window_size",
            default=1,
            type=int,
            help="Output file's window size in minutes.",
        )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.num_shards,
        known_args.window_size
        )
EN

回答 1

Stack Overflow用户

发布于 2021-06-02 13:41:59

在WriteToGCS DoFn中,您在process方法中声明DoFn将采用arg custom_optionsoutput_path,但是它们没有默认值,而且参数转换似乎会将WindowParam映射到错误的arg上。

您需要从process方法中删除未使用的参数,以使DoFn参数转换正确,可以在process中从**kwargs中传递和检索其他参数。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67625087

复制
相关文章
JavaScript 生态圈和技术趋势,这一年发生了这些变化 ...
大家好,我是 ConardLi, 拖了这么久,JS 生态圈最权威的调查报告 state-of-js 终于是出来了 ...
ConardLi
2022/04/08
6390
JavaScript 生态圈和技术趋势,这一年发生了这些变化 ...
c# 判断文件是否发生了变化
监控d盘下的所有.txt文件的修改 { FileSystemWatcher watcher = new FileSystemWatcher(); try { watcher.Path = @"d:\Test"; } catch (ArgumentException e) { Console.Write
zls365
2020/08/19
9490
字段加密后长度变化
加密算法有哪些 非对称加密算法:RSA,DSA/DSS 对称加密算法:AES,RC4,3DES HASH算法:MD5,SHA1,SHA256 数据加密的长度变化 加密流出 字符串转换成 bytes -> padding -> base64 编码 将字符串转换成 byte 根据字符串所占长度不同,长度会扩充到不同倍数 ascii 码的字符,如数字字母等,每个字符只占一个字节,长度不扩充 正常情况下,汉字等 unicode 编码,一个字符占 3 个字节,长度扩充3倍 如果是 mysql 中的 utf
王小明_HIT
2021/07/05
2.3K0
Flutter 中键盘弹起时,Scaffold 发生了什么变化
最近刚好有网友咨询一个问题,那就顺便借着这个问题给大家深入介绍下 Flutter 中键盘弹起时,Scaffold 的内部发生了什么变化,让大家更好理解 Flutter 中的输入键盘和 Scaffold 的关系。
GSYTech
2021/03/02
2.2K0
Flutter 中键盘弹起时,Scaffold 发生了什么变化
MySQL从5.5升级到5.6,TIMESTAMP的变化
本文介绍了MySQL从5.5升级到5.6后,TIMESTAMP字段的变化。在MySQL5.5中,TIMESTAMP默认使用UTC时区,并且不支持NULL值。而在MySQL5.6中,TIMESTAMP支持了更多的默认值,并支持了NULL值。但是,在MySQL5.6中,TIMESTAMP的行为变得更为诡异,需要使用explicit_defaults_for_timestamp参数来控制。总的来说,升级到MySQL5.6后,需要更加小心地处理TIMESTAMP字段,以避免出现数据异常等问题。
高爽
2017/12/28
1.1K0
战“疫”期间,移动直播行业都发生了哪些变化?
国内疫情逐渐被控制,正往好的方向发展。在此之前,疫情对各行各业都产生了巨大影响。尤其是互联网相关行业,疫情以来,网民对移动互联网的依赖加大,根据Questmobile发布的数据来看,互联网的使用市场比日常增加21.5%。
云豹通讯员
2020/03/12
6000
战“疫”期间,移动直播行业都发生了哪些变化?
7月份,腾讯云云函数发生了这些变化
7月份,我们做了这些优化。 1. HTTP Service内测发布 HTTP Service支持通用框架的无缝迁移,已有代码直接迁移Serverless架构。它具备高性能、无运行时长限制的特性,让您保留原有的开发习惯。并集成域名,默认提供了三级子域名,省去域名注册和备案等复杂流程,自动配置解析,天然集成SSL证书。 同时,可以通过控制台或者CLI快速上传部署代码,支持Websocket,可以轻松搭建聊天室、小游戏等业务。可点击如下链接了解详情:https://cloud.tencent.com/docum
腾讯云serverless团队
2019/08/16
3.7K0
7月份,腾讯云云函数发生了这些变化
浏览器输入URL后发生了什么
在网络世界,你肯定记得住网站的名称,但是很难记住网站的 IP 地址,因而也需要一个地址簿,就是 DNS 服务器。DNS 服务器是高可用、高并发和分布式的,它是树状结构,如图:
前端迷
2020/02/19
4.3K0
浏览器输入URL后发生了什么
部署hexo后样式丢失问题
如果遇到hexo部署到gitee后样式丢失,并且控制台没有报错的话,可以修改hexo配置文件
阿超
2022/08/16
1.5K0
部署hexo后样式丢失问题
Linux 系统开机加电后发生了什么?
电脑启动后,CPU逻辑电路被设计为只能运行内存中的程序,没有能力直接运行存在于软盘或硬盘中的操作系统,如果想要运行,必须要加载到内存(RAM)中。
用户8826052
2022/03/02
2K0
linux系统开机加电后发生了什么?
电脑启动后,CPU逻辑电路被设计为只能运行内存中的程序,没有能力直接运行存在于软盘或硬盘中的操作系统,如果想要运行,必须要加载到内存(RAM)中。
用户7686797
2021/09/29
2.5K0
【报告推荐】2022年,消费趋势发生了哪些重大变化?
旷日持久的疫情,深刻改变了整个消费市场的全貌,更重要的是改变了消费者对于未来的“预期”。根据埃森哲研究报告,消费倾向持续走低,预防性储蓄持续增长,消费更加理性。对比2017年,曾经盛极一时的“月光族”概念悄然淡出历史舞台。 对企业来说,也告别了一度野蛮生长的年代,转而开始精细化运营。与经济发展初期大家追求仅仅盲目财富不同,如今的消费者更加成熟,开始更加关注家庭,健康以及事业。Work Life Balance重新回到了大众视线,也就是更关注“生活”本身。包括最近不少通过我们发布招聘信息的企业中,不少也开始把
iCDO互联网数据官
2022/06/09
4770
【报告推荐】2022年,消费趋势发生了哪些重大变化?
如何升级到 React 18发布候选版
上周 react 官网 发布了 react@rc 版本,该版本是候选版本(Release Candidate),这意味 API 已经基本稳定,跟最终版本不会有太大差别,官网也发布博客《如何升级到 react18 RC 版本》,鼓励大家尝试升级,所以我们可以在项目组中使用了!下面内容来自是官方文档的翻译。
狂奔滴小马
2022/03/30
2.4K0
如何升级到 React 18发布候选版
睡眠剥夺后皮层微结构的广泛变化
大脑皮层的微观结构受到昼夜节律和睡眠剥夺的影响,但这些影响的确切基础尚不清楚。t1加权和t2加权磁共振图像之间的比率(T1w/T2w比率)与髓磷脂水平和树突密度有关,这可能为研究睡眠剥夺大脑的皮质内微观结构提供新的见解。在这里,我们检测了41名健康青年(26名女性)在睡眠剥夺(n = 18)或正常睡眠-觉醒周期(n = 23)前后32小时的皮质内T1w/T2w比值。线性模型显示,在32小时后,四组患者的T1w/T2w比值变化有显著的组间差异,包括岛叶、扣带回和颞上皮质的双侧效应,包括涉及注意、听觉和疼痛处理的区域。在整组中,睡眠剥夺组T1w/T2w比值增加,而正常睡眠-觉醒组T1w/T2w比值降低。这些变化不能用扫描仪内头部运动来解释,在调整皮质厚度和水合作用后,簇间95%的影响仍然显著。与正常的睡眠-觉醒周期相比,32小时的睡眠剥夺使皮质内T1w/T2w比值增加。虽然本研究检测到的皮质内变化可以反映髓磷脂或树突密度的变化,或两者的变化,但需要组织学分析来明确具体的皮质过程。
悦影科技
2022/08/18
2750
hive sql(五)—— 按照时间轴顺序, 发生了状态变化的数据行
https://blog.csdn.net/luo981695830/article/details/111211773
大数据最后一公里
2021/08/05
1.1K0
超融合三年,用户对它的期待发生了哪些变化?
从2016年开始,著名研究机构Evaluator Group就在全球范围内针对企业级超融合基础设施(Hyper-Converged Infrastructure,简称“HCI”)的应用展开了调查,如今已是第三年。
用户6543014
2019/10/27
4600
自 Adam 出现以来,深度学习优化器发生了什么变化?
如果将 Adam 优化出现以来产生的关于优化过程的有趣想法按时间顺序排列的话,结果如下:
机器之心
2018/12/25
9560
自 Adam 出现以来,深度学习优化器发生了什么变化?
自2013到2019年大数据领域发生了什么变化
在网上看到这篇文章之后发现还挺有意思,文章也算比较简短,就试着联系了一下作者说我想把他翻译成中文,不做商业用途只是练习和技术布道。作者的回应也非常快,当晚就给我回复,所以就有了这篇翻译,如果翻译有不准确的地方还请大家指出。
黑光技术
2019/02/21
3.9K0
自2013到2019年大数据领域发生了什么变化
点击加载更多

相似问题

使用javascript的铯多边形回调

112

如何使用铯中的可用性间隔来定义多边形实体?

17

将HTML元素放在铯实体的位置上

12

铯中的解析实体

10

铯:基于位置数组动态地改变实体的位置。

11
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文