操作场景
本文介绍 Go 客户端如何在公网环境下,使用SASL_SSL方式接入消息队列 CKafka 版收发消息。
前提条件
安装 Go
下载 Demo
操作步骤
步骤1:准备 Go 依赖库
1. 将下载的 Demo 中的 gokafkademo 上传至 Linux 服务器,需要客户提前安装好go环境和librdkafka。
2. 登录 Linux 服务器,进入 gokafkademo 目录,执行以下命令添加依赖库。
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
步骤2:配置参数
参数 | 描述 |
topic | Topic 名称,您可以在控制台上 topic 管理页面复制。 ![]() |
sasl.username | 用户名,在控制台ACL策略管理下的用户管理页面创建用户时设置。 |
sasl.password | 用户密码,在控制台ACL策略管理下的用户管理页面创建用户时设置。 |
bootstrapServers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 ![]() |
consumerGroupId | 您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。 |
步骤3:发送消息
1. 编写生产消息程序。
// main.gopackage mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka")func main() {// ======== 请替换为你的实际配置 ========brokers := "$bootstrapServers" // Kafka 地址topic := "$topic" // topic名字saslUsername := "$sasl.username" // SASL 用户名saslPassword := "$sasl.password" // SASL 密码caFile := "./CARoot.pem" // CA 证书路径,控制台下载retries := 5 // 重试次数message := "Hello from Go with SASL+SSL"// =====================================config := &kafka.ConfigMap{"bootstrap.servers": brokers,"security.protocol": "sasl_ssl", // 启用 SASL over SSL"ssl.ca.location": caFile, // 指定 CA 证书"sasl.mechanisms": "PLAIN", // 认证机制"sasl.username": saslUsername,"sasl.password": saslPassword,"acks": "1", // 只等待 leader 确认"message.send.max.retries": retries,"retry.backoff.ms": 1000,"socket.timeout.ms": 30000,"session.timeout.ms": 30000,"enable.idempotence": false, // 关闭幂等,避免强制 acks=all"max.in.flight.requests.per.connection": 5,}// 创建生产者p, err := kafka.NewProducer(config)if err != nil {fmt.Printf("创建生产者失败: %v\\n", err)return}defer p.Close()fmt.Printf("生产者已启动,连接到 %s\\n", brokers)// 发送消息err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic,Partition: kafka.PartitionAny,},Value: []byte(message),}, nil)if err != nil {fmt.Printf("消息发送失败: %v\\n", err)} else {fmt.Printf("已发送消息: %s\\n", message)}p.Flush(3*1000)}
2. 编译并运行程序发送消息。
go run main.go
3. 查看运行结果,示例如下。

4. 在 CKafka 控制台 的topic管理页面,选择对应的 Topic , 单击更多 > 消息查询,查看刚刚发送的消息。

步骤4:消费消息
1. 编写消费消息程序。
// main.gopackage mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka")func main() {// ======== 替换为你的配置 ========brokers := "$bootstrapServers" // Kafka 地址groupID := "$consumerGroupId" // 消费组名字topic := "$topic" // topic名字saslUsername := "$sasl.username" // SASL 用户名saslPassword := "$sasl.password" // SASL 密码caFile := "./CARoot.pem" // CA 证书路径,控制台下载// ==============================// 配置消费者config := &kafka.ConfigMap{"bootstrap.servers": brokers,"security.protocol": "sasl_ssl","ssl.ca.location": caFile,"sasl.mechanisms": "PLAIN","sasl.username": saslUsername,"sasl.password": saslPassword,"group.id": groupID,"auto.offset.reset": "earliest","enable.auto.commit": true, // 启用自动提交"auto.commit.interval.ms": 5000, // 每 5 秒提交一次}// 创建消费者c, err := kafka.NewConsumer(config)if err != nil {panic(err)}defer c.Close()// 订阅主题c.SubscribeTopics([]string{topic}, nil)// 持续拉取消息for {ev := c.Poll(1000)if ev == nil {continue}// 类型断言if msg, ok := ev.(*kafka.Message); ok {if msg.Value != nil {fmt.Printf("收到: %s\\n", string(msg.Value))}}}}
2. 编译并运行程序消费消息。
go run main.go
3. 查看运行结果,示例如下。

4. 在 CKafka 控制台 的 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。
