我创建了一个简单的Apache光束流管道,它从Kafka读取数据,进行一些处理,并通过调用一些外部服务的API来持久化结果。我希望确保在管道重启或失败期间不会丢失数据,因此我希望在特定doFun执行结束时成功调用接口后,手动将记录偏移量提交给Kafka。在我之前的Kafka经验中,我知道通过使用Kafka Co
如果我需要使用特定消费者组的最新提交的偏移量(将在startingOffset中使用Spark结构化流),我应该使用什么。 val last=consumer.committed(partition) <groupId>org.apache.kafka此偏移量充当该分区内记录的唯一标识符,并表示使用者在</em
需要从卡夫卡主题中从特定的偏移量中获取消息如果我不使用assign(),那么使用者就不会执行“查找”,因为这是一个延迟操作实际用途:需要从预先确定的偏移量到结束时在主题上迭代消息.这个预先确定的偏移量是在markOffset()计算的。java.lang.IllegalStateException: Subscription to topics, part
我想从一个特定的分区和特定的偏移量消费。我抬头一看,发现有一个seek方法,但它抛出了一个异常。有没有人有类似的用例或解决方案?), 4);java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.j