是否可以在Beam中应用嵌套查询?我试图在Beam中运行这样的查询,但得到了错误。
我正在运行的查询是:
PCollection<BeamRecord> Query_Output = Query.apply(
BeamSql.queryMulti("Select Orders.OrderID From Orders Where Orders.CustomerID IN (Select Customers.CustomerID From Customers WHERE Customers.CustomerID = 2)"));
我得到的错误是:
在测试Beam时,我使用了Github的模型类示例,运行本地机器( POJo on Local,DirectRunner)的示例运行良好,但在使用DataflowRunner运行时异常失败。
例外:
java.lang.IllegalArgumentException: Unable to encode element 'com.test.Customer1@523377ea' with coder 'org.apache.beam.sdk.schemas.SchemaCoder@2574fe3c'.
at org.apache.beam.sdk.cod
我正在运行这个beam tutorial的简化版本,但是在我的本地机器上使用DirectRunner运行它。 import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
import os
with beam.Pipeline() as p:
rows = (p |
beam.Create([
beam.Row(col1="val1", col2="col2_val1"),
我正在尝试使用apache中可用的jdbcio连接器从mysql读取一些数据。由于我想使用默认的扩展服务,按照文档:,我安装了Java并确保java命令是available.And,然后我尝试运行以下代码。
我使用的是Python版本3.9.6和apache版本2.35.0
from apache_beam.io.jdbc import ReadFromJdbc
import apache_beam as beam
with beam.Pipeline() as p:
result = (
p
| 'Read from jdbc' >>
我有以下代码:
PCollection<KV<String, Cell>> first = ...;
PCollection<String> lines = first
.apply("Build lines", Combine.<String, Cell, String>perKey(new MergeCellsFn()))
;
Cell看起来像这样:
public class Cell {
public final int index;
public final String value;
publ
运行Apache后,我成功地运行了12天(11月5-17日)的流管道,然后DataFlow作业停止了数据处理。我在接触AI平台预测时看到了SSL错误,DataFlow显示:
Processing stuck in step <step_id> for at least <time_interval> without outputting or completing in state finish at <stack_trace>
通过处理SSL异常就足够了吗?在DataFlow中防止这种死锁的最佳方法是什么。
相关文章
版本
Streaming Job
Py
我正在使用Apache beam和Cloud Dataflow Runner,当我尝试运行我的管道时,得到了以下异常:
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkState(ZLjava/lang/String;Ljava/lang/Object;)V
at org.apache.beam.sdk.io.gcp.pubsub.PubsubClient$TopicPath.getV1Beta1Path(PubsubClient.java:264)
at org.apache.beam.runner
尝试在GCP云函数中创建GCP数据流。我已经部署了一个简单的apache函数,它工作得很好,但是当我试图读取文件时会出现路径错误。当我使用参数-runner从本地运行时,与Dataflowrunner一样,相同的脚本运行,有人建议我必须执行pip安装apache-beamgcp。我已经在当地做过了,而且效果很好。如果我试图在GCP中安装它,它会在一段时间后进行会话超时。下面是我的密码。
#import print library
# This script will read all avro files on a path and print them
import logging
imp
我正在尝试通过数据流模板"Pub/Sub Avro to Bigquery“将数据从Pub/Sub流式传输到Bigquery。Pub/Sub中的数据是AVRO格式的,来自Kafka主题。我从模式注册表中获得的相应模式文件。它看起来是这样的: {"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":
使用“数据流作业”从bigQuery表读取数据时,要尽量避免集合中的重复。对于这一点,使用beam.sdk.transforms.Distinct来读取带有distinct的记录。但却在错误之下
java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:193)
at org.apache.beam.sdk.transforms.Gro
Java版本:8 Runner:数据流
在我升级到beam 2.5.0之前一切正常。例外:
Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMet
我正试图把MutationGroups变成SpannerIO的扳手。我们的目标是每10秒编写一次新的MuationGroups,因为我们将使用扳手来查询接近时间的KPI。
当我不使用任何窗口时,我会得到以下错误:
Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.tr