前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go: 优雅处理kafka消费退出

go: 优雅处理kafka消费退出

作者头像
超级大猪
修改2023-11-30 15:57:48
4560
修改2023-11-30 15:57:48
举报
文章被收录于专栏:大猪的笔记大猪的笔记

在业务中,kafka的消费者服务非常常见。主要流程是从kafka中取出消息,处理消息。

本文使用kafka-go(github.com/segmentio/kafka-go),调研kafka优雅退出的方式和注意事项。

在这之前,先准备一个多 partitions的 kafka作为实验环境。

代码语言:txt
复制
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic testkafka # 创建kafka topic
kafka-topics.sh --delete --bootstrap-server localhost:9092  --topic testkafka # 删除topic

写入端

代码语言:txt
复制
func WriteMsg2Kafka() {
	kafkawriter := &kafka.Writer{
		Addr:         kafka.TCP("127.0.0.1:9092"),
		Topic:        "testkafka",
		Balancer:     &kafka.RoundRobin{},
		Async:        true,
		Compression:  kafka.Snappy,
		RequiredAcks: kafka.RequireOne,
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				logrus.Errorf("write kafka error:%v", err)
			}
		},
		ErrorLogger: logrus.StandardLogger(),
	}
	i := 0
	for {
		i += 1
		kafkawriter.WriteMessages(context.Background(), kafka.Message{
			Value: []byte(fmt.Sprint(i)),
		})
		logrus.Infof("send msg")
		time.Sleep(time.Millisecond * 10)
	}
}

为了方便观察现象,写入端会向kafka中顺序写入 1、2、3...。这样在消费者就能知道是否丢失了消息。

消费者

代码语言:txt
复制
func ReadKafkaWithKafkago(ctx context.Context, task string) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"127.0.0.1:9092"},
		Topic:          "testkafka",
		CommitInterval: time.Second, // 重要的配置,如果不配置将严重影响写入性能
		GroupID:        "test1",
		// Partition: partition.ID,
		// MaxWait: time.Millisecond * 500,
		// Logger:        logrus.StandardLogger(),
		QueueCapacity: 50,
	})

loop:
	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				logrus.Infof("kafka get context canceled:%v", err)
				break loop
			}
			if errors.Is(err, io.EOF) {
				// 当reader.Close后,进入这个分支
				logrus.Infof("kafka get eof")
				break loop
			}
			logrus.Errorf("kafka get msg error:%v", err)
			continue
		}
		// logrus.Infof("%s", msg.Value)

		t := &Testkafka{}
		id, _ := datautils.ToInt(string(msg.Value))
		t.Id = id
		t.Task = task
		t.Insert()
		err = reader.CommitMessages(context.TODO(), msg)
		if err != nil {
			logrus.Errorf("commit error:%v", err)
		}
	}
	reader.Close()
}
  1. 在这个函数中,传入了ctx用来控制消费者的生命周期。
  2. ctx.Done的时候,触发kafka get context canceled调用。循环被终止。
  3. 在循环跳出后,调用reader.Close()
  4. 为了验证是否丢数据,会将每条消息(id)写入到mysql的表中。

在main中,监听退出信号:

代码语言:txt
复制
  ctx, cancel := context.WithCancel(context.Background())
  sigs := make(chan os.Signal, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    sig := <-sigs
    logrus.Infof("GraceFullyExit has exited, sig:%v", sig)
    cancel() // 给kafka消费者发信号让它退出
  }()
  task := os.Args[2]
  kafkatest.ReadKafkaWithKafkago(ctx, task)
  time.Sleep(time.Second)

结论

  1. 使用SELECT id FROM testkafka t1 WHERE NOT EXISTS (SELECT * FROM testkafka t2 WHERE t2.id = t1.id + 1)可以观察mysql的表是否ID连续。用以判定是否有数据丢失。
  2. 多次kill 1-N个消费者并重启消费者,不影响kafka数据消费的完整性。这个示例满足数据不丢失这一要求。
  3. 如果只有一个消费者,在kill掉并拉起时,不会有数据重消费的问题。
  4. 如果有多个消费者,kill掉其中一个会偶尔出现少量已入库的消息被重消费。具体原因不明,猜测原因和rebalance机制有关。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-11-07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写入端
  • 消费者
  • 结论
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档