文档中心>消息队列 MQTT 版>SDK 文档>Javascript/Node.JS/小程序

Javascript/Node.JS/小程序

最近更新时间:2025-11-27 16:21:22

我的收藏

功能概述

MQTT.js 是 JavaScript 编写的,实现了 MQTT 协议客户端功能的模块,可以在浏览器 和 Node.js 环境中使用。
由于 JavaScript 单线程特性,MQTT.js 是全异步 MQTT 客户端,MQTT.js 支持 MQTT 与 MQTT over WebSocket,在不同运行环境支持的程度如下:
浏览器环境:MQTT over WebSocket(包括微信小程序、支付宝小程序等定制浏览器环境)。
Node.js 环境:MQTT、MQTT over WebSocket。
不同环境里除了少部分连接参数不同,其他 API 均是相同的。

环境准备

使用 npm 安装:
npm i mqtt
使用 CDN 安装(浏览器):
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
// 将在全局初始化一个 mqtt 变量
console.log(mqtt)
</script>
在安装 Node.js 的环境里,可以通过 npm i mqtt -g 命令全局安装以命令行的形式使用 MQTT.js。
npm i mqtt -g

mqtt help

MQTT.js command line interface, available commands are:

* publish publish a message to the broker
* subscribe subscribe for updates from the broker
* version the current MQTT.js version
* help help about commands

Launch 'mqtt help [command]' to know more about the commands.

示例代码

MQTT5
MQTT5 TLS
MQTT3.1.1
MQTT3.1.1 TLS

const mqtt = require('mqtt');

/**
* MQTT 5.0 TCP连接示例
*/

// ============ 连接配置 ============
const serverUri = 'mqtt://mqtt-xxx.mqtt.tencenttdmq.com:1883';
const clientId = 'QuickStart';
const username = 'YOUR_USERNAME';
const password = 'YOUR_PASSWORD';
const pubTopic = 'home/test';
const topicFilters = ['home/test', 'home/#', 'home/+'];
const qos = [1, 1, 1];

// 配置连接选项
const options = {
clientId: clientId,
username: username,
password: password,
protocolVersion: 5, // MQTT 5.0
clean: true,
reconnectPeriod: 0, // 禁用自动重连
};

// 创建客户端并连接
const client = mqtt.connect(serverUri, options);

// 连接成功
client.on('connect', async () => {
console.log('已连接');

try {
// 2. 订阅
for (let i = 0; i < topicFilters.length; i++) {
await new Promise((resolve, reject) => {
client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {
if (err) reject(err);
else resolve();
});
});
}
console.log('已订阅');

// 3. 发布
for (let i = 1; i <= 16; i++) {
await new Promise((resolve, reject) => {
client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {
if (err) reject(err);
else resolve();
});
});
await new Promise(resolve => setTimeout(resolve, 500));
}
console.log('发布完成');

// 4. 等待接收
await new Promise(resolve => setTimeout(resolve, 2000));

// 5. 断开连接
client.end();
console.log('已断开');
} catch (error) {
console.error('错误:', error.message);
client.end();
}
});

// 消息接收处理
client.on('message', (topic, message) => {
console.log(`收到消息: ${topic} -> ${message.toString()}`);
});

// 错误处理
client.on('error', (error) => {
console.error('错误:', error.message);
});


const mqtt = require('mqtt');
const fs = require('fs');
const tls = require('tls');

/**
* MQTT 5.0 TLS加密连接示例
*/

// ============ 连接配置 ============
const serverUri = 'mqtts://mqtt-xxx.mqtt.tencenttdmq.com:8883';
const clientId = 'QuickStart';
const username = 'YOUR_USERNAME';
const password = 'YOUR_PASSWORD';
const pubTopic = 'home/test';
const topicFilters = ['home/test', 'home/#', 'home/+'];
const qos = [1, 1, 1];

// CA证书路径(可选,用于验证服务器证书)
const caCertPath = null; // 例如: '/path/to/ca.crt'

// 配置连接选项(包含TLS)
const options = {
clientId: clientId,
username: username,
password: password,
protocolVersion: 5, // MQTT 5.0
clean: true,
reconnectPeriod: 0, // 禁用自动重连
// TLS配置
rejectUnauthorized: true, // 生产环境: 验证服务器证书
// 如果提供了CA证书,加载它
ca: caCertPath ? [fs.readFileSync(caCertPath)] : undefined,
// 自定义证书验证
checkServerIdentity: (host, cert) => {
return validateServerCertificate(host, cert);
}
};

/**
* 验证服务器证书
*/
function validateServerCertificate(host, cert) {
// 1. 检查证书有效期
const now = new Date();
const validFrom = new Date(cert.valid_from);
const validTo = new Date(cert.valid_to);
if (now < validFrom || now > validTo) {
const error = new Error(`证书验证失败: 证书已过期或尚未生效 (有效期: ${validFrom} - ${validTo})`);
console.error(error.message);
return error;
}
// 2. 检查主题名称(可选,根据实际需求)
// const expectedSubject = 'CN=*.mqtt.tencenttdmq.com';
// if (!cert.subject.CN || !cert.subject.CN.includes('mqtt.tencenttdmq.com')) {
// const error = new Error(`证书验证失败: 主题不匹配 (实际: ${cert.subject.CN})`);
// console.error(error.message);
// return error;
// }
// 3. 使用默认的证书链验证
const err = tls.checkServerIdentity(host, cert);
if (err) {
console.error('证书验证失败:', err.message);
return err;
}
console.log('证书验证通过');
return undefined;
}

// 创建客户端并连接
const client = mqtt.connect(serverUri, options);

// 连接成功
client.on('connect', async () => {
console.log('已连接(TLS加密)');

try {
// 2. 订阅
for (let i = 0; i < topicFilters.length; i++) {
await new Promise((resolve, reject) => {
client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {
if (err) reject(err);
else resolve();
});
});
}
console.log('已订阅');

// 3. 发布
for (let i = 1; i <= 16; i++) {
await new Promise((resolve, reject) => {
client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {
if (err) reject(err);
else resolve();
});
});
await new Promise(resolve => setTimeout(resolve, 500));
}
console.log('发布完成');

// 4. 等待接收
await new Promise(resolve => setTimeout(resolve, 2000));

// 5. 断开连接
client.end();
console.log('已断开');
} catch (error) {
console.error('错误:', error.message);
client.end();
}
});

// 消息接收处理
client.on('message', (topic, message) => {
console.log(`收到消息: ${topic} -> ${message.toString()}`);
});

// 错误处理
client.on('error', (error) => {
console.error('错误:', error.message);
});

const mqtt = require('mqtt');

/**
* MQTT 3.1.1 TCP连接示例
*/

// ============ 连接配置 ============
const serverUri = 'mqtt://mqtt-xxx.mqtt.tencenttdmq.com:1883';
const clientId = 'QuickStart';
const username = 'YOUR_USERNAME';
const password = 'YOUR_PASSWORD';
const pubTopic = 'home/test';
const topicFilters = ['home/test', 'home/#', 'home/+'];
const qos = [1, 1, 1];

// 配置连接选项
const options = {
clientId: clientId,
username: username,
password: password,
protocolVersion: 4, // MQTT 3.1.1
clean: true,
reconnectPeriod: 0, // 禁用自动重连
};

// 创建客户端并连接
const client = mqtt.connect(serverUri, options);

// 连接成功
client.on('connect', async () => {
console.log('已连接');

try {
// 2. 订阅
for (let i = 0; i < topicFilters.length; i++) {
await new Promise((resolve, reject) => {
client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {
if (err) reject(err);
else resolve();
});
});
}
console.log('已订阅');

// 3. 发布
for (let i = 1; i <= 16; i++) {
await new Promise((resolve, reject) => {
client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {
if (err) reject(err);
else resolve();
});
});
await new Promise(resolve => setTimeout(resolve, 500));
}
console.log('发布完成');

// 4. 等待接收
await new Promise(resolve => setTimeout(resolve, 2000));

// 5. 断开连接
client.end();
console.log('已断开');
} catch (error) {
console.error('错误:', error.message);
client.end();
}
});

// 消息接收处理
client.on('message', (topic, message) => {
console.log(`收到消息: ${topic} -> ${message.toString()}`);
});

// 错误处理
client.on('error', (error) => {
console.error('错误:', error.message);
});

const mqtt = require('mqtt');
const fs = require('fs');
const tls = require('tls');

/**
* MQTT 3.1.1 TLS加密连接示例
*/

// ============ 连接配置 ============
const serverUri = 'mqtts://mqtt-xxx.mqtt.tencenttdmq.com:8883';
const clientId = 'QuickStart';
const username = 'YOUR_USERNAME';
const password = 'YOUR_PASSWORD';
const pubTopic = 'home/test';
const topicFilters = ['home/test', 'home/#', 'home/+'];
const qos = [1, 1, 1];

// CA证书路径(可选,用于验证服务器证书)
const caCertPath = null; // 例如: '/path/to/ca.crt'

// 配置连接选项(包含TLS)
const options = {
clientId: clientId,
username: username,
password: password,
protocolVersion: 4, // MQTT 3.1.1
clean: true,
reconnectPeriod: 0, // 禁用自动重连
// TLS配置
rejectUnauthorized: true, // 生产环境: 验证服务器证书
// 如果提供了CA证书,加载它
ca: caCertPath ? [fs.readFileSync(caCertPath)] : undefined,
// 自定义证书验证
checkServerIdentity: (host, cert) => {
return validateServerCertificate(host, cert);
}
};

/**
* 验证服务器证书
*/
function validateServerCertificate(host, cert) {
// 1. 检查证书有效期
const now = new Date();
const validFrom = new Date(cert.valid_from);
const validTo = new Date(cert.valid_to);
if (now < validFrom || now > validTo) {
const error = new Error(`证书验证失败: 证书已过期或尚未生效 (有效期: ${validFrom} - ${validTo})`);
console.error(error.message);
return error;
}
// 2. 检查主题名称(可选,根据实际需求)
// const expectedSubject = 'CN=*.mqtt.tencenttdmq.com';
// if (!cert.subject.CN || !cert.subject.CN.includes('mqtt.tencenttdmq.com')) {
// const error = new Error(`证书验证失败: 主题不匹配 (实际: ${cert.subject.CN})`);
// console.error(error.message);
// return error;
// }
// 3. 使用默认的证书链验证
const err = tls.checkServerIdentity(host, cert);
if (err) {
console.error('证书验证失败:', err.message);
return err;
}
console.log('证书验证通过');
return undefined;
}

// 创建客户端并连接
const client = mqtt.connect(serverUri, options);

// 连接成功
client.on('connect', async () => {
console.log('已连接(TLS加密)');

try {
// 2. 订阅
for (let i = 0; i < topicFilters.length; i++) {
await new Promise((resolve, reject) => {
client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {
if (err) reject(err);
else resolve();
});
});
}
console.log('已订阅');

// 3. 发布
for (let i = 1; i <= 16; i++) {
await new Promise((resolve, reject) => {
client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {
if (err) reject(err);
else resolve();
});
});
await new Promise(resolve => setTimeout(resolve, 500));
}
console.log('发布完成');

// 4. 等待接收
await new Promise(resolve => setTimeout(resolve, 2000));

// 5. 断开连接
client.end();
console.log('已断开');
} catch (error) {
console.error('错误:', error.message);
client.end();
}
});

// 消息接收处理
client.on('message', (topic, message) => {
console.log(`收到消息: ${topic} -> ${message.toString()}`);
});

// 错误处理
client.on('error', (error) => {
console.error('错误:', error.message);
});



参数说明

参数
说明
topic
MQTT 第一级 Topic,在控制台集群详情页 Topic 管理页面复制。



connectUrl
broker 连接地址,在控制台目标集群基本信息 > 接入信息模块中复制。位置如下图所示。格式:mqtt-xxx-gz.mqtt.qcloud.tencenttdmq.com:1883。



clientId
客户端 ID,在控制台集群详情页客户端管理页面获取。



username
连接用户名,在控制台集群详情页认证管理页面复制。



password
连接用户名匹配的密码,在控制台集群详情页认证管理页面复制。