
Installing Kafka with Docker and k3s, and sending and receiving Kafka messages in Go

Author: 福大大架构师每日一题
Published 2025-12-18 13:13:51

Docker install command, where 172.16.11.111 is the host IP and host port 14818 maps to container port 9092:

docker run -d \
  --name kafka \
  -p 14818:9092 \
  -p 9093:9093 \
  -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \
  -e TZ=Asia/Shanghai \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \
  -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \
  apache/kafka-native:4.1.0

The k3s YAML, where 172.16.11.111 is the host IP and host port 14818 maps to container port 9092:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        - name: kafka-fix-data-volume-permissions
          image: alpine
          imagePullPolicy: IfNotPresent
          command:
          - sh
          - -c
          - "chown -R 1000:1000 /tmp/kraft-combined-logs"
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: broker,controller
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092,CONTROLLER://:9093
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://172.16.11.111:14818
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: 1@localhost:9093
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_NUM_PARTITIONS
              value: "3"
            - name: KAFKA_LOG_DIRS
              value: /tmp/kraft-combined-logs
            - name: CLUSTER_ID
              value: "5L6g3nShT-eMCtK--X86sw"  # fixed cluster ID; used only to format storage on first startup
          image: 'apache/kafka-native:4.1.0'
          imagePullPolicy: IfNotPresent
          name: kafka
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      volumes:
        - hostPath:
            path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
            type: DirectoryOrCreate
          name: volv
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  ports:
    - name: broker
      port: 9092
      protocol: TCP
      targetPort: 9092
    - name: controller
      port: 9093
      protocol: TCP
      targetPort: 9093
  selector:
    app: kafka
  type: NodePort
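One caveat on the NodePort exposure: a stock Kubernetes apiserver only allocates NodePorts in the 30000-32767 range, so reaching the broker on host port 14818 as advertised requires either widening the range (e.g. starting the k3s server with `--service-node-port-range=10000-32767`) or exposing the port another way, such as a `hostPort` on the container. A hedged fragment of the Service's ports, assuming the range has been widened:

```yaml
# Explicit nodePort pinning (assumes the apiserver's
# service-node-port-range has been widened to include 14818;
# otherwise the Service is rejected on apply).
ports:
  - name: broker
    port: 9092
    targetPort: 9092
    nodePort: 14818
    protocol: TCP
```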

Sending Kafka messages in Go with github.com/segmentio/kafka-go:

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Create a Kafka writer (producer).
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker address
        Topic:    "test-topic",                    // topic to produce to
        Balancer: &kafka.LeastBytes{},             // partition-balancing strategy
    })
    defer w.Close()

    // Write a message.
    err := w.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("Key-A"),
            Value: []byte("Hello Kafka from Go!"),
        },
    )
    if err != nil {
        log.Fatalf("could not write message: %v", err)
    }

    log.Println("Message sent successfully!")
}
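The `Balancer` field decides which partition each message lands on. `kafka.LeastBytes{}` routes every message to the partition that has received the fewest bytes so far. The strategy can be sketched with a stdlib-only toy; this illustrates the idea, it is not kafka-go's actual implementation:

```go
package main

import "fmt"

// leastBytes tracks bytes sent per partition and always picks the
// partition with the smallest running total, mimicking the strategy
// behind kafka.LeastBytes.
type leastBytes struct {
	counters []int64 // bytes sent to each partition so far
}

// pick returns the partition with the lowest byte count and charges
// the message's size to it.
func (lb *leastBytes) pick(msgLen int) int {
	min := 0
	for i := 1; i < len(lb.counters); i++ {
		if lb.counters[i] < lb.counters[min] {
			min = i
		}
	}
	lb.counters[min] += int64(msgLen)
	return min
}

func main() {
	lb := &leastBytes{counters: make([]int64, 3)}
	for _, msg := range []string{"aaaa", "bb", "cccccc", "d"} {
		fmt.Printf("%q -> partition %d\n", msg, lb.pick(len(msg)))
	}
}
```

Note that unlike a key-hash balancer, this spreads load but gives no ordering guarantee per key; use a hash balancer when messages with the same `Key` must stay on one partition.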

Receiving Kafka messages in Go with github.com/segmentio/kafka-go:

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Create a Kafka reader (consumer).
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker address
        Topic:    "test-topic",                    // topic to subscribe to
        GroupID:  "my-consumer-group",             // consumer group; reusing the same group resumes from the last committed offset
        MinBytes: 10e3,                            // minimum fetch size in bytes
        MaxBytes: 10e6,                            // maximum fetch size in bytes
    })
    defer r.Close()

    for {
        // Read the next message (resumes automatically from the last offset).
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            log.Fatalf("could not read message: %v", err)
        }
        log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
    }
}

Sending Kafka messages in Go with github.com/IBM/sarama:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/IBM/sarama"
)

func main() {
    // Configure the producer.
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true          // required by SyncProducer
    config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
    config.Producer.Retry.Max = 3                    // retry count

    // Plaintext connection: no SASL, no TLS.
    config.Net.SASL.Enable = false
    config.Net.TLS.Enable = false
    config.Version = sarama.MaxVersion

    // Create a synchronous producer.
    producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
    if err != nil {
        log.Fatalf("failed to create producer: %v", err)
    }
    defer producer.Close()

    // Build the message.
    message := &sarama.ProducerMessage{
        Topic: "test-topic",
        Key:   sarama.StringEncoder("message-key"),
        Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
    }

    // Send the message.
    partition, offset, err := producer.SendMessage(message)
    if err != nil {
        log.Fatalf("failed to send message: %v", err)
    }

    fmt.Printf("Message sent! partition: %d, offset: %d\n", partition, offset)
}
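Setting a `Key` is what makes partitioning deterministic: sarama's default partitioner hashes the key with 32-bit FNV-1a and reduces it modulo the partition count (roughly; sarama has compatibility options around the exact reduction), so the same key always lands on the same partition. A stdlib sketch of the idea, not a byte-for-byte copy of sarama's code:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey mimics the core of sarama's default hash partitioner:
// FNV-1a over the key, interpreted as a signed 32-bit value, taken modulo
// the partition count, with negative results flipped positive.
func partitionForKey(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// The same key always maps to the same partition, which preserves
	// per-key ordering across sends.
	fmt.Println(partitionForKey("message-key", 3))
	fmt.Println(partitionForKey("message-key", 3))
}
```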

Receiving Kafka messages in Go with github.com/IBM/sarama:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"

    "github.com/IBM/sarama"
)

type Consumer struct{}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Session setup; do any preparation here.
    return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    // Cleanup when the session ends.
    return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // claim.Messages() keeps yielding new messages.
    for msg := range claim.Messages() {
        fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
            msg.Topic, msg.Partition, msg.Offset, string(msg.Value))

        // Mark the message as processed; the offset is committed automatically.
        session.MarkMessage(msg, "")
    }
    return nil
}

func main() {
    // Kafka cluster addresses.
    brokers := []string{"172.16.11.111:14818"}
    groupID := "my-group" // consumer group ID; keep it stable to resume from the last offset
    topics := []string{"test-topic"}

    // Configuration.
    config := sarama.NewConfig()
    config.Version = sarama.MaxVersion // Kafka protocol version
    config.Consumer.Return.Errors = true

    // Where to start when the group has no committed offset:
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    // OffsetNewest: start from the newest messages if no offset is stored;
    // OffsetOldest: start from the oldest messages if no offset is stored.

    // Create the consumer group.
    consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
    if err != nil {
        log.Fatalf("Error creating consumer group: %v", err)
    }
    defer consumerGroup.Close()

    consumer := &Consumer{}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        for err := range consumerGroup.Errors() {
            log.Printf("Error: %v", err)
        }
    }()

    log.Println("Kafka consumer started...")
    // Graceful shutdown on Ctrl+C.
    go func() {
        sigchan := make(chan os.Signal, 1)
        signal.Notify(sigchan, os.Interrupt)
        <-sigchan
        cancel()
    }()

    // Consume in a loop; Consume returns after each rebalance.
    for {
        if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
            log.Printf("Error from consumer: %v", err)
        }

        // Exit once the context has been cancelled.
        if ctx.Err() != nil {
            return
        }
    }
}

We believe AI gives everyone an "augmentation tool", and we are committed to sharing AI knowledge across the board: the latest AI explainers, tool reviews, productivity tips, and industry insights. Follow "福大大架构师每日一题" and message the account for interview materials; let AI power your future.

This article is part of the Tencent Cloud self-media syndication program, shared from the WeChat official account 福大大架构师每日一题.
Originally published 2025-09-25. For infringement concerns, contact cloudcommunity@tencent.com for removal.

