当我运行Beam程序时,我得到了以下错误。
2021-05-20T17:04:42.166994441ZError message from worker: generic::unknown:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "first.py", line 68, in process AttributeError: '_DoFnParam' object has no attribute 'start'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda> lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle element.data)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 358, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "first.py", line 68, in process AttributeError: '_DoFnParam' object has no attribute 'start' [while running 'Write to GCS-ptransform-146']
代码:
import argparse
import logging
import random
from datetime import datetime
import apache_beam as beam
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class CustomPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--output_path",
type=str,
help="Path of the output GCS file including the prefix.",
)
class WriteToGCS(DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, custom_options, output_path, window=DoFn.WindowParam):
"""Write messages in a batch to Google Cloud Storage."""
ts_format = "%H:%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
output_path = custom_options.output_path.get()
filename = "-".join([output_path, window_start, window_end, str(shard_id)])
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body in batch:
f.write("{}\n".format(message_body).encode("utf-8"))
def run(input_topic, num_shards, window_size):
global custom_options
# Set `save_main_session` to True so DoFns can access globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
custom_options = pipeline_options.view_as(CustomPipelineOptions)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Write to GCS" >> ParDo(WriteToGCS(custom_options.output_path))
)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
)
parser.add_argument(
"--num_shards",
default=5,
type=int,
help="Number of shards to use when writing windowed elements to GCS.",
)
parser.add_argument(
"--window_size",
default=1,
type=int,
help="Output file's window size in minutes.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_topic,
known_args.num_shards,
known_args.window_size
)
发布于 2021-06-02 13:41:59
在WriteToGCS DoFn中,您在process方法中声明DoFn将采用arg custom_options
和output_path
,但是它们没有默认值,而且参数转换似乎会将WindowParam映射到错误的arg上。
您需要从process方法中删除未使用的参数,以使DoFn参数转换正确,可以在process中从**kwargs
中传递和检索其他参数。
https://stackoverflow.com/questions/67625087
复制相似问题