
How to make the application auto-recover from request timeouts caused by reactor-kafka's RetriableCommitFailedException?

Stack Overflow user
Asked on 2020-08-05 01:02:48
1 answer · 774 views · 0 followers · score 1

I have a Kafka processor defined like this:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.receiver.ReceiverRecord
import reactor.kotlin.core.publisher.toMono
import reactor.util.retry.Retry
import java.time.Duration
import java.util.*

@Component
class KafkaProcessor {
    private val logger = LoggerFactory.getLogger(javaClass)
    
    private val consumerProps = hashMapOf(
        // Note: the deserializer config must be the Java Class object, i.e. ::class.java,
        // not the ::javaClass property reference.
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.GROUP_ID_CONFIG to "groupId",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"
    )

    private val receiverOptions = ReceiverOptions.create<String, String>(consumerProps)
        .subscription(Collections.singleton("some-topic"))
        .commitInterval(Duration.ofSeconds(1))
        .commitBatchSize(1000)
        .maxCommitAttempts(1)

    private val kafkaReceiver: KafkaReceiver<String, String> = KafkaReceiver.create(receiverOptions)

    @Bean
    fun processKafkaMessages(): Unit {
        kafkaReceiver.receive()
            .groupBy { m -> m.receiverOffset().topicPartition() }
            .flatMap { partitionFlux ->
                partitionFlux.publishOn(Schedulers.elastic())
                    .concatMap { receiverRecord ->
                        processRecord(receiverRecord)
                            .map { it.receiverOffset().acknowledge() }
                    }
            }
            .retryWhen(
                Retry.backoff(3, Duration.ofSeconds(1))
                    .maxBackoff(Duration.ofSeconds(3))
                    .doBeforeRetry { rs ->
                        logger.warn("Retrying: ${rs.totalRetries() + 1}/3 due to ${rs.failure()}")
                    }
                    .onRetryExhaustedThrow { _, u ->
                        logger.error("All ${u.totalRetries() + 1} attempts failed with the last exception: ${u.failure()}")
                        u.failure()
                    }
            )
            .subscribe()
    }
    
    private fun processRecord(record: ReceiverRecord<String, String>): Mono<ReceiverRecord<String, String>> {
        return record.toMono()
    }
}

Sometimes I get this error:

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.

The first retry looks like this:

Retrying: 1/3 due to org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets

The second and third look like this:

Retrying: 2/3 due to reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable

Retrying: 3/3 due to reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable

Once all 3 retries are exhausted, the message looks like this:

All 4 attempts failed with the last exception: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable 

When I get that error, I have to restart the application so it reconnects to the Kafka broker and commits the records.

I know that setting maxCommitAttempts to 1 means there will be no further retries once a RetriableCommitFailedException occurs. I thought the retryWhen clause I added at the end of processKafkaMessages() would do that job instead, so the pipeline could recover on its own.

The reason I set maxCommitAttempts to 1 is that, as discussed elsewhere, the built-in commit retry has no backoff, so the default 100 maximum commit attempts are exhausted within about 10 seconds. That is why I figured I should write my own retry logic with backoff.

The question is: how should I retry auto-commits with backoff? And is it possible to write a unit test for this using EmbeddedKafka?

Language: Kotlin

Reactor Kafka library: io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE


1 Answer

Stack Overflow user
Answered on 2022-03-16 14:18:16

retryWhen() simply attempts to re-subscribe. Since the Kafka consumer is in an error state, it rejects the re-subscription. You need to defer the kafkaReceiver.receive() call, like so:

Flux.defer { kafkaReceiver.receive() }
    .groupBy { m -> m.receiverOffset().topicPartition() }
// etc

That way, the re-subscription calls kafkaReceiver.receive() again and creates a new consumer.
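Putting the answer together with the question's pipeline, a minimal Kotlin sketch might look like the following. This is an assumption-laden illustration, not a verified fix: it reuses the question's receiver, topic, and retry parameters, and only changes the subscription to go through Flux.defer so each retry attempt builds a fresh consumer.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import reactor.util.retry.Retry
import java.time.Duration

fun processKafkaMessages() {
    // Defer the receive() call: on each (re-)subscription triggered by
    // retryWhen, the supplier runs again and a new KafkaConsumer is created,
    // instead of re-subscribing to the consumer that is in an error state.
    Flux.defer { kafkaReceiver.receive() }
        .groupBy { m -> m.receiverOffset().topicPartition() }
        .flatMap { partitionFlux ->
            partitionFlux.publishOn(Schedulers.elastic())
                .concatMap { receiverRecord ->
                    processRecord(receiverRecord)
                        .map { it.receiverOffset().acknowledge() }
                }
        }
        .retryWhen(
            // Backoff applies between re-subscriptions, which is where the
            // custom backoff the question asks about actually takes effect.
            Retry.backoff(3, Duration.ofSeconds(1))
                .maxBackoff(Duration.ofSeconds(3))
        )
        .subscribe()
}
```

Note that with this shape, a commit timeout that escapes the pipeline tears down the whole flux and the backoff governs how quickly a replacement consumer is created; it does not retry the individual commit in place.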

Score 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/63256984
