前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka消费端消费失败后怎么做后续处理?

kafka消费端消费失败后怎么做后续处理?

作者头像
用户4283147
发布2022-10-27 11:49:49
3.8K0
发布2022-10-27 11:49:49
举报
文章被收录于专栏:对线JAVA面试
代码语言:javascript
复制
@KafkaListener(topics = {"${kafka.topic.topicB}"}, groupId = "groupB")
    public void consumeTopicB(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
      

        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());

        if(kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get(); 

            /*
             * 执行消费逻辑处理的代码,
             *
             */
            
          acknowledgment.acknowledge();// 消费成功后手动提交offset

            logger.info("消费者B消费topicB:{} partition:{}的消息 -> {}", consumerRecord.topic(), consumerRecord.partition(),message);

        }

比如在上面的消费逻辑处理过程中,失败了。那么此条消费要怎么处理呢?我是设置手动提交offset的。 第一种方案: 如果失败了以后,把失败的数据存入到数据库中,然后在提交offset。然后后续在定时的从数据库中把失败的数据再次发送到对应的topic下,等待下次的消费。

但是这样的话有个问题,比如某条消息一直失败,不可能无限重复上面的操作吧? 所以我想的是在消息模型中添加一个失败重试次数属性:

代码语言:javascript
复制
public class KafkaMsg  implements Serializable {

    private static final long serialVersionUID = -1532915942422600087L;

    private String msgId;
    private  String  content;
    private  Integer  retryTime;  // 重试次数记录
    public String getMsgId() {
        return msgId;
    }

    public String getContent() {
        return content;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public void setContent(String content) {
        this.content = content;
    }

     public Integer getRetryTime() {
        return retryTime;
    }

     public Integer setRetryTime(Integer time) {
         this.retryTime = time;
    }

    @Override
    public String toString() {
        return "KafkaMsg{" +
                "msgId='" + msgId + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
}

然后消费失败后,先记录一下重试次数再把它存入数据库,然后定时再次发送到topic时,先判断它的重试次数是否达到上限,没有就再次写入topic等待再次被消费

其实不光是Kafka还有rabbitmq消费端消费失败后,重试也可以使用这样的方式处理。

第二种方案:

消费失败后把消息转发到另一个主题中,然后对于失败的消息你想怎么处理都可以,入库,写文件,程序处理都随你便,相当于 rabbitmq 的死信队列

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-03-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 对线JAVA面试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档