SASL_SSL 方式接入

最近更新时间:2025-08-28 17:40:13

我的收藏

操作场景

该任务以 Python 客户端为例,指导您使用公网 SASL_SSL 方式接入消息队列 CKafka 版并收发消息。

前提条件

操作步骤

步骤1:准备工作

1. 创建接入点。
1.1 实例列表 页面,单击目标实例 ID,进入实例详情页。
1.2 基本信息 > 接入方式 中,单击添加路由策略,在打开窗口中选择:路由类型:公网域名接入,接入方式:SASL_SSL



2. 创建角色。 在 ACL 策略管理下的用户管理页面新建角色,设置密码。



3. 创建 Topic。 在控制台 topic 管理页面新建 Topic(参见 创建 Topic)。
4. 添加 Python 依赖库。 执行以下命令安装:
pip install kafka-python

步骤2:生产消息

1. 修改生产消息程序 producer.py 中配置参数。
producer = KafkaProducer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),

#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)

message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value = msg)
print("produce message " + message + " success.")
producer.close()
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。



sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在 CKafka 控制台 的实例详情页面的基本信息获取,用户在ACL策略管理下的用户管理创建用户时设置。
sasl_plain_password
用户密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置。
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。



CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
2. 编译并运行 producer.py。
3. 查看运行结果。


4. CKafka 控制台topic管理页面,选择对应的 Topic , 单击更多 > 消息查询,查看刚刚发送的消息。




步骤3:消费消息

1. 修改消费消息程序 consumer.py 中配置参数。
consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),

#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,

)

for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。



group_id
消费者的组 ID,根据业务需求自定义。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在ACL策略管理下的用户管理创建用户时设置。
sasl_plain_password
用户名密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。



CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
2. 编译并运行 consumer.py。
3. 查看运行结果。


4. CKafka 控制台Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。




问题排查

SSL 证书错误

如您使用以上 Demo 报如下 SSL CERTIFICATE_VERIFY_FAILED 错误,请先检查下载的证书文件(SSL 证书)是否正确,如依然报错,请 提交工单 ,联系后端工程师排查。
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/sender-py", line 160, in run_once
self._client.poll(timeout_ms=poll_timeout_ms)
File"/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line602, in poll
self._poll(timeout / 1000)
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line 648, in _poll
conn.connect()
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 429, in connect
if self._try_handshake():
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 508, in _try_handshake
self._sock.do_handshake()
File "/root/anaconda3/envs/py39/lib/python3.9/ssl.py", line 1343, in do_handshake
self._sslobj.do_handshake()
ssl.SSLCertVerificationError:[SSL:CERTIFICATE_VERIFY_FAILED]certificate verify failed:certificate signature failure(_ssl.c:1133)
WARNING:kafka.conn:SSL connection closed by server during handshake.
INF0:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=ckafka-xxx.ap-beijing.ckafka.tencentcloudmq.com:50001 <handshake>[IPv4('x.x.x.x', 50001)]>:Closing connection.Kafka Connection Error:SSL connection closed by server during handshake
^CTraceback(most recent call last):
File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py",line 49,in<module>
main()
File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 43, in main
send_message(producer, 'skdy_osr_1005',message)
File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 32, in send_message
future = producer.send(topic, value=msg)
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 576, in send
self._wait_on_metadata(topic, self.config['max_block_ms']/1000.0)
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 699, in _wait_on_metadata
metadata_event.wait(max_wait-elapsed)
File "/root/anaconda3/envs/py39/lib/python3.9/threading-py", line 581,in wait
signaled = self._cond.wait(timeout)
File "/root/anaconda3/envs/py39/lib/python3.9/threading-py", line 316, in wait
gotit =waiter.acquire(True, timeout)
KeyboardInterrupt
INFo:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFo:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed with in timeout 0.