前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

作者头像
全栈程序员站长
发布2022-09-12 20:37:45
1.3K0
发布2022-09-12 20:37:45
举报

大家好,又见面了,我是你们的朋友全栈君。

服务端:

using MQTTnet;

using MQTTnet.Server;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.IO;

using System.Linq;

using System.Net;

using System.ServiceProcess;

using System.Text;

using System.Threading.Tasks;

using System.Timers;

namespace MQTTNetService

{

/// <summary>
/// Windows service that hosts an MQTT broker (MQTTnet) on TCP port 1883.
/// Every application message received by the broker is appended to
/// <c>Msglog.txt</c>, and every five seconds the list of connected clients is
/// polled so that newly seen client ids are appended to <c>ClientLinklog.txt</c>.
/// Both files are written to the service's base directory.
/// </summary>
public partial class MQTTNetService : ServiceBase
{
    // The running broker; null while the service is stopped or still starting.
    private MqttServer mqttServer = null;

    // Fires GetSubClientSAndSetShow periodically while the service is running.
    private System.Timers.Timer timer = null;

    // Client ids that have already been written to the connection log, so the
    // same client is not logged on every tick. Cleared once it reaches 1000 entries.
    private readonly HashSet<string> subClientIDs = new HashSet<string>();

    // Serializes log-file appends and access to subClientIDs. Timer.Elapsed is
    // raised on thread-pool threads and can overlap with a previous tick.
    private readonly object logGate = new object();

    /// <summary>
    /// Builds the designer components and prepares (but does not start) the
    /// five-second polling timer; the timer is started in <see cref="OnStart"/>.
    /// </summary>
    public MQTTNetService()
    {
        InitializeComponent();

        timer = new System.Timers.Timer();
        timer.AutoReset = true;
        timer.Interval = 5000;
        timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);
    }

    /// <summary>
    /// Starts the broker and then the client-polling timer. Blocks until the
    /// broker has started so that a start-up failure (for example, the port
    /// being in use) surfaces as a failed service start instead of being lost.
    /// </summary>
    /// <param name="args">Service start arguments (unused).</param>
    protected override void OnStart(string[] args)
    {
        // A Windows service has no synchronization context, so blocking here
        // cannot deadlock on the awaited continuation.
        CreateMQTTServer().GetAwaiter().GetResult();
        timer.Start();
    }

    /// <summary>
    /// Stops the polling timer and shuts the broker down.
    /// </summary>
    protected override void OnStop()
    {
        timer.Stop();

        // Detach the field first so a timer tick that is already in flight
        // sees "no server" rather than a server that is shutting down.
        var server = mqttServer;
        mqttServer = null;
        if (server != null)
        {
            server.StopAsync().GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Creates and starts the MQTT broker if it is not already running, and
    /// hooks up logging of every received application message.
    /// </summary>
    /// <returns>A task that completes once the broker has started.</returns>
    private async Task CreateMQTTServer()
    {
        if (mqttServer != null)
        {
            return;
        }

        var optionsBuilder = new MqttServerOptionsBuilder();

        // To bind to a specific local address instead of the default:
        //optionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse("192.168.1.10"));
        // NOTE(review): the original attempt called this on the MqttServerOptions
        // object with an empty address string; presumably the builder is where the
        // method lives - confirm against the MQTTnet version in use.

        // Listening port.
        optionsBuilder.WithDefaultEndpointPort(1883);

        // Connection backlog (the original author notes the usual default is 2000).
        //optionsBuilder.WithConnectionBacklog(2000);

        var server = (MqttServer)new MqttFactory().CreateMqttServer();

        // Log every message that any client publishes.
        server.ApplicationMessageReceived += (s, e) =>
        {
            string msg = "发送消息的客户端id:" + e.ClientId + "\n"
                + "发送时间:" + DateTime.Now + "\n"
                + "发送消息的主题:" + e.ApplicationMessage.Topic + "\n"
                + "发送的消息内容:" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + "\n"
                + "————————————————–\n";
            WriteMsgLog(msg);
        };

        // Start with the options that were actually configured above; passing a
        // fresh MqttServerOptions would silently discard the builder settings.
        await server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false);

        // Publish the instance only once it is running, so the timer callback
        // never polls a half-started broker and a failed start can be retried.
        mqttServer = server;
    }

    #region Logging

    /// <summary>
    /// Appends a timestamped entry to <c>Msglog.txt</c> in the service directory.
    /// </summary>
    /// <param name="msg">Text to log.</param>
    private void WriteMsgLog(string msg)
    {
        AppendLog("Msglog.txt", msg);
    }

    /// <summary>
    /// Appends a timestamped entry to <c>ClientLinklog.txt</c> in the service directory.
    /// </summary>
    /// <param name="msg">Text to log.</param>
    private void WriteClientLinkLog(string msg)
    {
        AppendLog("ClientLinklog.txt", msg);
    }

    /// <summary>
    /// Appends "<c>now + space + msg</c>" as one line to the named file under the
    /// application's base directory, creating the file when it does not exist.
    /// </summary>
    /// <param name="fileName">File name relative to the base directory.</param>
    /// <param name="msg">Text to log.</param>
    private void AppendLog(string fileName, string msg)
    {
        string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, fileName);
        string line = DateTime.Now.ToString() + " " + msg + Environment.NewLine;

        lock (logGate)
        {
            File.AppendAllText(path, line);
        }
    }

    /// <summary>
    /// Timer callback: asks the broker for its connected clients and writes one
    /// connection-log entry for each client id that has not been logged yet.
    /// </summary>
    /// <param name="sender">The timer that raised the event.</param>
    /// <param name="e">Elapsed event data (unused).</param>
    private async void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)
    {
        var server = mqttServer;
        if (server == null)
        {
            return;
        }

        try
        {
            var subclients = await server.GetConnectedClientsAsync().ConfigureAwait(false);

            lock (logGate)
            {
                foreach (var item in subclients)
                {
                    // Add returns false when the id was already recorded.
                    if (!subClientIDs.Add(item.ClientId))
                    {
                        continue;
                    }

                    string msg = "接收客户端ID:" + item.ClientId + "\n"
                        + "接收时间:" + DateTime.Now + "\n"
                        + "协议版本:" + item.ProtocolVersion + "\n"
                        + "最后收到的非保持活包" + item.LastNonKeepAlivePacketReceived + "\n"
                        + "最后收到的包" + item.LastPacketReceived + "\n"
                        + "挂起的应用程序消息" + item.PendingApplicationMessages + "\n"
                        + "————————————————" + "\n";
                    WriteClientLinkLog(msg);
                }

                // Bound the memory used for de-duplication.
                if (subClientIDs.Count >= 1000)
                {
                    subClientIDs.Clear();
                }
            }
        }
        catch (Exception exception)
        {
            // This is an async void event handler: an escaping exception would
            // terminate the process, so report it and let the next tick retry.
            Trace.TraceError("Polling connected MQTT clients failed: " + exception);
        }
    }

    #endregion
}

}

以上服务端不能判断特定标识的客户端接入,也就是只要有客户端连接就会接入,不够完善

客户端 1(.NET Core 控制台程序):仅用于简单测试,订阅所有主题并打印接收到的消息

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Text;

using System.Threading.Tasks;

namespace mqttclienttest0101

{

/// <summary>
/// Console test client. Connects to the broker, subscribes to every topic
/// ("#") and prints each message it receives. After the user presses Enter it
/// publishes one retained QoS 2 test message to "TopicTest" and exits.
/// </summary>
class Program
{
    // Shared with the event handlers set up in RunAsync; null until RunAsync has created it.
    static MqttClient mqttClient = null;

    /// <summary>
    /// Entry point: connect, wait for Enter, publish one test message.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Builder-based alternative for creating the client options:
        //var factory = new MqttFactory();
        //var mqttClient = factory.CreateMqttClient();
        //var options = new MqttClientOptionsBuilder();
        //options.WithClientId(Guid.NewGuid().ToString());
        //options.WithTcpServer("192.168.3.193");
        //options.WithTls();
        //options.WithCleanSession();
        //var task = mqttClient.ConnectAsync(options.Build());

        // Wait for the connection attempt to finish so that pressing Enter
        // early cannot publish on a client that is still connecting.
        RunAsync().GetAwaiter().GetResult();

        Console.ReadLine();

        if (mqttClient != null)
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic("TopicTest")
                .WithPayload("topictest001")
                .WithExactlyOnceQoS()
                .WithRetainFlag()
                .Build();

            try
            {
                // Block until the publish has completed; otherwise Main returns
                // and the process can exit before the message is delivered.
                mqttClient.PublishAsync(message).GetAwaiter().GetResult();
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
        }

        Console.WriteLine("Hello World!");
    }

    /// <summary>
    /// Creates the MQTT client, wires up the received/connected/disconnected
    /// handlers and makes the first connection attempt. Connection failures are
    /// printed rather than thrown; the disconnected handler retries after 5 s.
    /// </summary>
    /// <returns>A task that completes after the first connection attempt.</returns>
    public static async Task RunAsync()
    {
        try
        {
            var options = new MqttClientOptions
            {
                // Guid.NewGuid() gives each run a unique id. "new Guid()" is the
                // all-zero GUID, which would give every client the same id.
                ClientId = Guid.NewGuid().ToString(),
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {
                    //Server = "localhost",
                    Server = "127.0.1.1"
                },
                // WebSocket alternative:
                //ChannelOptions = new MqttClientWebSocketOptions
                //{
                //    Uri = "ws://localhost:59690/mqtt"
                //}
            };

            var factory = new MqttFactory();
            mqttClient = (MqttClient)factory.CreateMqttClient();

            mqttClient.ApplicationMessageReceived += (s, e) =>
            {
                Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
                // A message may carry no payload; decode it as an empty string.
                Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}");
                Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
                Console.WriteLine();
            };

            mqttClient.Connected += async (s, e) =>
            {
                Console.WriteLine("### CONNECTED WITH SERVER ###");
                await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
                Console.WriteLine("### SUBSCRIBED ###");
            };

            mqttClient.Disconnected += async (s, e) =>
            {
                Console.WriteLine("### DISCONNECTED FROM SERVER ###");
                await Task.Delay(TimeSpan.FromSeconds(5));

                try
                {
                    await mqttClient.ConnectAsync(options);
                }
                catch
                {
                    Console.WriteLine("### RECONNECTING FAILED ###");
                }
            };

            try
            {
                await mqttClient.ConnectAsync(options);
            }
            catch (Exception exception)
            {
                Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
            }

            Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");

            // Interactive subscribe/publish loop, kept for reference:
            //while (true)
            //{
            //    Console.ReadLine();
            //    await mqttClient.SubscribeAsync(new TopicFilter("test", MqttQualityOfServiceLevel.AtMostOnce));
            //    var applicationMessage = new MqttApplicationMessageBuilder()
            //        .WithTopic("A/B/C")
            //        .WithPayload("Hello World")
            //        .WithAtLeastOnceQoS()
            //        .Build();
            //    await mqttClient.PublishAsync(applicationMessage);
            //}
        }
        catch (Exception exception)
        {
            Console.WriteLine(exception);
        }
    }
}

}

客户端 2(WinForms 窗体程序):用于连接服务端、订阅主题并发送消息

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Drawing;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Windows.Forms;

namespace MqttClientTest01

{

/// <summary>
/// WinForms test client. Lets the user connect to a broker (server IP and port
/// taken from text boxes), publish a message to a topic, and subscribe to a topic.
/// </summary>
public partial class Form1 : Form
{
    // The connected client; null until a connection attempt has succeeded.
    private MqttClient mqttClient = null;

    /// <summary>Initializes the designer-generated controls.</summary>
    public Form1()
    {
        InitializeComponent();
    }

    private void Form1_Load(object sender, EventArgs e)
    {
    }

    /// <summary>
    /// "Connect" button: connects to the broker entered in the form.
    /// </summary>
    private async void but_linkserver_Click(object sender, EventArgs e)
    {
        await RunAsync();

        // Builder-based alternative, kept for reference.
        // Create a new MQTT client:
        //var factory = new MqttFactory();
        //mqttClient = factory.CreateMqttClient() as MqttClient;
        // Create TCP-based options using the builder:
        //var options = new MqttClientOptionsBuilder();
        //options.WithClientId(Guid.NewGuid().ToString());
        //options.WithTcpServer(txtb_serverip.Text.Trim(), Convert.ToInt32(txtb_serverport.Text.Trim()));
        //options.WithTls();
        //options.WithCleanSession();
        //but_submsg_Click(sender, e);
        //var task = mqttClient.ConnectAsync(options.Build());
    }

    /// <summary>
    /// Connects to the broker whose address and port are entered in the form
    /// and shows the outcome in the status labels. Failures (including an
    /// invalid port number) are reported in the status label, not thrown.
    /// </summary>
    /// <returns>A task that completes when the connection attempt has finished.</returns>
    public async Task RunAsync()
    {
        try
        {
            var options = new MqttClientOptions
            {
                ClientId = Guid.NewGuid().ToString(),
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {
                    //Server = "localhost",
                    Server = txtb_serverip.Text.Trim(),
                    Port = Convert.ToInt32(txtb_serverport.Text.Trim()),
                    BufferSize = 20 * 400,
                },
                // WebSocket alternative:
                //ChannelOptions = new MqttClientWebSocketOptions
                //{
                //    Uri = "ws://localhost:59690/mqtt"
                //}
            };

            var factory = new MqttFactory();
            var client = (MqttClient)factory.CreateMqttClient();

            // Await the connection: the returned task is never null, so only
            // its completion (or exception) says whether the connect worked.
            await client.ConnectAsync(options);

            // Expose the client to the other handlers only once it is connected.
            mqttClient = client;
            lab_linkstatus.Text = "连接成功!";
            lab_linktimer.Text = DateTime.Now.ToString();
        }
        catch (Exception exception)
        {
            // Console output is not visible in a WinForms application, so
            // surface the failure in the status label instead.
            lab_linkstatus.Text = "连接失败:" + exception.Message;
        }
    }

    private void tb_username_TextChanged(object sender, EventArgs e)
    {
    }

    /// <summary>
    /// "Send" button: publishes the text in the message box to the entered
    /// topic as a retained QoS 2 message. Does nothing when not connected.
    /// </summary>
    private async void but_clientsend_Click(object sender, EventArgs e)
    {
        if (mqttClient == null)
        {
            return;
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(txtb_msgtopic.Text.Trim())
            .WithPayload(rtb_pubmsg.Text.Trim())
            .WithExactlyOnceQoS()
            .WithRetainFlag()
            .Build();

        try
        {
            await mqttClient.PublishAsync(message);
        }
        catch (Exception exception)
        {
            // async void handler: an unhandled exception here would take the
            // application down, so report it to the user instead.
            MessageBox.Show("发送失败:" + exception.Message);
        }
    }

    /// <summary>
    /// "Subscribe" button: subscribes (QoS 0) to the topic entered in the form.
    /// Does nothing when not connected.
    /// </summary>
    private async void but_submsg_Click(object sender, EventArgs k)
    {
        if (mqttClient == null)
        {
            return;
        }

        // To display received messages, register a handler such as:
        //mqttClient.ApplicationMessageReceived += (s, e) =>
        //{
        //    rtb_submsgclient.AppendText("### RECEIVED APPLICATION MESSAGE ###");
        //    rtb_submsgclient.AppendText($"+ Topic = {e.ApplicationMessage.Topic}");
        //    rtb_submsgclient.AppendText($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
        //    rtb_submsgclient.AppendText($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
        //    rtb_submsgclient.AppendText($"+ Retain = {e.ApplicationMessage.Retain}");
        //};

        try
        {
            // Subscribe to the topic.
            await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));
        }
        catch (Exception exception)
        {
            MessageBox.Show("订阅失败:" + exception.Message);
        }

        // Alternative that inspects the subscribe result, kept for reference:
        //var subresult = await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(txtb_subtopic.Text.Trim()).Build());
        //if (subresult != null)
        //{
        //    foreach (var item in subresult)
        //    {
        //        rtb_submsgclient.AppendText(item.TopicFilter.QualityOfServiceLevel.ToString() + "\n");
        //        rtb_submsgclient.AppendText(item.TopicFilter.Topic.ToString() + "\n");
        //        rtb_submsgclient.AppendText(item.ReturnCode.ToString() + "\n");
        //    }
        //}
    }
}

}

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152866.html
原文链接:https://javaforall.cn

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档