前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >mqttnet消息推送与接收[通俗易懂]

mqttnet消息推送与接收[通俗易懂]

作者头像
全栈程序员站长
发布2022-09-12 20:35:19
1.1K0
发布2022-09-12 20:35:19
举报

大家好,又见面了,我是你们的朋友全栈君。

创建 Windows 服务的教程网上有很多,这里不再赘述。

服务端程序完成后,需要编写用于安装和卸载该服务的 bat 文件。

install.bat

@echo off
rem install.bat - registers MqttNetServiceAddUserAndPassword.exe (expected next to this script)
rem as an auto-start Windows service and starts it. Run from an elevated prompt.
title 安装windows服务:MqttNetServiceAddUserAndPassword
echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中…………
rem %~dp0 expands to this script's directory and already ends with a backslash.
rem sc expects a space between "binPath=" and its value, and plain ASCII double quotes.
sc create MqttNetServiceAddUserAndPassword binPath= "%~dp0MqttNetServiceAddUserAndPassword.exe"
sc config MqttNetServiceAddUserAndPassword start= auto
sc start MqttNetServiceAddUserAndPassword
echo.MqttNetServiceAddUserAndPassword启动完毕
pause

说明:binPath 中的 %~dp0 表示该 bat 文件所在的目录(即当前路径),也可以改为 MqttNetServiceAddUserAndPassword.exe 的绝对路径;路径两侧必须使用英文半角双引号 "。

delete.bat

@echo off
rem delete.bat - stops and unregisters the MqttNetServiceAddUserAndPassword Windows service.
rem Run from an elevated prompt.
echo.服务MqttNetServiceAddUserAndPassword卸载中……….
sc stop MqttNetServiceAddUserAndPassword
sc delete MqttNetServiceAddUserAndPassword
echo.MqttNetServiceAddUserAndPassword卸载完毕
pause

服务端:

using MQTTnet; using MQTTnet.Protocol; using MQTTnet.Server; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; using System.ServiceProcess; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Timers;

namespace MqttNetServiceAddUserAndPassword
{
    /// <summary>
    /// Windows service that hosts an MQTT broker on port 1884, validates connecting
    /// clients by client-id prefix and user name/password, logs every received
    /// application message, and every 5 seconds logs and publishes information about
    /// the connected clients (topics "ClientsCount" and "clientlink").
    /// </summary>
    public partial class Service1 : ServiceBase
    {
        // Serialises publishing from the timer callback.
        private static readonly object locker = new object();

        // Serialises log-file writes: message handlers and the timer callback run on
        // different threads and would otherwise collide when opening the same file.
        private static readonly object logLocker = new object();

        private MqttServer mqttServer = null;
        private System.Timers.Timer timer = null;

        private GodSharp.Sockets.SocketServer socketService = null;

        // Client ids that have already been written to the link log, so that a client
        // is not logged again on every tick; cleared once it reaches 2000 entries.
        private List<string> subClientIDs = new List<string>();

        /// <summary>
        /// Creates the service and the 5-second timer that reports connected clients.
        /// </summary>
        public Service1()
        {
            InitializeComponent();

            // Timer that checks every 5 s which clients are connected and logs them.
            timer = new System.Timers.Timer();
            timer.AutoReset = true;
            timer.Interval = 5000;
            timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);
            timer.Enabled = true;
        }

        /// <summary>
        /// Starts the MQTT broker in the background and makes sure the timer is running.
        /// </summary>
        /// <param name="args">Service start arguments (unused).</param>
        protected override void OnStart(string[] args)
        {
            // Start the broker without blocking the service control manager.
            Task.Run(() => CreateMQTTServer());

            if (timer.Enabled == false)
            {
                timer.Enabled = true;
                timer.Start();
            }

            // Socket server is currently disabled:
            // CreateServerSocket();
            // SocketServer.StartSocketService();
        }

        /// <summary>
        /// Stops the timer and shuts the MQTT broker down.
        /// </summary>
        protected override void OnStop()
        {
            if (timer.Enabled == true)
            {
                timer.Enabled = false;
                timer.Stop();
            }

            // Detach the field first so the timer callback and PubMessage stop using it.
            MqttServer server = mqttServer;
            mqttServer = null;
            if (server != null)
            {
                try
                {
                    // OnStop is synchronous, so the async stop has to be waited for here.
                    server.StopAsync().GetAwaiter().GetResult();
                }
                catch (Exception ex)
                {
                    WriteMsgLog("MQTT服务停止异常:" + ex.Message);
                }
            }
        }

        /// <summary>
        /// Creates and starts the MQTT broker if it is not already running.
        /// </summary>
        private async Task CreateMQTTServer()
        {
            if (mqttServer != null)
            {
                return;
            }

            var optionsBuilder = new MqttServerOptionsBuilder();
            optionsBuilder.WithConnectionValidator(c =>
            {
                // Only clients whose id starts with "Eohi_" are accepted.
                if (c.ClientId == null || c.ClientId.Length < 5 || !c.ClientId.StartsWith("Eohi_", StringComparison.Ordinal))
                {
                    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                    return;
                }

                // NOTE(review): credentials are hard-coded in source; consider moving
                // them to protected configuration.
                if (c.Username != "user" || c.Password != "123456")
                {
                    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                    return;
                }

                c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
            });

            // Binding to a specific IP address (the default is used here). The original
            // author reported an error when trying to pass an IPAddress to this call:
            // options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(""))

            // Listening port.
            optionsBuilder.WithDefaultEndpointPort(1884);

            // Connection backlog:
            // optionsBuilder.WithConnectionBacklog(2000);

            // A direct cast fails loudly if the factory ever returns another implementation,
            // instead of producing a null reference later on.
            var server = (MqttServer)new MqttFactory().CreateMqttServer();

            // Write every received application message to the message log. The text is
            // built in a handler-local variable so concurrent messages cannot overwrite
            // each other's log entry.
            server.ApplicationMessageReceived += (s, e) =>
            {
                string msg = "发送消息的客户端id:" + e.ClientId + "\r\n"
                    + "发送时间:" + DateTime.Now + "\r\n"
                    + "发送消息的主题:" + e.ApplicationMessage.Topic + "\r\n"
                    + "发送的消息内容:" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + "\r\n"
                    + "————————————————–\r\n";
                WriteMsgLog(msg);
            };

            try
            {
                await server.StartAsync(optionsBuilder.Build());

                // Publish the field only once the broker is really running.
                mqttServer = server;
            }
            catch (Exception ex)
            {
                // This method runs fire-and-forget from OnStart, so a start failure
                // (e.g. port already in use) would otherwise vanish silently.
                WriteMsgLog("MQTT服务启动失败:" + ex);
            }
        }

        #region Logging

        /// <summary>
        /// Appends one line to a log file located in the service's base directory.
        /// </summary>
        /// <param name="fileName">Log file name, relative to the base directory.</param>
        /// <param name="text">Text to append; a line terminator is added.</param>
        private static void AppendLog(string fileName, string text)
        {
            string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, fileName);
            lock (logLocker)
            {
                // Append mode creates the file when it does not exist yet.
                using (var writer = new StreamWriter(path, true))
                {
                    writer.WriteLine(text);
                }
            }
        }

        /// <summary>
        /// Writes a timestamped entry to Msglog.txt.
        /// </summary>
        /// <param name="msg">Text to log.</param>
        private void WriteMsgLog(string msg)
        {
            AppendLog("Msglog.txt", DateTime.Now.ToString() + " " + msg);
        }

        /// <summary>
        /// Publishes a text message from the broker itself.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="msg">Payload text.</param>
        private void PubMessage(string topic, string msg)
        {
            MqttServer server = mqttServer;
            if (server == null)
            {
                return;
            }

            lock (locker)
            {
                var message = new MqttApplicationMessageBuilder();
                message.WithTopic(topic);
                message.WithPayload(msg);
                server.PublishAsync(message.Build());
            }
        }

        /// <summary>
        /// Writes an entry to ClientLinklog.txt (client connect/disconnect log).
        /// </summary>
        /// <param name="msg">Text to log.</param>
        private void WriteClientLinkLog(string msg)
        {
            AppendLog("ClientLinklog.txt", msg);
        }

        /// <summary>
        /// Timer callback: logs and publishes the number of connected clients, each
        /// newly seen client, and each client that is no longer connected.
        /// </summary>
        /// <param name="sender">The timer.</param>
        /// <param name="e">Timer event data (unused).</param>
        private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)
        {
            MqttServer server = mqttServer;
            if (server == null)
            {
                return;
            }

            List<ConnectedMqttClient> subclients = server.GetConnectedClientsAsync().Result.ToList();
            if (subclients.Count == 0)
            {
                WriteClientLinkLog("暂无客户端接入!" + "\r\n" + "——————————————————– \r\n");
                return;
            }

            // NOTE(review): one is subtracted from the count, presumably to exclude the
            // desktop monitor client - confirm.
            string reportedCount = (subclients.Count - 1).ToString();
            WriteClientLinkLog("客户端接入的总数为:" + reportedCount + "\r\n" + "——————————————————- \r\n");
            PubMessage("ClientsCount", reportedCount);

            var clientids = new List<string>();
            foreach (var item in subclients)
            {
                if (!subClientIDs.Contains(item.ClientId))
                {
                    subClientIDs.Add(item.ClientId);
                    string msg = "连接客户端ID:" + item.ClientId + "\r\n"
                        + "连接时间:" + DateTime.Now + "\r\n"
                        + "协议版本:" + item.ProtocolVersion + "\r\n"
                        + "最后收到的非保持活包:" + item.LastNonKeepAlivePacketReceived + "\r\n"
                        + "最后收到的包:" + item.LastPacketReceived + "\r\n"
                        + "挂起的应用程序消息:" + item.PendingApplicationMessages + "\r\n"
                        + "————————————————" + "\r\n";
                    WriteClientLinkLog(msg);
                    PubMessage("clientlink", msg);
                }

                clientids.Add(item.ClientId);
            }

            if (subClientIDs.Count >= 2000)
            {
                subClientIDs.Clear();
            }

            // Ids logged earlier that are not connected any more have gone offline.
            // ToList() snapshots the result so removing from subClientIDs below is safe.
            var exceptlist = subClientIDs.Except(clientids).ToList();
            foreach (string offlineId in exceptlist)
            {
                string msgoutline = "客户端下线ID:" + offlineId + "\r\n"
                    + "客户端下线时间:" + DateTime.Now.ToString() + "\r\n"
                    + "———————————————————— \r\n";
                WriteClientLinkLog(msgoutline);
                subClientIDs.Remove(offlineId);
                PubMessage("clientlink", msgoutline);
            }
        }

        /// <summary>
        /// Writes an entry to ClientOutLineLog.txt (client offline log).
        /// </summary>
        /// <param name="msg">Text to log.</param>
        public void WriteClientOutLineLog(string msg)
        {
            AppendLog("ClientOutLineLog.txt", msg);
        }

        /// <summary>
        /// Creates and starts the local TCP socket server (127.0.0.1:9001) once.
        /// Calling it again after the server exists does nothing.
        /// </summary>
        private void CreateServerSocket()
        {
            // The original recursed into itself when the server already existed, which
            // could only end in a stack overflow.
            if (socketService != null)
            {
                return;
            }

            socketService = new GodSharp.Sockets.SocketServer("127.0.0.1", 9001, ProtocolType.Tcp);
            socketService.Start();
            socketService.Listen(10);

            // The original also built (but never started) a thread with an empty
            // while(true) loop as a placeholder for sending data to clients; it had no
            // effect and would spin a CPU core if started, so it is omitted.
        }

        #endregion
    }
}

服务端桌面显示程序:

using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Diagnostics; using System.Drawing; using System.Linq; using System.Net; using System.Net.NetworkInformation; using System.Net.Sockets; using System.ServiceProcess; using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms;

namespace MQTTNetFrm
{
    /// <summary>
    /// Desktop monitor for the MQTT Windows service: shows whether the service is
    /// running, connects to the local broker on port 1884 and displays the client
    /// count and client/message log texts the broker publishes.
    /// </summary>
    public partial class Form1 : Form
    {
        private ServiceController ServiceController = null;
        private MqttClientOptions options = null;
        private MqttClient mqttClient = null;

        /// <summary>
        /// Builds the form. Service lookup and the broker connection are deferred to
        /// the Load event: the original blocked the constructor on the connection and
        /// called Invoke before the window handle existed, which always failed.
        /// </summary>
        public Form1()
        {
            InitializeComponent();
            Load += OnFormLoadAsync;
        }

        /// <summary>
        /// Queries the service status off the UI thread, then connects to the broker.
        /// </summary>
        private async void OnFormLoadAsync(object sender, EventArgs e)
        {
            try
            {
                await Task.Run(() => GetServiceStatus());
            }
            catch (Exception ex)
            {
                // Enumerating services can fail (e.g. insufficient rights); show it
                // instead of crashing the form from an async void handler.
                SetServerStatus("服务状态获取失败:" + ex.Message);
            }

            await LinkClientService();
        }

        /// <summary>
        /// Runs an action on the UI thread if the form is still alive.
        /// </summary>
        /// <param name="action">UI update to perform.</param>
        private void RunOnUi(Action action)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            if (InvokeRequired)
            {
                BeginInvoke(action);
            }
            else
            {
                action();
            }
        }

        /// <summary>
        /// Shows a text in the server status label from any thread.
        /// </summary>
        /// <param name="text">Status text.</param>
        private void SetServerStatus(string text)
        {
            RunOnUi(() => { lab_serverstatus.Text = text; });
        }

        /// <summary>
        /// Returns the last IPv4 address of this host, or null when it has none.
        /// </summary>
        /// <returns>Dotted IPv4 address or null.</returns>
        private string GetLocalIP()
        {
            // The original used DefaultIfEmpty(), which yields a null element for an
            // empty list and then crashed when reading its AddressFamily.
            IPAddress last = Dns.GetHostAddresses(Dns.GetHostName())
                .LastOrDefault(a => a.AddressFamily == AddressFamily.InterNetwork);
            return last == null ? null : last.ToString();
        }

        /// <summary>
        /// Connects to the local broker with a unique "Eohi_Frm_" client id and
        /// subscribes to the monitor topics.
        /// </summary>
        private async Task LinkClientService()
        {
            var m = "Eohi_Frm_" + Guid.NewGuid().ToString();
            options = new MqttClientOptions
            {
                ClientId = m,
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {
                    // The broker runs on this machine, so loopback is a safe fallback
                    // when no IPv4 address is found.
                    Server = GetLocalIP() ?? "127.0.0.1",
                    Port = 1884,
                },
                Credentials = new MqttClientCredentials()
                {
                    Username = "user",
                    Password = "123456"
                }
            };

            var factory = new MqttFactory();
            mqttClient = factory.CreateMqttClient() as MqttClient;
            try
            {
                await mqttClient.ConnectAsync(options);
                await but_submsg_Click();
                SetServerStatus("连接正常,服务运行中…………");
            }
            catch (Exception ex)
            {
                // The original swallowed this silently, leaving the user without feedback.
                SetServerStatus("连接失败!" + ex.Message);
            }
        }

        /// <summary>
        /// Attaches the message handler and subscribes to the topics published by the
        /// service ("ClientsCount", "clientlink", "msglog").
        /// </summary>
        private async Task but_submsg_Click()
        {
            if (mqttClient == null)
            {
                return;
            }

            // Attach the handler before subscribing so no early message is missed.
            mqttClient.ApplicationMessageReceived += (s, e) =>
            {
                // A message may carry no payload; treat it as empty text.
                var msg = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);
                RunOnUi(() =>
                {
                    // Short payloads are the client count.
                    if (msg.Length <= 5)
                    {
                        lab_clientcount.Text = msg;
                    }

                    if (msg.Length > 10)
                    {
                        if (msg.StartsWith("连接"))
                        {
                            rtb_clientlog.AppendText(msg);
                        }

                        // NOTE(review): as in the original, every long message is also
                        // appended to the message log (there was no else) - confirm
                        // whether that is intended.
                        rtb_msglog.AppendText(msg);
                    }
                });
            };

            await mqttClient.SubscribeAsync(new TopicFilter("ClientsCount", MqttQualityOfServiceLevel.AtMostOnce));
            await mqttClient.SubscribeAsync(new TopicFilter("clientlink", MqttQualityOfServiceLevel.AtMostOnce));
            await mqttClient.SubscribeAsync(new TopicFilter("msglog", MqttQualityOfServiceLevel.AtMostOnce));
        }

        /// <summary>
        /// Looks up the MqttNetServiceAddUserAndPassword service by display name and
        /// shows whether it is running. May be called from a worker thread.
        /// </summary>
        private void GetServiceStatus()
        {
            ServiceController[] serviceControllers = ServiceController.GetServices();
            foreach (ServiceController u in serviceControllers)
            {
                if (u.DisplayName != "MqttNetServiceAddUserAndPassword")
                {
                    continue;
                }

                if (ServiceController == null)
                {
                    ServiceController = u;
                }

                // The label belongs to the UI thread, so the update is marshalled.
                if (u.Status == ServiceControllerStatus.Running)
                {
                    SetServerStatus("服务运行中…………");
                }
                else
                {
                    SetServerStatus("服务已停止…………");
                }
            }
        }

        /// <summary>
        /// Clears the log box of the currently selected tab.
        /// </summary>
        private void button2_Click(object sender, EventArgs e)
        {
            if (tabControl1.SelectedTab == tabPage1)
            {
                rtb_clientlog.Text = "";
            }
            else
            {
                rtb_msglog.Text = "";
            }
        }

        /// <summary>
        /// Load handler kept for the designer; intentionally empty.
        /// </summary>
        private void Form1_Load(object sender, EventArgs e)
        {
        }
    }
}

客户端:

using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Timers; using System.Windows.Forms;

namespace MqttClientTest01
{
    /// <summary>
    /// Test client form: connects to an MQTT broker with the address and credentials
    /// entered by the user, publishes messages and shows messages received on a
    /// subscribed topic.
    /// </summary>
    public partial class Form1 : Form
    {
        private MqttClient mqttClient = null;
        private System.Timers.Timer timer = null;
        private int CountLink = 0;
        private MqttClientOptions options = null;

        /// <summary>
        /// Builds the form. The retry timer is currently disabled.
        /// </summary>
        public Form1()
        {
            InitializeComponent();

            // Disabled: timer that would retry the connection once per second.
            //timer = new System.Timers.Timer();
            //timer.AutoReset = true;
            //timer.Interval = 1000;
            //timer.Elapsed += new ElapsedEventHandler(LinkMqttNetService);
        }

        /// <summary>
        /// Runs an action on the UI thread if the form is still alive.
        /// </summary>
        /// <param name="action">UI update to perform.</param>
        private void RunOnUi(Action action)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            if (InvokeRequired)
            {
                BeginInvoke(action);
            }
            else
            {
                action();
            }
        }

        /// <summary>
        /// Shows a connection status text together with the current time, from any thread.
        /// </summary>
        /// <param name="status">Status text.</param>
        private void ShowLinkStatus(string status)
        {
            RunOnUi(() =>
            {
                lab_linkstatus.Text = status;
                lab_linktimer.Text = DateTime.Now.ToString();
            });
        }

        /// <summary>
        /// Retry-timer callback: counts failed attempts and gives up after five.
        /// </summary>
        private void LinkMqttNetService(object sender, ElapsedEventArgs e)
        {
            if (mqttClient == null)
            {
                // RunAsync();
                CountLink++;
            }

            if (CountLink >= 5)
            {
                MessageBox.Show("连接多次失败,请确认各参数是否正确!");
                CountLink = 0;

                // The timer is only created when the disabled code in the constructor
                // is restored, so it may be null here.
                if (timer != null)
                {
                    timer.Enabled = false;
                }
            }
        }

        /// <summary>
        /// Connect button: starts a connection attempt.
        /// </summary>
        private void but_linkserver_Click(object sender, EventArgs k)
        {
            LinkClientService();
            //CountLink = 0;
            //timer.Enabled = true;
            //timer.Start();
        }

        /// <summary>
        /// Connects to the broker using the server, port and credentials from the form.
        /// </summary>
        public async void LinkClientService()
        {
            // Convert.ToInt32 threw on empty or non-numeric input, outside any try
            // block; inside an async void method that takes the application down.
            int port;
            if (!int.TryParse(txtb_serverport.Text.Trim(), out port) || port < 1 || port > 65535)
            {
                ShowLinkStatus("连接失败!请检查ip/端口");
                return;
            }

            var m = "Eohi_" + Guid.NewGuid().ToString();
            options = new MqttClientOptions
            {
                ClientId = m,
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = txtb_serverip.Text.Trim(),
                    Port = port,
                },
                Credentials = new MqttClientCredentials()
                {
                    Username = tb_username.Text,
                    Password = tb_userpwd.Text
                }
            };

            var factory = new MqttFactory();
            mqttClient = factory.CreateMqttClient() as MqttClient;

            // Attached once per client. The original attached a new handler on every
            // click of the subscribe button, so each message was shown several times.
            mqttClient.ApplicationMessageReceived += (s, e) =>
            {
                RunOnUi(() =>
                {
                    rtb_submsgclient.AppendText("ClientID=" + e.ClientId + "\n");
                    // The scraped source printed the "{...}" placeholders literally
                    // (the interpolation prefix was lost); the values are shown instead.
                    rtb_submsgclient.AppendText("+ QoS = " + e.ApplicationMessage.QualityOfServiceLevel + "\n");
                    rtb_submsgclient.AppendText("+ Retain = " + e.ApplicationMessage.Retain + "\n");
                });
            };

            try
            {
                await mqttClient.ConnectAsync(options);
                ShowLinkStatus("连接成功!");

                // Registered only after a successful connect, as in the original.
                mqttClient.Disconnected += async (s, e) =>
                {
                    // NOTE(review): reconnects only when the client was NOT connected
                    // before the event, as in the original - confirm whether the
                    // condition is inverted.
                    if (e.ClientWasConnected == false)
                    {
                        try
                        {
                            await mqttClient.ConnectAsync(options);
                            ShowLinkStatus("连接成功!");
                        }
                        catch (Exception ex)
                        {
                            // This handler runs off the UI thread; the original wrote
                            // to the labels directly here.
                            ShowLinkStatus("连接失败!" + ex.Message);
                        }
                    }
                };
            }
            catch (Exception)
            {
                ShowLinkStatus("连接失败!请检查ip/端口");
            }
        }

        /// <summary>
        /// Designer-wired handler; intentionally empty.
        /// </summary>
        private void tb_username_TextChanged(object sender, EventArgs e)
        {
        }

        /// <summary>
        /// Send button: publishes the entered text to the entered topic as a retained
        /// message with exactly-once QoS.
        /// </summary>
        private async void but_clientsend_Click(object sender, EventArgs e)
        {
            if (mqttClient == null)
            {
                return;
            }

            try
            {
                var message = new MqttApplicationMessageBuilder();
                message.WithTopic(txtb_msgtopic.Text.Trim());
                message.WithPayload(rtb_pubmsg.Text.Trim());
                message.WithExactlyOnceQoS();
                message.WithRetainFlag();

                // Awaited so that a failure (not connected, invalid topic) is reported
                // instead of becoming an unobserved task exception.
                await mqttClient.PublishAsync(message.Build());
            }
            catch (Exception ex)
            {
                MessageBox.Show("发送失败:" + ex.Message);
            }
        }

        /// <summary>
        /// Subscribe button: subscribes to the entered topic with at-most-once QoS.
        /// </summary>
        private async void but_submsg_Click(object sender, EventArgs k)
        {
            if (mqttClient == null)
            {
                return;
            }

            try
            {
                await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));
            }
            catch (Exception ex)
            {
                // Subscribing can fail when the client is not connected or the topic
                // is invalid; an async void handler must not let that escape.
                MessageBox.Show("订阅失败:" + ex.Message);
            }
        }

        /// <summary>
        /// Clears the received-messages box.
        /// </summary>
        private void button1_Click(object sender, EventArgs e)
        {
            rtb_submsgclient.Text = "";
        }
    }
}

mqttnet消息推送与接收[通俗易懂]
mqttnet消息推送与接收[通俗易懂]
mqttnet消息推送与接收[通俗易懂]
mqttnet消息推送与接收[通俗易懂]

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152869.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档