我们正在使用低级处理器API开发一个Kafka Streams应用程序。 根据Kafka上的文档,所有的线程和并行性都是由Stream线程和流任务处理的。使用主题上的分区,并行性也是可扩展的。 当前代码如下所示: public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Do processing on the stream thread itself
...
// Write b
Kafka消费者API提交返回null偏移量。不确定出了什么问题。 val partitions = new util.HashSet[TopicPartition]
for (partitionInfo <- consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
consumer.assign(partitions)
val latestOffset= consumer.committed(part
我正在使用SpringBoot应用程序的Kstreams。我已经添加了下面给出的代码来处理流的关闭。
KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void
当我在对一个老话题做一些测试时,我发现了一些奇怪的行为。读了一下卡夫卡的日志,我注意到这条“删除了8个过期的偏移量”的信息:
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
这是一个基于Armando Ballaci提供的答案的"Where do zookeeper store Kafka cluster and related information?“的后续问题。 现在很明显,消费者偏移量存储在Kafka集群中的一个名为__consumer_offsets的特殊主题中。很好,我只是想知道这些偏移量的检索是如何工作的。 主题不像RDBS,我们可以在RDBS上根据某个谓词查询任意数据。例如,如果数据存储在RDBMS中,下面这样的查询可能会获得某个消费者组的特定消费者的特定主题分区的消费者偏移量。 select consumer_offset__read,