有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

该任务以 Node.js 客户端为例,指导您使用公网 SASL_PLAINTEXT 方式接入消息队列 CKafka 版并收发消息。

前提条件

操作步骤

步骤1:安装 C++ 依赖库

1. 执行以下命令切换到 yum 源配置目录 /etc/yum.repos.d/
cd /etc/yum.repos.d/
2. 创建 yum 源配置文件 confluent.repo。
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
3. 执行以下命令安装 C++ 依赖库。
yum install librdkafka-devel

步骤2:安装 Node.js 依赖库

1. 执行以下命令为预处理器指定 OpenSSL 头文件路径。
export CPPFLAGS=-I/usr/local/opt/openssl/include
2. 执行以下命令为连接器指定 OpenSSL 库路径。
export LDFLAGS=-L/usr/local/opt/openssl/lib
3. 执行以下命令安装 Node.js 依赖库。
npm install i --unsafe-perm node-rdkafka

步骤3:准备配置

创建消息队列 CKafka 版配置文件 setting.js。
module.exports = {
'sasl_plain_username': 'ckafka-xxxxxxx#ckafkademo',
'sasl_plain_password': 'ckafkademo123',
'bootstrap_servers': ["xxx.ckafka.tencentcloudmq.com:6018"],
'topic_name': 'xxx',
'group_id': 'xxx'
}
参数
描述
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在 CKafka 控制台 的实例详情页面的基本信息获取,用户在ACL策略管理下的用户管理创建用户时设置。
sasl_plain_password
用户密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置。
bootstrap_servers
SASL 接入点,在 CKafka 控制台的实例详情页面的基本信息 > 接入方式获取。
img


topic_name
Topic名称,在 CKafka 控制台实例详情页面的topic管理创建和获取。



group_id
消费者的组 ID,根据业务需求自定义

步骤4:发送消息

1. 编写生产消息程序 producer.js
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'dr_cb': true,
'dr_msg_cb': true,
'security.protocol' : 'SASL_PLAINTEXT',
'sasl.mechanisms' : 'PLAIN',
'sasl.username' : config['sasl_plain_username'],
'sasl.password' : config['sasl_plain_password']
});

var connected = false

producer.setPollInterval(100);

producer.connect();

producer.on('ready', function() {
connected = true
console.log("connect ok")

});

function produce() {
try {
producer.produce(
config['topic_name'],
new Buffer('Hello CKafka SASL'),
null,
Date.now()
);
} catch (err) {
console.error('Error occurred when sending message(s)');
console.error(err);
}
}

producer.on("disconnected", function() {
connected = false;
producer.connect();
})

producer.on('event.log', function(event) {
console.log("event.log", event);
});

producer.on("error", function(error) {
console.log("error:" + error);
});

producer.on('delivery-report', function(err, report) {
console.log("delivery-report: producer ok");
});
// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
console.error('event.error:' + err);
})

setInterval(produce,1000,"Interval");
2. 执行以下命令发送消息。
node producer.js
3. 查看运行结果。


4. CKafka 控制台topic 管理页面,选择对应的 Topic,单击更多 > 消息查询,查看刚发送的消息。



步骤5:订阅消息

1. 创建消费消息程序consumer.js。
consumer.on('event.log', function(event) {
console.log("event.log", event);
});

consumer.on('error', function(error) {
console.log("error:" + error);
});

consumer.on('event', function(event) {
console.log("event:" + event);
});const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config)

var consumer = new Kafka.KafkaConsumer({
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'security.protocol' : 'SASL_PLAINTEXT',
'sasl.mechanisms' : 'PLAIN',
'message.max.bytes': 32000,
'fetch.message.max.bytes': 32000,
'max.partition.fetch.bytes': 32000,
'sasl.username' : config['sasl_plain_username'],
'sasl.password' : config['sasl_plain_password'],
'group.id' : config['group_id']
});

consumer.connect();

consumer.on('ready', function() {
console.log("connect ok");
consumer.subscribe([config['topic_name']]);
consumer.consume();
})

consumer.on('data', function(data) {
console.log(data);
});
2. 执行以下命令消费消息。
node consumer.js
3. 查看运行结果。


4. CKafka 控制台Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查看消费者详情,查看消费详情。