使用过C++和Nodejs的Mqtt封装类,之前使用过mosquitto的C语言库以及mosquittopp的C++库;另外nodejs中也有一个mqtt的模块。
在TypeScript中使用mqtt模块十分简单,首先得用npm或者cnpm安装mqtt模块
npm install mqtt
下面是使用TypeScript封装的mqtt的简单客户端,代码如下:
import mqtt = require('mqtt')
export interface MqttConnOpt extends mqtt.IClientOptions{}
export declare type OnMessageFunc = (topic: string, payload: Buffer) => void
declare class Topic {
public topic: string;
public qos: 0|1|2;
}
export class MQTT {
mqclient: mqtt.MqttClient;
brokerHost: string;
brokerPort: number;
subscribeTopics: Array<Topic>;
subscribeCallbacks: Map<string, OnMessageFunc>;
constructor(host?: string | any, port?: number) {
this.brokerHost = host;
this.brokerPort = port;
this.subscribeTopics = new Array<Topic>();
this.subscribeCallbacks = new Map<string, OnMessageFunc>();
}
/**
* 订阅主题
*/
public subscribe(topic: string, qos: 0|1|2) {
this.subscribeTopics.push({topic: topic, qos: qos});
if (this.is_connected()){
this.mqclient.subscribe(topic, {qos: qos});
}
}
/**
* 设置消息数据回调函数
*/
public set_message_callback(topicPatten: string, cb: OnMessageFunc) {
this.subscribeCallbacks.set(topicPatten, cb);
}
/**
* 是否已连接到服务器
*/
public is_connected() {
return this.mqclient.connected == true;
}
/**
* 连接到服务器
*/
public connect(opts?: MqttConnOpt){
this.mqclient = mqtt.connect(`mqtt://${this.brokerHost}:${this.brokerPort}`, opts);
this.mqclient.on('connect', ()=>{
console.log(`成功连接到服务器[${this.brokerHost}:${this.brokerPort}]`);
for (let index = 0; index < this.subscribeTopics.length; index++) {
const element = this.subscribeTopics[index];
this.mqclient.subscribe(element.topic, {qos: element.qos});
}
});
this.mqclient.on('message', (topic: string, payload: Buffer)=>{
this.mqclient;
this.subscribeCallbacks.forEach((val, key)=>{
if (topic.indexOf(key) != -1){
val(topic, payload);
}
});
});
this.mqclient.on('error', (err: Error)=>{
});
}
/**
* 推送数据
*/
public publish(topic: string, message: string, qos: 0|1|2) {
this.mqclient.publish(topic, message, {qos: qos, retain: false})
}
}
import { MQTT } from './mq'
// mqtt客户端列表
var mqttList: Array<MQTT> = new Array<MQTT>();
/**
* 实时数据处理函数
*/
function handleReal(topic: string, payload: Buffer) {
// console.log(`data: ${topic}=>${payload.toString()}`);
const topics = topic.split('/');
}
/**
* 数据包处理函数
*/
function handlePacked(topic: string, payload: Buffer) {
// console.log(`rx: ${topic}=>${payload.toString()}`);
}
/**
* 连接MQTT服务器
*/
function connectMqtt(host: string, port: number, user: string, pwd: string, id: string, clean: boolean) {
let it = new MQTT(host, port);
it.connect({
username:user,
password: pwd,
clientId: id,
clean: clean,
});
// 订阅主题 /gb212/#
it.subscribe('/gb212/#', 0);
// 设置主题 /gb212/data 的消息回调
it.set_message_callback('/gb212/data', handleReal.bind(this));
// 设置主题 /gb212/rx 的消息回调
it.set_message_callback('/gb212/rx', handlePacked.bind(this));
// 将MQTT变量it放到MQTT客户端列表中
mqttList.push(it);
}
// 连接MQTT服务器
connectMqtt('127.0.0.1', 1883, 'test', '123456', 'this_is_test_200507_nodejs_oilfume_2', true);