我正在使用数据流kafka到bigquery模板。启动数据流作业后,它会在队列中停留一段时间,然后失败,出现以下错误:
Error occurred in the launcher container: Template launch failed. See console logs.
在查看日志时,我看到以下堆栈跟踪:
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:192)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
a
无模式的Bigquery接收器连接器SMT无法在布尔值上将数据保存到bigquery。
MapsUtil.debugPrint on recordValue,然后从apply(R record)返回。
active = true java.lang.String
模式定义
{
"mode": "NULLABLE",
"name": "active",
"type": "BOOLEAN"
}
去列剂
public class BooleanDeserialiser ex
你好,我正在尝试运行一个作业来读取GCP外部托管的kafka中的一些事件,这个作业是在VPC网络上运行的。
问题是kafka被配置为使用主机名而不是IPs来回答问题,因此在引导程序中指定ips会导致在数据流上运行作业时无法连接到目标节点。
Reader-1: Timeout while initializing partition 'placeholder'. Kafka client may not be able to connect to servers.
另一方面,如果我用kafka创建一个VM,并在etc/host中指定映射主机名-ip,我就能够正确地使用它。
为了
我试图每120秒将从Kafka提取的数据写入Bigquery表。我想做一些额外的操作,通过文档可以在.foreach()或foreachBatch()方法中实现这些操作。
作为一个测试,我想打印一条简单的信息,每次数据从卡夫卡提取并写入BigQuery。
batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime='120 seconds') \
.foreachBatch(print("do i get printed every batch?"))
.format("bigque
当源主题分区计数= 1时工作正常。如果我将分区增加到任何大于1的值,我会看到下面的错误。既适用于低级,也适用于DSL API。有什么建议吗?可能会遗漏什么?
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
at org.apach
我们使用selectKey()来更改密钥。在我们迁移到IBM Cloud上的新标准计划事件流之前,它可以很好地工作。然后我们在下面遇到了异常。它说我们的主题retentions.ms不适合范围3600000..2592000000。所以我想知道我们怎样才能解决这个问题。
谢谢,
[WARNING]
org.apache.kafka.streams.errors.StreamsException: Could not create topic employeeFilter-KSTREAM-KEY-SELECT-0000000047-repartition.
at org.apache.k
这是这个问题的后续问题:Kafka Connect BigQuery Sink Connector requests incorrect subject names from the Schema Registry 当尝试在我们的Kafka (Avro)事件上使用confluentinc/kafka-connect-bigquery时,我遇到了以下错误: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic domain.rating.annotated to Avro:
查询:将路由器接口作为kafka集群的生产者。
问题:我的路由器接口正在尝试将数据推送到运行kafka的端口。(默认为9092)。
问题1但是kafka broker可以在不创建主题的情况下接受这些数据吗?
问题2. kafka消费者可以在不指定主题的情况下拉取数据吗?
If yes, How ?
If not, What is work around this and how can i achieve this ?
第一次编辑:
我刚刚检查了Kafka broker配置是否有"auto.create.topics.enable“字段。如果我设置为true,