在我的kafka streams应用程序中创建状态存储时,我得到了这个错误Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1。下面是应用程序的完整堆栈跟踪
[2016-08-30 12:43:09,408] ERROR [StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group string-monitor failed on partiti
当在只有一个Kafka代理的测试设置中启动我们的Kafka Streams应用程序时,我们看到以下错误,大约15次运行中就有1次:
org.apache.kafka.streams.errors.StreamsException: Existing internal topic alarm-message-streams-by-organization-repartition has invalid partitions: expected: 32; actual: 12. Use 'kafka.tools.StreamsResetter' tool to clean up i
我正在运行一个卡夫卡流消费者,它试图从合流云中消费Avro记录。我一直收到错误:Error retrieving Avro unknown schema for id 100007和unauthorized; error code: 401。我在src/main/resources/中有一个src/main/resources/文件。
以下是我的错误信息:
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 100007
at io.conf
我正在访问一个状态存储来查询它,并且不得不用一个try/catch块包装store()语句来重试它,因为有时我会得到这个异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store customers-store because the stream thread is PARTITIONS_REVOKED, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvid
我正在做一个卡夫卡消费者项目。最近我们在PROD环境中部署了它。在那里,我们遇到了如下问题:
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, gr
我有一个卡夫卡流应用程序版本- 0.11,它从几个主题中获取数据,并将数据加入到另一个主题中。
Kafka配置:
5 kafka brokers - version 0.11
Kafka Topics - 15 partitions and 3 replication factor.
每小时消耗/产生数以百万计的记录。每当我把卡夫卡经纪人打倒,它就会抛到异常之下:
org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10
at o
我们在生产中有一个由4个节点组成的集群。我们观察到,其中一个节点遇到了一种情况,它不断地收缩和扩展ISR超过1个小时,并且无法恢复,直到代理被弹回。
[2017-02-21 14:52:16,518] INFO Partition [skynet-large-stage,5] on broker 0: Shrinking ISR for partition [skynet-large-stage,5] from 2,0 to 0 (kafka.cluster.Partition)
[2017-02-21 14:52:16,543] INFO Partition [skynet-large-st
我有一个简单的生产者-消费者设置:1个生产者(作为线程)和2个消费者(作为2个进程)。producer的run方法:
def run(self):
producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
api_version=(0, 10))
while not self.stop_event.is_set():
self.logger.info("Checking for ne
我有一个KafkaStream应用程序,它过去工作得很好。现在,不管我用新的Application id重新启动了多少次,它都不会开始使用这个主题,并且我收到了以下日志:
INFO org.apache.kafka.streams.KafkaStreams stream-client [score_redeemX-7e37d43e-12ab-4c66-984e-5f959d4e5e08] State transition from REBALANCING to RUNNING
INFO org.apache.kafka.clients.consumer.KafkaConsumer [Consu
我有一个1.0.0的kafka流应用程序,有两个类,在更新。在我的应用程序中,我读取事件,执行一些条件检查,然后转发到另一个主题中的相同kafka。在我的评估过程中,我在全局表存储的帮助下从Kafka中获得了一些表达式。观察到大部分时间是在从store获取值时花费的(示例代码如下所示)。
是否只从Kafka读取一次,并在本地商店进行维护?或者当我们调用org.apache.kafka.streams.state.ReadOnlyKeyValueStore.get(String密钥时,它是从Kafka中读取的吗?如果是,那么如何维护本地存储,而不是每次都从Kafka读取?
请帮帮忙。
例如:
p
我正在使用SpringBoot应用程序的Kstreams。我已经添加了下面给出的代码来处理流的关闭。
KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void