操作场景
该任务以 Node.js 客户端为例指导您使用 VPC 网络接入消息队列 CKafka 版并收发消息。
前提条件
操作步骤
准备工作
1. 将下载的 Demo 中的 nodejskafkademo 上传至 Linux 服务器。
2. 登录 Linux 服务器,进入 nodejskafkademo 目录。
步骤1:安装 C++ 依赖库
1. 执行以下命令切换到 yum 源配置目录
/etc/yum.repos.d/
。cd /etc/yum.repos.d/
2. 创建 yum 源配置文件 confluent.repo。
[Confluent.dist]name=Confluent repository (dist)baseurl=https://packages.confluent.io/rpm/5.1/7gpgcheck=1gpgkey=https://packages.confluent.io/rpm/5.1/archive.keyenabled=1[Confluent]name=Confluent repositorybaseurl=https://packages.confluent.io/rpm/5.1gpgcheck=1gpgkey=https://packages.confluent.io/rpm/5.1/archive.keyenabled=1
3. 执行以下命令安装 C++ 依赖库。
yum install librdkafka-devel
步骤2:安装 Node.js 依赖库
1. 执行以下命令为预处理器指定 OpenSSL 头文件路径。
export CPPFLAGS=-I/usr/local/opt/openssl/include
2. 执行以下命令为连接器指定 OpenSSL 库路径。
export LDFLAGS=-L/usr/local/opt/openssl/lib
3. 执行以下命令安装 Node.js 依赖库。
npm install i --unsafe-perm node-rdkafka
步骤3:准备配置
创建消息队列 CKafka 版配置文件 setting.js。
module.exports = {'bootstrap_servers': ["xxx.xx.xxx:xxxx"],'topic_name': 'xxx','group_id': 'xxx'}
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 |
topic_name | Topic 名称,您可以在控制台上 topic管理 页面复制。 |
group_id | 您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。 |
步骤4:发送消息
1. 编写生产消息程序 producer.js。
const Kafka = require('node-rdkafka');const config = require('./setting');console.log("features:" + Kafka.features);console.log(Kafka.librdkafkaVersion);var producer = new Kafka.Producer({'api.version.request': 'true',// 设置入口服务,请通过控制台获取对应的服务地址。'bootstrap.servers': config['bootstrap_servers'],'dr_cb': true,'dr_msg_cb': true,// 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失'retries': '0',// 发送请求失败时到下一次重试请求之间的时间"retry.backoff.ms": 100,// producer 网络请求的超时时间。'socket.timeout.ms': 6000,});var connected = falseproducer.setPollInterval(100);producer.connect();producer.on('ready', function() {connected = trueconsole.log("connect ok")});producer.on("disconnected", function() {connected = false;producer.connect();})producer.on('event.log', function(event) {console.log("event.log", event);});producer.on("error", function(error) {console.log("error:" + error);});function produce() {try {producer.produce(config['topic_name'],null,new Buffer('Hello CKafka Default'),null,Date.now());} catch (err) {console.error('Error occurred when sending message(s)');console.error(err);}}producer.on('delivery-report', function(err, report) {console.log("delivery-report: producer ok");});producer.on('event.error', function(err) {console.error('event.error:' + err);})setInterval(produce, 1000, "Interval");
2. 执行以下命令发送消息。
node producer.js
3. 查看运行结果。
4. 在 CKafka 控制台topic管理页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。
步骤5:订阅消息
1. 创建消费消息程序 consumer.js。
const Kafka = require('node-rdkafka');const config = require('./setting');console.log(Kafka.features);console.log(Kafka.librdkafkaVersion);console.log(config)var consumer = new Kafka.KafkaConsumer({'api.version.request': 'true',// 设置入口服务,请通过控制台获取对应的服务地址。'bootstrap.servers': config['bootstrap_servers'],'group.id' : config['group_id'],// 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,// 认为该消费者故障失败,Broker 发起重新 Rebalance 过程。'session.timeout.ms': 10000,// 客户端请求超时时间,如果超过这个时间没有收到应答,则请求超时失败'metadata.request.timeout.ms': 305000,// 设置客户端内部重试间隔。'reconnect.backoff.max.ms': 3000});consumer.connect();consumer.on('ready', function() {console.log("connect ok");consumer.subscribe([config['topic_name']]);consumer.consume();})consumer.on('data', function(data) {console.log(data);});consumer.on('event.log', function(event) {console.log("event.log", event);});consumer.on('error', function(error) {console.log("error:" + error);});consumer.on('event', function(event) {console.log("event:" + event);});
2. 执行以下命令消费消息。
node consumer.js
3. 查看运行结果
4. 在 CKafka 控制台Consumer Group 页面,选择对应的消费组,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。