光束作业的流水线给出了以下异常
java.lang.RuntimeException: java.lang.RuntimeException: Exception while fetching side input:
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunn
我正在使用GCP数据流运行Apache Beam管道,并从worker那里获得了以下错误:
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done
两分钟内就有了。
我正在使用管道将消息从PubSub写到BigQuery。在管道中,当将PubSub消息转换为TableRow时,我使用的是FailsafeElement<PubsubMessage, String>,并
我正在尝试通过数据流模板"Pub/Sub Avro to Bigquery“将数据从Pub/Sub流式传输到Bigquery。Pub/Sub中的数据是AVRO格式的,来自Kafka主题。我从模式注册表中获得的相应模式文件。它看起来是这样的: {"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":
我刚开始使用scio和dataflow。尝试将我的代码转换为一个输入文件,运行良好。但是当我向输入中添加更多的文件时,得到了以下异常: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null String
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:280)
at org.apache.beam.runners.core.Si
我有一个数据流,它向API发出请求以检索一些数据。最近API中的密码有了更新,数据流突然开始失败。我使用的是java 1.8和beam SDK 2.19.0。同样的代码在本地运行时也可以正常运行。我尝试升级到java 11和same SDK 2.24.0,以防我使用的版本不支持新的密码,但我得到了相同的结果,它在本地运行,但我在数据流中得到了相同的错误。这是我用来向API发出请求的代码:
URL url = new URL(urlString);
HttpsURLConnection con = null;
outer: for (int ret
我有一条管道如下:
import base64
import gzip
import logging
import apache_beam as beam
import data.build.common.v1.common_pb2 as common_pb2
from data.pipeline.steps.console_proto_list import CONSOLE_PROTO_LIST
from google.protobuf.message import DecodeError
class GetClearMessage(beam.DoFn):
def process
我使用AvroIO.<MyCustomType>writeCustomTypeToGenericRecords()将通用记录写入流数据流作业中的GCS。在前几分钟,一切似乎都正常,但是,大约10分钟后,作业开始抛出以下错误:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: org.apache.avro.AvroRuntimeException: not open
com.google.cloud.dataflow.worker.GroupAlsoByWindowsPar
我正在尝试使用Apache读取RabbitMQ队列。我编写了一些转换代码,以便将消息写入Kafka。我甚至用简单的文本消息测试了我的场景。
现在,我尝试使用这个转换器所要运行的有效消息来部署它。这些是相当中等大小的JSON消息。
奇怪的是,当我试图读取“生产”消息时,我会得到这个异常堆栈跟踪。
java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f179a7f
目前,我正在尝试从管道将数据提交到防火墙。
但我一直在犯错误:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 609, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, i
在流式处理过程中,我尝试将json文件发布到pubsub,并使用cloud Dataflow写入数据存储。
from __future__ import absolute_import
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from google.cloud.p
当我通过以下步骤使用数据流时:- Read from bigquery - Convert table row to json string - Insert to elasticsearch (7.5.2)它看起来可以很好地处理大约100k的记录,但在实际(8m记录~65 im )的数据流中,300k插入后会抛出异常。
Error message from worker: java.lang.RuntimeException: unexpected org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffl
运行Apache后,我成功地运行了12天(11月5-17日)的流管道,然后DataFlow作业停止了数据处理。我在接触AI平台预测时看到了SSL错误,DataFlow显示:
Processing stuck in step <step_id> for at least <time_interval> without outputting or completing in state finish at <stack_trace>
通过处理SSL异常就足够了吗?在DataFlow中防止这种死锁的最佳方法是什么。
相关文章
版本
Streaming Job
Py
我们的SDK版本是Apache Beam Python 3.7 SDK 2.25.0。 有一个从Pub/Sub读取数据、转换数据并将结果保存到GCS的管道。通常它在1-2周内工作良好。在那之后,它就变得糟糕了。 "Operation ongoing in step s01 for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.par