公网 SASL_SSL 方式接入

最近更新时间:2025-10-10 15:42:32

我的收藏

操作场景

本文介绍 Go 客户端如何在公网环境下,使用SASL_SSL方式接入消息队列 CKafka 版收发消息。

前提条件

安装 Go

操作步骤

步骤1:准备 Go 依赖库

1. 将下载的 Demo 中的 gokafkademo 上传至 Linux 服务器,需要客户提前安装好go环境和librdkafka。
2. 登录 Linux 服务器,进入 gokafkademo 目录,执行以下命令添加依赖库。
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

步骤2:配置参数

参数
描述
topic
Topic 名称,您可以在控制台上 topic 管理页面复制。



sasl.username
用户名,在控制台ACL策略管理下的用户管理页面创建用户时设置。
sasl.password
用户密码,在控制台ACL策略管理下的用户管理页面创建用户时设置。
bootstrapServers
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。
img


consumerGroupId
您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。

步骤3:发送消息

1. 编写生产消息程序。
// main.go
package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
// ======== 请替换为你的实际配置 ========
brokers := "$bootstrapServers" // Kafka 地址
topic := "$topic" // topic名字
saslUsername := "$sasl.username" // SASL 用户名
saslPassword := "$sasl.password" // SASL 密码
caFile := "./CARoot.pem" // CA 证书路径,控制台下载
retries := 5 // 重试次数
message := "Hello from Go with SASL+SSL"
// =====================================

config := &kafka.ConfigMap{
"bootstrap.servers": brokers,
"security.protocol": "sasl_ssl", // 启用 SASL over SSL
"ssl.ca.location": caFile, // 指定 CA 证书
"sasl.mechanisms": "PLAIN", // 认证机制
"sasl.username": saslUsername,
"sasl.password": saslPassword,
"acks": "1", // 只等待 leader 确认
"message.send.max.retries": retries,
"retry.backoff.ms": 1000,
"socket.timeout.ms": 30000,
"session.timeout.ms": 30000,
"enable.idempotence": false, // 关闭幂等,避免强制 acks=all
"max.in.flight.requests.per.connection": 5,
}

// 创建生产者
p, err := kafka.NewProducer(config)
if err != nil {
fmt.Printf("创建生产者失败: %v\\n", err)
return
}
defer p.Close()

fmt.Printf("生产者已启动,连接到 %s\\n", brokers)

// 发送消息
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Value: []byte(message),
}, nil)

if err != nil {
fmt.Printf("消息发送失败: %v\\n", err)
} else {
fmt.Printf("已发送消息: %s\\n", message)
}

p.Flush(3*1000)
}

2. 编译并运行程序发送消息。
go run main.go
3. 查看运行结果,示例如下。

4. CKafka 控制台topic管理页面,选择对应的 Topic , 单击更多 > 消息查询,查看刚刚发送的消息。


步骤4:消费消息

1. 编写消费消息程序。

// main.go
package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
// ======== 替换为你的配置 ========
brokers := "$bootstrapServers" // Kafka 地址
groupID := "$consumerGroupId" // 消费组名字
topic := "$topic" // topic名字
saslUsername := "$sasl.username" // SASL 用户名
saslPassword := "$sasl.password" // SASL 密码
caFile := "./CARoot.pem" // CA 证书路径,控制台下载
// ==============================

// 配置消费者
config := &kafka.ConfigMap{
"bootstrap.servers": brokers,
"security.protocol": "sasl_ssl",
"ssl.ca.location": caFile,
"sasl.mechanisms": "PLAIN",
"sasl.username": saslUsername,
"sasl.password": saslPassword,
"group.id": groupID,
"auto.offset.reset": "earliest",
"enable.auto.commit": true, // 启用自动提交
"auto.commit.interval.ms": 5000, // 每 5 秒提交一次
}

// 创建消费者
c, err := kafka.NewConsumer(config)
if err != nil {
panic(err)
}
defer c.Close()

// 订阅主题
c.SubscribeTopics([]string{topic}, nil)

// 持续拉取消息
for {
ev := c.Poll(1000)
if ev == nil {
continue
}
// 类型断言
if msg, ok := ev.(*kafka.Message); ok {
if msg.Value != nil {
fmt.Printf("收到: %s\\n", string(msg.Value))
}
}

}
}

2. 编译并运行程序消费消息。
go run main.go
3. 查看运行结果,示例如下。

4. CKafka 控制台Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。