有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

该任务以 Python 客户端为例,指导您使用消息队列 CKafka 版弹性 Topic并收发消息。

前提条件

操作步骤

步骤1:准备环境

执行以下命令安装添加 Python 依赖库。
pip install kafka-python

步骤2:创建 Topic 和订阅关系

1. 在控制台的弹性 Topic 列表页面创建一个 Topic。


2. 单击 Topic 的 “ID” 进入基本信息页面,获取用户名、密码和地址信息。


3. 订阅关系页签,新建一个订阅关系(消费组)。
img



步骤3:生产消息

1. 修改生产消息程序 producer.py中配置参数。
producer = KafkaProducer(
bootstrap_servers = ['xx.xx.xx.xx:port'],#地址
api_version = (1, 1),
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = "PLAIN",
sasl_plain_username = "username",#用户名
sasl_plain_password = "password",#密码
)

message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value = msg)#topic名称
print("produce message " + message + " success.")
producer.close()
参数
描述
bootstrapServers
接入地址,在控制台的弹性 Topic 基本信息页面获取。


sasl_plain_username
用户名,在控制台的弹性 Topic 基本信息页面获取。
sasl_plain_password
用户密码,在控制台的弹性 Topic 基本信息页面获取。
topic_name
Topic 名称,在控制台的弹性 Topic 基本信息页面获取。
2. 编译并运行 producer.py。
3. 查看运行结果。




步骤4:消费消息

1. 修改消费消息程序 consumer.py 中配置参数。
consumer = KafkaConsumer(
'topic_name',#topic名称
group_id = "group_id",#消费组
bootstrap_servers = ['xx.xx.xx.xx:port'],#地址
api_version = (1,1),

security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "username",#用户名
sasl_plain_password = "password",#密码
)

for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
参数
描述
bootstrapServers
接入地址,在控制台的弹性 Topic 基本信息页面获取。


sasl_plain_username
用户名,在控制台的弹性 Topic 基本信息页面获取。
sasl_plain_password
用户密码,在控制台的弹性 Topic 基本信息页面获取。
topic_name
Topic 名称,在控制台的弹性 Topic 基本信息页面获取。
group.id
消费组名称,在控制台的弹性 Topic 的订阅关系列表获取。


2. 编译并运行 consumer.py。
3. 查看运行结果。