功能概述
由于 JavaScript 单线程特性,MQTT.js 是全异步 MQTT 客户端,MQTT.js 支持 MQTT 与 MQTT over WebSocket,在不同运行环境支持的程度如下:
浏览器环境:MQTT over WebSocket(包括微信小程序、支付宝小程序等定制浏览器环境)。
Node.js 环境:MQTT、MQTT over WebSocket。
不同环境里除了少部分连接参数不同,其他 API 均是相同的。
环境准备
使用 npm 安装:
npm i mqtt
使用 CDN 安装(浏览器):
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script><script>// 将在全局初始化一个 mqtt 变量console.log(mqtt)</script>
在安装 Node.js 的环境里,可以通过 npm i mqtt -g 命令全局安装以命令行的形式使用 MQTT.js。
npm i mqtt -gmqtt helpMQTT.js command line interface, available commands are:* publish publish a message to the broker* subscribe subscribe for updates from the broker* version the current MQTT.js version* help help about commandsLaunch 'mqtt help [command]' to know more about the commands.
示例代码
const mqtt = require('mqtt');/*** MQTT 5.0 TCP连接示例*/// ============ 连接配置 ============const serverUri = 'mqtt://mqtt-xxx.mqtt.tencenttdmq.com:1883';const clientId = 'QuickStart';const username = 'YOUR_USERNAME';const password = 'YOUR_PASSWORD';const pubTopic = 'home/test';const topicFilters = ['home/test', 'home/#', 'home/+'];const qos = [1, 1, 1];// 配置连接选项const options = {clientId: clientId,username: username,password: password,protocolVersion: 5, // MQTT 5.0clean: true,reconnectPeriod: 0, // 禁用自动重连};// 创建客户端并连接const client = mqtt.connect(serverUri, options);// 连接成功client.on('connect', async () => {console.log('已连接');try {// 2. 订阅for (let i = 0; i < topicFilters.length; i++) {await new Promise((resolve, reject) => {client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {if (err) reject(err);else resolve();});});}console.log('已订阅');// 3. 发布for (let i = 1; i <= 16; i++) {await new Promise((resolve, reject) => {client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {if (err) reject(err);else resolve();});});await new Promise(resolve => setTimeout(resolve, 500));}console.log('发布完成');// 4. 等待接收await new Promise(resolve => setTimeout(resolve, 2000));// 5. 断开连接client.end();console.log('已断开');} catch (error) {console.error('错误:', error.message);client.end();}});// 消息接收处理client.on('message', (topic, message) => {console.log(`收到消息: ${topic} -> ${message.toString()}`);});// 错误处理client.on('error', (error) => {console.error('错误:', error.message);});
const mqtt = require('mqtt');const fs = require('fs');const tls = require('tls');/*** MQTT 5.0 TLS加密连接示例*/// ============ 连接配置 ============const serverUri = 'mqtts://mqtt-xxx.mqtt.tencenttdmq.com:8883';const clientId = 'QuickStart';const username = 'YOUR_USERNAME';const password = 'YOUR_PASSWORD';const pubTopic = 'home/test';const topicFilters = ['home/test', 'home/#', 'home/+'];const qos = [1, 1, 1];// CA证书路径(可选,用于验证服务器证书)const caCertPath = null; // 例如: '/path/to/ca.crt'// 配置连接选项(包含TLS)const options = {clientId: clientId,username: username,password: password,protocolVersion: 5, // MQTT 5.0clean: true,reconnectPeriod: 0, // 禁用自动重连// TLS配置rejectUnauthorized: true, // 生产环境: 验证服务器证书// 如果提供了CA证书,加载它ca: caCertPath ? [fs.readFileSync(caCertPath)] : undefined,// 自定义证书验证checkServerIdentity: (host, cert) => {return validateServerCertificate(host, cert);}};/*** 验证服务器证书*/function validateServerCertificate(host, cert) {// 1. 检查证书有效期const now = new Date();const validFrom = new Date(cert.valid_from);const validTo = new Date(cert.valid_to);if (now < validFrom || now > validTo) {const error = new Error(`证书验证失败: 证书已过期或尚未生效 (有效期: ${validFrom} - ${validTo})`);console.error(error.message);return error;}// 2. 检查主题名称(可选,根据实际需求)// const expectedSubject = 'CN=*.mqtt.tencenttdmq.com';// if (!cert.subject.CN || !cert.subject.CN.includes('mqtt.tencenttdmq.com')) {// const error = new Error(`证书验证失败: 主题不匹配 (实际: ${cert.subject.CN})`);// console.error(error.message);// return error;// }// 3. 使用默认的证书链验证const err = tls.checkServerIdentity(host, cert);if (err) {console.error('证书验证失败:', err.message);return err;}console.log('证书验证通过');return undefined;}// 创建客户端并连接const client = mqtt.connect(serverUri, options);// 连接成功client.on('connect', async () => {console.log('已连接(TLS加密)');try {// 2. 订阅for (let i = 0; i < topicFilters.length; i++) {await new Promise((resolve, reject) => {client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {if (err) reject(err);else resolve();});});}console.log('已订阅');// 3. 发布for (let i = 1; i <= 16; i++) {await new Promise((resolve, reject) => {client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {if (err) reject(err);else resolve();});});await new Promise(resolve => setTimeout(resolve, 500));}console.log('发布完成');// 4. 等待接收await new Promise(resolve => setTimeout(resolve, 2000));// 5. 断开连接client.end();console.log('已断开');} catch (error) {console.error('错误:', error.message);client.end();}});// 消息接收处理client.on('message', (topic, message) => {console.log(`收到消息: ${topic} -> ${message.toString()}`);});// 错误处理client.on('error', (error) => {console.error('错误:', error.message);});
const mqtt = require('mqtt');/*** MQTT 3.1.1 TCP连接示例*/// ============ 连接配置 ============const serverUri = 'mqtt://mqtt-xxx.mqtt.tencenttdmq.com:1883';const clientId = 'QuickStart';const username = 'YOUR_USERNAME';const password = 'YOUR_PASSWORD';const pubTopic = 'home/test';const topicFilters = ['home/test', 'home/#', 'home/+'];const qos = [1, 1, 1];// 配置连接选项const options = {clientId: clientId,username: username,password: password,protocolVersion: 4, // MQTT 3.1.1clean: true,reconnectPeriod: 0, // 禁用自动重连};// 创建客户端并连接const client = mqtt.connect(serverUri, options);// 连接成功client.on('connect', async () => {console.log('已连接');try {// 2. 订阅for (let i = 0; i < topicFilters.length; i++) {await new Promise((resolve, reject) => {client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {if (err) reject(err);else resolve();});});}console.log('已订阅');// 3. 发布for (let i = 1; i <= 16; i++) {await new Promise((resolve, reject) => {client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {if (err) reject(err);else resolve();});});await new Promise(resolve => setTimeout(resolve, 500));}console.log('发布完成');// 4. 等待接收await new Promise(resolve => setTimeout(resolve, 2000));// 5. 断开连接client.end();console.log('已断开');} catch (error) {console.error('错误:', error.message);client.end();}});// 消息接收处理client.on('message', (topic, message) => {console.log(`收到消息: ${topic} -> ${message.toString()}`);});// 错误处理client.on('error', (error) => {console.error('错误:', error.message);});
const mqtt = require('mqtt');const fs = require('fs');const tls = require('tls');/*** MQTT 3.1.1 TLS加密连接示例*/// ============ 连接配置 ============const serverUri = 'mqtts://mqtt-xxx.mqtt.tencenttdmq.com:8883';const clientId = 'QuickStart';const username = 'YOUR_USERNAME';const password = 'YOUR_PASSWORD';const pubTopic = 'home/test';const topicFilters = ['home/test', 'home/#', 'home/+'];const qos = [1, 1, 1];// CA证书路径(可选,用于验证服务器证书)const caCertPath = null; // 例如: '/path/to/ca.crt'// 配置连接选项(包含TLS)const options = {clientId: clientId,username: username,password: password,protocolVersion: 4, // MQTT 3.1.1clean: true,reconnectPeriod: 0, // 禁用自动重连// TLS配置rejectUnauthorized: true, // 生产环境: 验证服务器证书// 如果提供了CA证书,加载它ca: caCertPath ? [fs.readFileSync(caCertPath)] : undefined,// 自定义证书验证checkServerIdentity: (host, cert) => {return validateServerCertificate(host, cert);}};/*** 验证服务器证书*/function validateServerCertificate(host, cert) {// 1. 检查证书有效期const now = new Date();const validFrom = new Date(cert.valid_from);const validTo = new Date(cert.valid_to);if (now < validFrom || now > validTo) {const error = new Error(`证书验证失败: 证书已过期或尚未生效 (有效期: ${validFrom} - ${validTo})`);console.error(error.message);return error;}// 2. 检查主题名称(可选,根据实际需求)// const expectedSubject = 'CN=*.mqtt.tencenttdmq.com';// if (!cert.subject.CN || !cert.subject.CN.includes('mqtt.tencenttdmq.com')) {// const error = new Error(`证书验证失败: 主题不匹配 (实际: ${cert.subject.CN})`);// console.error(error.message);// return error;// }// 3. 使用默认的证书链验证const err = tls.checkServerIdentity(host, cert);if (err) {console.error('证书验证失败:', err.message);return err;}console.log('证书验证通过');return undefined;}// 创建客户端并连接const client = mqtt.connect(serverUri, options);// 连接成功client.on('connect', async () => {console.log('已连接(TLS加密)');try {// 2. 订阅for (let i = 0; i < topicFilters.length; i++) {await new Promise((resolve, reject) => {client.subscribe(topicFilters[i], { qos: qos[i] }, (err) => {if (err) reject(err);else resolve();});});}console.log('已订阅');// 3. 发布for (let i = 1; i <= 16; i++) {await new Promise((resolve, reject) => {client.publish(pubTopic, `消息 #${i}`, { qos: 1 }, (err) => {if (err) reject(err);else resolve();});});await new Promise(resolve => setTimeout(resolve, 500));}console.log('发布完成');// 4. 等待接收await new Promise(resolve => setTimeout(resolve, 2000));// 5. 断开连接client.end();console.log('已断开');} catch (error) {console.error('错误:', error.message);client.end();}});// 消息接收处理client.on('message', (topic, message) => {console.log(`收到消息: ${topic} -> ${message.toString()}`);});// 错误处理client.on('error', (error) => {console.error('错误:', error.message);});
参数说明
参数 | 说明 |
topic | MQTT 第一级 Topic,在控制台集群详情页 Topic 管理页面复制。 ![]() |
connectUrl | broker 连接地址,在控制台目标集群基本信息 > 接入信息模块中复制。位置如下图所示。格式:mqtt-xxx-gz.mqtt.qcloud.tencenttdmq.com:1883。 ![]() |
clientId | 客户端 ID,在控制台集群详情页客户端管理页面获取。 ![]() |
username | 连接用户名,在控制台集群详情页认证管理页面复制。 ![]() |
password | 连接用户名匹配的密码,在控制台集群详情页认证管理页面复制。 |



