MQTT属于是物联网的通信协议,在MQTT协议中有两大角色:客户端(发布者/订阅者),服务端(Mqtt broker);针对客户端和服务端需要有遵循该协议的的具体实现,EMQ/EMQ X就是MQTT Broker的一种实现。
EMQ X 是开源百万级分布式 MQTT 消息服务器(MQTT Messaging Broker),用于支持各种接入标准 MQTT协议的设备,实现从设备端到服务器端的消息传递,以及从服务器端到设备端的设备控制消息转发。从而实现物联网设备的数据采集,和对设备的操作和控制 EMQ官网传送门(本博客基于emq x 4.1产品编写)
EMQ X 公司主要提供三个产品,可在官网首页产品导航查看每一种产品;主要体现在支持的连接数量、产品功能和商业服务等方面的区别:
产品功能对比图
虽然EMQ X Enterprise, EMQ X Platform性能更加强大, EMQ X Enterprise 使用说明传送门 但是收费. 本着学习的原则选择的是性能稍差的EMQ X Broker
EMQX支持的安装方式多种多样, docker安装, rpm安装, zip安装 点击跳转到安装地址
下面首先演示rpm安装
# rpm安装
## rpm安装指定版本的emqx
rpm -ivh emqx-centos7-v4.0.5.x86_64.rpm
## 查询emqx安装是否成功
rpm -qa | grep emqx
## 启动emqx
empx start
empx restart
## 查看emqx运行状态
emqx_ctl status
## 关闭emqx
emqx stop
## 卸载 emqx
rpm -e emqx
# 访问emqx
## 访问地址
http://emqx安装ip:18083/
## 默认用户名:admin,默认密码:public
图1: 安装命令截图
图2: 登陆emqx后的Dashboard
需要安装docker, 如果之前没有安装请跳转至 docker教程第二章
# docker安装
## emqx版本查看地址
https://hub.docker.com/r/emqx/emqx/tags?page=1&ordering=last_updated
## 拉取指定版本镜像
docker pull emqx/emqx:v4.0.5
## 将该镜像生成对应容器并运行
docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.5
# 访问emqx
## 访问地址
http://emqx安装ip:18083/
## 默认用户名:admin,默认密码:public
解压zip包, 进入 bin目录下直接运行相关命令即可 bin文件夹文件截图
安装命令
# bin目录下运行
[root@docker01 bin]# ./emqx start
EMQ X Broker v4.0.5 is started successfully!
[root@docker01 bin]# ./emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx 4.0.5 is running
[root@docker01 bin]# ./emqx stop
# 访问emqx
## 访问地址
http://emqx安装ip:18083/
## 默认用户名:admin,默认密码:public
不同安装方式得到的 EMQ X 其目录结构会有所不同,具体如下:
{<Plugin Name>, <Enabled>}.
, <Enabled>
字段为布尔类型,EMQ X 会在启动时根据 的值判断是否需要启动该插件。
Mnesia 数据库是 Erlang 内置的一个分布式 DBMS,可以直接存储 Erlang 的各种数据结构
EMQ X 使用 Mnesia 数据库存储自身运行数据,例如告警记录、规则引擎已创建的资源和规则、Dashbaord用户信息等数据,这些数据都将被存储在 mnesia 目录下,因此一旦删除该目录,将导致 EMQ X 丢失所有业务数据。可以通过 emqx_ctl mnesia
命令查询 EMQ X 中 Mnesia 数据库的系统信息。
使用基于emqx的图形化ui中的websocket实现图片的收发 请求地址: http://emqx服务器ip:18083/#/websocket
身份认证是大多数应用的重要组成部分,MQTT 协议支持用户名密码认证,启用身份认证能有效阻止非法客户端的连接。 EMQ X 中的认证指的是当一个客户端连接到 EMQ X 的时候,通过服务器端的配置来控制客户端连接服务器的权限。
EMQ X 的认证支持包括两个层面:
EMQ X 支持使用内置数据源(文件、内置数据库)、JWT、外部主流数据库和自定义 HTTP API 作为身份认证数据源。 连接数据源、进行认证逻辑通过插件实现的,每个插件对应一种认证方式,使用前需要启用相应的插件。 客户端连接时插件通过检查其 username/clientid 和 password 是否与指定数据源的信息一致来实现对客户端的身份认证。
EMQ X 支持的认证方式:
注意
EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQ X。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQ X 将根据匿名认证启用情况决定是否允许客户端连接。 生产环境中请禁用匿名认证。
# 进入 etc/emqx.conf
# 配置是否开启匿名认证(默认为true)
## Value: true | false
allow_anonymous = false
配置之后, 可以发现再次使用websocket 直接连接就会失败, 必须要输入指定的用户名和密码
EMQ X 多数认证插件中可以启用哈希方法,数据源中仅保存密码密文,保证数据安全。 启用哈希方法时,用户可以为每个客户端都指定一个 salt(盐)并配置加盐规则,数据库中存储的密码是按照加盐规则与哈希方法处理后的密文。
以 MySQL 认证为例:加盐规则与哈希方法配置
# etc/plugins/emqx_auth_mysql.conf
## 不加盐,仅做哈希处理
auth.mysql.password_hash = sha256
## salt 前缀:使用 sha256 加密 salt + 密码 拼接的字符串
auth.mysql.password_hash = salt,sha256
## salt 后缀:使用 sha256 加密 密码 + salt 拼接的字符串
auth.mysql.password_hash = sha256,salt
## pbkdf2 with macfun iterations dklen
## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
## auth.mysql.password_hash = pbkdf2,sha256,1000,20
任何一种认证方式最终都会返回一个结果:
Username 认证使用配置文件预设客户端用户名与密码,支持通过 HTTP API 管理认证数据。 Username 认证不依赖外部数据源,使用上足够简单轻量。使用这种认证方式前需要开启插件,我们可以在Dashboard里找到这个插件并开启。
配置和测试流程同username认证
开启http认证, 通过客户端来进行认证, emqx通过客户端返回的响应码来判断请求成功或失败 响应码可见 认证结果 这一部分介绍
在实际项目中我们要针对接MQTT消息代理服务端,从而向其发布消息、订阅消息等来完成我们自己的业务逻辑的开发。EMQX针对不同的客户端语言都提供了不同的SDK工具包,可以在官网上查看并下下载
基于Java语言开发的sdk工具, 通过该sdk来实现对emqx的操作 官方文档及源码地址 https://github.com/eclipse/paho.mqtt.java
实现步骤
MQTT.js是MQTT协议的客户端库,用JavaScript编写,适用于node.js和浏览器。 GitHub项目地址:https://github.com/mqttjs/MQTT.js
下面将演示基于sdk-matt.js实现消息的收发
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>mqtt.js测试</title>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js" ></script>
<style>
div{
width: 300px;
height: 300px;
float: left;
border: 1px solid red;
}
</style>
<script type="text/javascript">
$(function () {
//定义连接选项对象
const options = {
clean: true, // 不保留回话
connectTimeout: 4000, // 超时时间
// 认证信息
clientId: 'emqx_h5_client',
username: 'user',
password: '123456',
}
// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接 8083端口
// wss 加密 WebSocket 连接 8084端口
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = "ws://emqx所在服务器ip:8083/mqtt";
const client = mqtt.connect(connectUrl,options);
/**
* mqtt.Client相关事件
*/
//当重新连接启动触发回调
client.on('reconnect', () => {
$("#div1").text("正在重连.....");
});
//连接断开后触发的回调
client.on("close",function () {
$("#div1").text("客户端已断开连接.....");
});
//从broker接收到断开连接的数据包后发出。MQTT 5.0特性
client.on("disconnect",function (packet) {
$("#div1").text("从broker接收到断开连接的数据包....."+packet);
});
//客户端脱机下线触发回调
client.on("offline",function () {
$("#div1").text("客户端脱机下线.....");
});
//当客户端无法连接或出现错误时触发回调
client.on("error",(error) =>{
$("#div1").text("客户端出现错误....."+error);
});
//以下两个事件监听粒度细
//当客户端发送任何数据包时发出。这包括published()包以及MQTT用于管理订阅和连接的包
client.on("packetsend",(packet)=>{
$("#div1").text("客户端已发出数据包....."+packet);
});
//当客户端接收到任何数据包时发出。这包括来自订阅主题的信息包以及MQTT用于管理订阅和连接的信息包
client.on("packetreceive",(packet)=>{
$("#div1").text("客户端已收到数据包....."+packet);
});
//注册监听connect事件
client.on("connect",function (connack) {
//成功连接上服务端之后
$("#div1").text("成功连接上服务器"+new Date());
//订阅 testtopic/#
client.subscribe("testtopic/#",{qos:2});
//每隔2秒钟发布一次
setInterval(publish,2000);
});
function publish() {
//发布数据
/*** client.publish(topic,message,[options], [callback])
* message: Buffer or String
* options:{
* qos:0, //默认0
* retain:false, //默认false
* dup:false, //默认false
* properties:{}
* }
* callback:function (err){}
*/
const message = "h5 message "+Math.random()+new Date();
client.publish("testtopic/123",message,{qos:2});
$("#div2").text("客户端发布了数据:"+message);
}
//注册消息到达的事件
client.on("message",(topic, message, packet)=>{
$("#div3").text("客户端收到订阅消息,topic="+topic+";消息数据:"+message+";数据包:"+packet);
});
//页面离开自动断开连接
$(window).bind("beforeunload",()=>{
$("#div1").text("客户端窗口关闭,断开连接");
client.end();
})
});
</script>
</head>
<body>
<div id="div1"></div>
<div id="div2"></div>
<div id="div3"></div>
</body>
</html>
debug < info < notice < warning < error < critical < alert < emergency
log.level = warning
此配置将所有 log handler 的配置设置为 warning。 [Primary Level] -- global log level and filters
/ \
[Handler 1] [Handler 2] -- log levels and filters at each handler
EMQ X 使用了分层的日志系统,在日志级别上,包括全局日志级别 (primary log level)、以及各 log handler 是负责日志处理和输出的工作进程,它由 log handler id 唯一标识,并负有如下任务:
查看 emqx 默认安装的 log handlers:
总结:
可以使用 EMQ X 的命令行工具 emqx_ctl 在运行时修改 emqx 的日志级别:
log.dir = log
emqx.log.N:
: 包含了 EMQ X 的所有日志消息。比如emqx.log.1 , emqx.log.2 …
emqx.log.siz 和 emqx.log.idx:
记录日志滚动信息的系统文件。
run_erl.log:
以 emqx start 方式后台启动 EMQ X 时,用于记录启动信息的系统文件。
erlang.log.N:
以 emqx start 方式后台启动 EMQ X 时,控制台日志的副本文件。比如 erlang.log.1 , erlang.log.2 …
详情见官方文档-日志参数配置
HTTP 认证使用外部自建 HTTP 应用认证授权数据源,根据 HTTP API 返回的数据判定授权结果,能够实现复杂的 ACL 校验逻辑。
搭建实现过程
注意:
auth.http.super_req
和auth.http.acl_req
配置的是客户端应用程序superuser 请求和ACL 授权查询请求接口地址, 因此我们不仅要关注接口的地址正确与否, 还要编写这两个接口的代码通过MQTTX测试发布订阅权限是否成功 MQTTX安装教程在第三章第2节认证操作案例之username认证第5步
订阅主题
向指定主题发送消息并通过订阅消息接收 因为是超级用户, 所以订阅和接收都没有限制
客户端应用程序输出信息
主题订阅
客户端应用程序输出信息
测试能否订阅 testtopic/# 主题
测试能否向 testtopic/123 主题发送消息(可以)
同时可以看到emq-client2接收到了emq-client3发送的消息
测试过程客户端应用程序输出
由测试结果123可以看出:
插件可以将所有 EMQ X 的事件及消息都发送到指定的 HTTP 服务器 只需要在etc/plugins/emqx_web_hook.conf 中配置用于通知emqx 客户端服务端消息事件的事件类型以及被通知的Web Server地址
插件原理
WebHook 的内部实现是基于钩子,但它更靠近顶层一些。它通过在钩子上的挂载回调函数,获取到 EMQ X中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。
以客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:
模拟实现
EMQ X 提供了 HTTP API 以实现与外部系统的集成,例如查询客户端信息、发布消息和创建规则等。 EMQ X 的 HTTP API 服务默认监听 8081 端口,可通过 etc/plugins/emqx_management.conf 配置文件修改监听端口,或启用 HTTPS 监听。EMQ X 4.0.0 以后的所有 API 调用均以 api/v4 开头。
接口安全及响应码
EMQ X 的 HTTP API 使用 Basic 认证方式, id 和 password 须分别填写 AppID 和 AppSecret。 默认的AppID 和 AppSecret 是:
amdin/public
。你可以在 Dashboard 的左侧菜单栏里,选择 “MANAGEMENT” ->“Applications” 来修改和添加 AppID/AppSecret。
接口请求工具
在第三章第2节 认证操作案例->client id认证第4步中, 我们已经使用了vscode的rest client 插件发送http请求. 下面我们将继续使用该插件来请求其他api
@hostname = emqx所在服务器ip
@port=8081
@contentType=application/json
@userName=admin
@password=public
#############获取所有支持的API接口########
GET http://{{hostname}}:{{port}}/api/v4 HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
#############获取所有Broker基本信息########
GET http://{{hostname}}:{{port}}/api/v4/brokers HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
#############获取Broker基本信息########
GET http://{{hostname}}:{{port}}/api/v4/brokers/{node} HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
#############获取客户端列表信息########
GET http://{{hostname}}:{{port}}/api/v4/clients HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
#############获取集群下所有订阅信息########
GET http://{{hostname}}:{{port}}/api/v4/subscriptions HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
#############获取所有状态数据########
GET http://{{hostname}}:{{port}}/api/v4/stats HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
#############获取集群下当前告警信息########
GET http://{{hostname}}:{{port}}/api/v4/alarms/present HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
#############获取黑名单信息########
GET http://{{hostname}}:{{port}}/api/v4/banned HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下非常重要的。
EMQ X 默认开启保留消息的功能,可以在 etc/emqx.conf 中修改 mqtt.retain_available 为 false 以禁用保留消息功能。如果 EMQ X 在保留消息功能被禁用的情况下依然收到了保留消息,那么将返回原因码为0x9A(不支持保留消息)的 DISCONNECT 报文。
应用场景
某车联网项目,车辆出租公司会实时监控所有车辆的GPS地理位置信息,这些信息是通过每个车辆每10分钟定时上报的GPS信息,这些信息需要展示在某调度系统的大屏上,该调度系统因为其他模块升级需要重新部署,升级后也需要去订阅获取所有车辆的GPS信息,上线完成后刚好错过了车辆最近一次上报的GPS信息,如果这些消息不是保留消息,该调度系统大屏上是空白的,必须等10分钟后才能调度这些车辆,10分钟内无法做出任何操作,用户体验非常差,但是如果这些信息是保留消息,该系统上线后立即就会收到最近所有车辆的位置信息,立即就可以展示然后进行调度。
保留消息配置
EMQ X 的保留消息功能是由 emqx_retainer 插件实现,该插件默认开启,通过修改 emqx_retainer 插件的配置,可以调整 EMQ X 储存保留消息的位置,限制接收保留消息数量和 Payload 最大长度,以及调整保留消息的过期时间。 emqx_retainer 插件默认开启,插件的配置路径为 etc/plugins/emqx_retainer.conf 。
## retained 消息存储方式
## - ram: 仅内存
## - disc: 内存和磁盘
## - disc_only: 仅磁盘
retainer.storage_type = ram
## 最大存储数 (0表示未限制)
retainer.max_retained_messages = 0
## 单条最大可存储消息大小
retainer.max_payload_size = 1MB
## 过期时间, 0 表示永不过期
## 单位: h 小时; m 分钟; s 秒。如 60m 表示 60 分钟
retainer.expiry_interval = 0
测试保留消息
可以通过DashBoard -> 工具 -> websocket 来模拟保留消息的实现以及效果 1.首先去新建一个保留消息(图1), 然后订阅. 查看接收到的消息(图2) 2.取消步骤1订阅(图3), 然后创建一个非保留消息(图4), 然后再去订阅并查看收到的消息(图5)
图1
图2
图3
图4
图5
共享订阅是在多个订阅者之间实现负载均衡的订阅方式 共享订阅的主题格式是针对订阅端来指定的,例如: $share/g/t/a ;而消息的发布方是向主题: t/a发布消息。这样在订阅方才能达到负载均衡的效果
以
$share/<group-name>
为前缀的共享订阅是带群组的共享订阅 group-name 可以为任意字符串,属于同一个群组内部的订阅者将以负载均衡接收消息,但 EMQ X 会向不同群组广播消息。
例如,假设订阅者 s1,s2,s3 属于群组 g1,订阅者 s4,s5 属于群组 g2。那么当 EMQ X 向这个主题发布消息 msg1 的时候:EMQ X 会向两个群组
实现过程
以 queue/ 为前缀的共享订阅是不带群组的共享订阅。它是 share 订阅的一种特例,相当与所有订阅者都在一个订阅组里面:
实现过程
$queue/g1/t1/a
主题.
t1/a
主题发布5条消息不带群组的共享群组订阅消息
.
常用的均衡策略有随机, 轮询, 哈希等, 具体如下图
配置均衡策略
在emqx根目录下. 进入 /etc/emqx.conf , 通过修改broker.shared_dispatch_ack_enabled
属性来修改负载均衡策略
# 均衡策略
## Dispatch strategy for shared subscription
#### Value: Enum
## - random
## - round_robin
## - sticky
## - hash broker.shared_subscription_strategy = random
# 共享分发时是否需要 ACK,适用于 QoS1 QoS2 消息,启用时,当通过shared_subscription_strategy选中的 一个订阅者离线时,应该允许将消息发送到组中的另一个订阅者
broker.shared_dispatch_ack_enabled = false
EMQ X 的延迟发布功能可以实现按照用户配置的时间间隔延迟发布 PUBLISH 报文的功能。当客户端使用特殊主题前缀$delayed/{DelayInteval} 发布消息到 EMQ X 时,将触发延迟发布功能。延迟发布的功能是针对消息发布者而言的,订阅方只需要按照正常的主题订阅即可。
应用场景
某智能售货机平台在双十一当天要对设备中所有商品做5折销售,双十一过去之后要立马恢复原价,为了满足这样的场景,我们可以在双十一0点给所有设备发送两条消息,一条消息是通过正常的主题发送,消息内容打5折;第二条消息延迟消息,延迟24小时,消息内容是恢复原价。这样在一个实现中可以完成两个业务场景。
消息发布格式
# DelayInterval延时时间, 单位秒, TopicName: 主题名
$delayed/{DelayInterval}/{TopicName}
功能实现
http://emqx所在ip:18083/#/plugins
, 开启延时发布插件emqx_delayed_publish
http://emqx所在ip:18083/#/websocket
中, 在创建连接后, 首先订阅一个主题 t1/1
, 然后根据这个主题按照延时发布的格式 $delayed/10/t1/1
发布消息{ "msg": "测试延时发布" }
, 即: 消息发送后消息订阅者延时10s才能接收到消息
EMQ X 的代理订阅功能使得客户端在连接建立时,不需要发送额外的 SUBSCRIBE 报文,便能自动建立用户预设的订阅关系。 静态代理订阅的核心就是通过配置文件来配置订阅的主题, 在我们建立连接后就会自动为我们创建创建订阅
功能实现
开启了 emqx_web_hook 组件后,EMQ X的事件都会勾起对我们配置的webhook接口进行回调,在该webhook接口中我们能够获取客户端的相关信息比如 clientId,username 等,然后我们可以在该接口方法中针对该客户端自动订阅某一主题,订阅的实现我们基于EMQ X给我们提供的监控管理的相关HTTP API,意味着我们调用相关的HTTP API可完成客户端订阅的功能,相关的HTTP API可在Dashboard中查看,也可以在官方的产品文档中查找: 功能概括就是: 让客户端连接时自动订阅某一主题, 反之客户端下线时我们可以自动取消订阅
功能实现
http://emqx所在ip:18083/#/plugins
, 开启webhook插件emqx_web_hook
EMQ X 的主题重写功能支持根据用户配置的规则在客户端订阅主题、发布消息、取消订阅的时候将 A 主题重写为 B 主题。 EMQ X 的保留消息和延迟发布可以与主题重写配合使用,例如,当用户想使用延迟发布功能,但不方便修改客户端发布的主题时,可以使用主题重写将相关主题重写为延迟发布的主题格式。
应用场景:
某共享单车平台A运营着大量的共享单车,每个单车上都装有一个物联网终端芯片,芯片上的程序是将一些数据通过mqtt协议上报到EMQ服务器;该公司某一天收购了另一家共享单车平台B,B平台下原有的单车也是通过mqtt上报消息数据,但是消息主题跟A平台的不一样,如果A平台想接入B平台的车上报的数据,我们就需要把B平台下所有车上芯片程序更改一下,这样虽然可行但是会耗费大量的人力物力成本,这时我们通过主题重写就可以实现B平台下所有单车数据的接收,几乎不需要编码,成本非常低。
每条主题重写规则的格式:
module.rewrite.rule.<number> = 主题过滤器 正则表达式 目标表达式
注意事项
正则表达式解析:
^
匹配输入字符串的开始位置,除非在方括号表达式中使用,当该符号在方括号表达式中使用时,表示不接受该方括号表达式中的字符集合$
匹配输入字符串的结尾位置( )
表示一个标记一个子表达式的开始和结束位置,[
标记一个中括号表达式的开始.
匹配除换行符 \n 之外的任何单字符,+
匹配前面的子表达式一次或多次*
匹配前面的子表达式零次或多次?
匹配前面的子表达式零次或一次|
指明两项之间的一个选择{n} n
是一个非负整数。匹配确定的 n 次{n,} n
是一个非负整数。至少匹配n 次{n,m}
m 和 n 均为非负整数,其中n <= m。最少匹配 n 次且最多匹配 m 次 \d 匹配一个数字字符。等价于 [0-9]主题重写配置实例
module.rewrite.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
module.rewrite.rule.2 = x/# ^x/y/(.+)$ z/y/x/$1
module.rewrite.rule.3 = x/y/+ ^x/y/(\d+)$ z/y/$1
配置解析
功能实现
主题重写功能默认关闭,开启此功能需要修改 etc/emqx.conf 文件中的 module.rewrite 配置项, 修改后重启 emqx
## Rewrite Module
## Enable Rewrite Module.
##
## Value: on | off
module.rewrite = on
##x/y/1 -> z/y/1 ,这里通过下面规则会将原来的主题x/y/1 重写从 z/y/1
module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
功能测试 登录MQTTX, 订阅重写后的主题, 然后向重写前的主题发送消息, 测试重写配置是否正确, 如下图
EMQ X 为用户提供了黑名单功能,用户可以通过相关的 HTTP API 将指定客户端加入黑名单以拒绝该客户端访问,除了客户端标识符以外,还支持直接封禁用户名甚至 IP 地址。 黑名单只适用于少量客户端封禁需求,如果有大量客户端需要认证管理,我们需要使用认证功能来实现。
自动封禁
在黑名单功能的基础上,EMQ X 支持自动封禁那些被检测到短时间内频繁登录的客户端,并且在一段时间内拒绝这些客户端的登录,以避免此类客户端过多占用服务器资源而影响其他客户端的正常使用。 需要注意的是,自动封禁功能只封禁客户端标识符,并不封禁用户名和 IP 地址,即该机器只要更换客户端标识符就能够继续登录。
自动封禁功能配置
手动配置(通过管理端api实现)
EMQ X 提供对接入速度、消息速度的限制:当客户端连接请求速度超过指定限制的时候,暂停新连接的建立;当消息接收速度超过指定限制的时候,暂停接收消息。
速率限制原理
EMQ X 使用令牌桶Token Bucket算法来对所有的 Rate Limit 来做控制。令牌桶算法 的逻辑如下图:
由此可知该算法中:
功能实现
消息重传 (Message Retransmission) 是属于 MQTT 协议标准规范的一部分。
协议中规定了作为通信的双方 服务端 和 客户端 对于自己发送到对端的 PUBLISH 消息都应满足其 服务质量(Quality of Service levels) 的要求。如:
虽然,QoS 1 和 QoS 2 的 PUBLISH 报文在 MQTT 协议栈这一层都会发生重传,但注意:
功能实现
有两种场景会导致消息重发:
在 etc/emqx.conf 中可配置:
EMQ X 消息服务器 4.x 版本 MQTT 连接压力测试到 130 万,在一台 8 核心、32G 内存的 CentOS 服务器上。100 万连接测试所需的 Linux 内核参数,网络协议栈参数,Erlang 虚拟机参数.EMQ X 消息服务器参数以及测试客户端设置可见官网 系统调优 介绍, 非常全面
EMQ X Rule Engine (以下简称规则引擎) 用于配置 EMQ X 消息流与设备事件的处理、响应规则。 规则引擎用于配置一套规则,该规则是针对EMQ X的消息流和设备事件如何处理的一套细则。
原理
EMQ X 在 消息发布 或 事件触发 时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息。
规则引擎借助响应动作可将特定主题的消息处理结果存储到关系型数据库(mysql,PostgreSQL),NoSql(Redis,MongoDB),发送到 HTTP Server,转发到消息队列 Kafka 或 RabbitMQ,重新发布到新的主题甚至是另一个 Broker 集群中,每个规则可以配置多个响应动作。
规则引擎使用
$events/
开头的虚拟主题(事件主题)处理 EMQ X 内置事件,内置事件提供更精细的消息控制和客户端动作处理能力,可用在 QoS 1 QoS 2 的消息抵达记录、设备上下线记录等业务中。
应用场景
规则引擎组成
与 EMQ X 规则引擎相关的概念包括: 规则(rule)、动作(action)、资源(resource) 和 资源类型(resourcetype)。 规则、动作、资源的关系:
基本语法
基本语法举例
CASE-WHEN 语法示例
现需要通过规则引擎提取出从 username=emq-client2 的客户端发送过来原始数据中的msg,user,orderNo 等数据,需要过滤 password 字段,同时还需要提取消息发布的qos信息,然后将最终过滤出来的消息通知到我们的web服务上。
功能实现
现将该教程所使用的emq x,mqttx, xhell+xftp 通过百度云分享如下: 觉得不错麻烦动动手点个赞吧~~~
链接:https://pan.baidu.com/s/1dSgs79rw5bnSkQyH-FbR4A 提取码:6mte 复制这段内容后打开百度网盘手机App,操作更方便哦–来自百度网盘超级会员V4的分享