C#MQTT编程06--MQTT服务器和客户端(winform版)

hqwest 2024-06-18 17:35:32 阅读 86

1、前言

介绍完基础理论部分,下面在Windows平台上搭建一个简单的MQTT应用,进行简单的应用,整体架构如下图所示;

消息模型:

运用MQTT协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景,如下图所示

前面介绍过,MQTT可以运行在几乎所有的平台,windows,linux什么的都可以,各种语言都有实现MQTT的组件,.net也好,Java也好,都有封装好的mqtt服务器和客户端组件或插件来实现,本系列是在.net平台下实现mqtt的服务器和客户端。

常见的MQTT服务器包括Eclipse Mosquitto、EMQ X、HiveMQ、RabbitMQ、MQTTNET等,本系列文章都是基于.net平台的mqtt服务通信,开发环境vs2022,.net framework4.8。

2、服务器搭建

1、创建项目方案

 

 2、添加库引用

 

 

3、UI布局

 

 

4、控件代码

“启动”按钮

停止按钮

 窗体加载

全部完整代码 

注意这里面用到了多线程及任务task,委托action,异步async及await的技术,在mqtt中必须使用这些技术,否则界面会卡死。

using MQTTnet.Client.Receiving;using MQTTnet.Server;using MQTTnet;using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Drawing;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Windows.Forms;using MQTTnet.Protocol;namespace MQTTNETServerForms{ public partial class Form1 : Form { private IMqttServer server;//mqtt服务器对象 List<TopicItem> Topics = new List<TopicItem>(); public Form1() { InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) { //创建服务器对象 server = new MqttFactory().CreateMqttServer(); server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(Server_ApplicationMessageReceived));//绑定消息接收事件 server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(new Action<MqttServerClientConnectedEventArgs>(Server_ClientConnected));//绑定客户端连接事件 server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(new Action<MqttServerClientDisconnectedEventArgs>(Server_ClientDisconnected));//绑定客户端断开事件 server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(new Action<MqttServerClientSubscribedTopicEventArgs>(Server_ClientSubscribedTopic));//绑定客户端订阅主题事件 server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(new Action<MqttServerClientUnsubscribedTopicEventArgs>(Server_ClientUnsubscribedTopic));//绑定客户端退订主题事件 server.StartedHandler = new MqttServerStartedHandlerDelegate(new Action<EventArgs>(Server_Started));//绑定服务端启动事件 server.StoppedHandler = new MqttServerStoppedHandlerDelegate(new Action<EventArgs>(Server_Stopped));//绑定服务端停止事件 } /// <summary> /// 绑定消息接收事件 /// </summary> /// <param name="e"></param> private void Server_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { string msg = e.ApplicationMessage.ConvertPayloadToString(); WriteLog(">>> 收到消息:" + msg + ",QoS =" + e.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic); } /// <summary> /// 绑定客户端连接事件 /// </summary> /// <param name="e"></param> private void Server_ClientConnected(MqttServerClientConnectedEventArgs e) { Task.Run(new Action(() => { lbClients.BeginInvoke(new Action(() => { lbClients.Items.Add(e.ClientId); })); })); WriteLog(">>> 客户端" + e.ClientId + "连接"); } /// <summary> /// 绑定客户端断开事件 /// </summary> /// <param name="e"></param> private void Server_ClientDisconnected(MqttServerClientDisconnectedEventArgs e) { Task.Run(new Action(() => { lbClients.BeginInvoke(new Action(() => { lbClients.Items.Remove(e.ClientId); })); })); WriteLog(">>> 客户端" + e.ClientId + "断开"); } /// <summary> /// 绑定客户端订阅主题事件 /// </summary> /// <param name="e"></param> private void Server_ClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e) { Task.Run(new Action(() => { var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter.Topic); if (topic == null) { topic = new TopicItem { Topic = e.TopicFilter.Topic, Count = 0 }; Topics.Add(topic); } if (!topic.Clients.Exists(c => c == e.ClientId)) { topic.Clients.Add(e.ClientId); topic.Count++; } lvTopic.Invoke(new Action(() => { this.lvTopic.Items.Clear(); })); foreach (var item in this.Topics) { lvTopic.Invoke(new Action(() => { this.lvTopic.Items.Add($"{item.Topic}:{item.Count}"); })); } })); WriteLog(">>> 客户端" + e.ClientId + "订阅主题" + e.TopicFilter.Topic); } /// <summary> /// 绑定客户端退订主题事件 /// </summary> /// <param name="e"></param> private void Server_ClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e) { Task.Run(new Action(() => { var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter); if (topic != null) { topic.Count--; topic.Clients.Remove(e.ClientId); } this.lvTopic.Items.Clear(); foreach (var item in this.Topics) { this.lvTopic.Items.Add($"{item.Topic}:{item.Count}"); } })); WriteLog(">>> 客户端" + e.ClientId + "退订主题" + e.TopicFilter); } /// <summary> /// 绑定服务端启动事件 /// </summary> /// <param name="e"></param> private void Server_Started(EventArgs e) { WriteLog(">>> 服务端已启动!"); } /// <summary> /// 绑定服务端停止事件 /// </summary> /// <param name="e"></param> private void Server_Stopped(EventArgs e) { WriteLog(">>> 服务端已停止!"); } /// <summary> /// 显示日志 /// </summary> /// <param name="message"></param> public void WriteLog(string message) { if (txtMsg.InvokeRequired) { txtMsg.Invoke(new Action(() => { txtMsg.Text = ""; txtMsg.Text = (message + "\r"); })); } else { txtMsg.Text = ""; txtMsg.Text = (message + "\r"); } } /// <summary> /// 启动 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> [Obsolete] private async void btnStart_Click(object sender, EventArgs e) { var optionBuilder = new MqttServerOptionsBuilder() .WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(this.txtIP.Text)) .WithDefaultEndpointPort(int.Parse(this.txtPort.Text)) .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000)) .WithConnectionValidator(t => { string un = "", pwd = ""; un = this.txtUname.Text; pwd = this.txtUpwd.Text; if (t.Username != un || t.Password != pwd) { t.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; } else { t.ReturnCode = MqttConnectReturnCode.ConnectionAccepted; } }); var option = optionBuilder.Build(); //启动 await server.StartAsync(option); WriteLog(">>> 服务器启动成功"); } /// <summary> /// 停止 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void btnStop_Click(object sender, EventArgs e) { if (server != null) { server.StopAsync(); } } }}

 注意这个端口,帐号,密码可以自己决定,Ip地址也是。

 注意这里面的代码,服务器上必须注册绑定实现的几个事件:消息接收事件 ,客户端连接事件,客户端断开事件,客户端订阅主题事件,客户端退订主题事件,服务端启动事件,服务端停止事件

server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(Server_ApplicationMessageReceived));//绑定消息接收事件 server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(new Action<MqttServerClientConnectedEventArgs>(Server_ClientConnected));//绑定客户端连接事件 server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(new Action<MqttServerClientDisconnectedEventArgs>(Server_ClientDisconnected));//绑定客户端断开事件 server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(new Action<MqttServerClientSubscribedTopicEventArgs>(Server_ClientSubscribedTopic));//绑定客户端订阅主题事件 server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(new Action<MqttServerClientUnsubscribedTopicEventArgs>(Server_ClientUnsubscribedTopic));//绑定客户端退订主题事件 server.StartedHandler = new MqttServerStartedHandlerDelegate(new Action<EventArgs>(Server_Started));//绑定服务端启动事件 server.StoppedHandler = new MqttServerStoppedHandlerDelegate(new Action<EventArgs>(Server_Stopped));//绑定服务端停止事件

启动测试服务器

 启动成功。

3、客户端创建

 1、添加项目

2、添加库引用 

注意这里添加的与服务器不一样,别混错了

 

3、UI布局

 布局使用的是常规的label,textbox,button 

4、控件代码

连接代码

订阅代码

 

发布代码

 完整代码

using MQTTnet.Client.Options;using MQTTnet.Client;using MQTTnet.Extensions.ManagedClient;using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Drawing;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Windows.Forms;using MQTTnet;namespace MQTTNETClientForms{ public partial class Form1 : Form { private IManagedMqttClient mqttClient;//客户端mqtt对象 public Form1() { InitializeComponent(); } /// <summary> /// 连接 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void btnConn_Click(object sender, EventArgs e) { var mqttClientOptions = new MqttClientOptionsBuilder() .WithClientId(this.txtId.Text) .WithTcpServer(this.txtIP.Text, int.Parse(this.txtPort.Text)) .WithCredentials(this.txtName.Text, this.txtUpwd.Text); var options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) .WithClientOptions(mqttClientOptions.Build()) .Build(); //开启 await mqttClient.StartAsync(options); } /// <summary> /// 断开 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void btnClose_Click(object sender, EventArgs e) { if (mqttClient != null) { if (mqttClient.IsStarted) { await mqttClient.StopAsync(); } mqttClient.Dispose(); } } private void Form1_Load(object sender, EventArgs e) { var factory = new MqttFactory(); mqttClient = factory.CreateManagedMqttClient();//创建客户端对象 //绑定断开事件 mqttClient.UseDisconnectedHandler(async ee => { WriteLog("与服务器之间的连接断开了,正在尝试重新连接"); // 等待 5s 时间 await Task.Delay(TimeSpan.FromSeconds(5)); try { mqttClient.UseConnectedHandler(tt => { WriteLog(">>> 连接到服务成功"); }); } catch (Exception ex) { WriteLog($"重新连接服务器失败:{ex}"); } }); //绑定接收事件 mqttClient.UseApplicationMessageReceivedHandler(aa => { try { string msg = aa.ApplicationMessage.ConvertPayloadToString(); WriteLog(">>> 消息:" + msg + ",QoS =" + aa.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + aa.ClientId + ",主题:" + aa.ApplicationMessage.Topic); } catch (Exception ex) { WriteLog($"+ 消息 = " + ex.Message); } }); //绑定连接事件 mqttClient.UseConnectedHandler(ee => { WriteLog(">>> 连接到服务成功"); }); } /// <summary> /// 显示日志 /// </summary> /// <param name="message"></param> private void WriteLog(string message) { if (txtMsg.InvokeRequired) { txtMsg.Invoke(new Action(() => { txtMsg.Text = (message); })); } else { txtMsg.Text = (message); } } /// <summary> /// 订阅 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> [Obsolete] private async void btnSub_Click(object sender, EventArgs e) { if (string.IsNullOrWhiteSpace(this.txtTopic.Text)) { WriteLog(">>> 请输入主题"); return; } //在 MQTT 中有三种 QoS 级别: //At most once(0) 最多一次 //At least once(1) 至少一次 //Exactly once(2) 恰好一次 //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.tbTopic.Text).WithAtMostOnceQoS().Build());//最多一次, QoS 级别0 await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.txtTopic.Text).WithAtLeastOnceQoS().Build());//恰好一次, QoS 级别1 WriteLog($">>> 成功订阅"); } /// <summary> /// 发布 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void btnPub_Click(object sender, EventArgs e) { if (string.IsNullOrWhiteSpace(this.txtTopik.Text)) { WriteLog(">>> 请输入主题"); return; } var result = await mqttClient.PublishAsync( this.txtTopik.Text, this.txtContent.Text, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);//恰好一次, QoS 级别1 WriteLog($">>> 主题:{this.txtTopik.Text},消息:{this.txtContent.Text},结果: {result.ReasonCode}"); } }}

4、运行测试

生成编译解决方案,成功后开始测试

1、启动服务

2、启动客户端

找到生成的客户端debug目录下的.exe文件

3、测试连接 

连接成功,服务器看到客户端上线了

4、测试订阅

再运行一个客户端,连接服务

5、测试发布

 c1向cced主题发布一个消息,结果是c1,c2都收到了消息

同样,c2发布一个消息,c1,c2都收到了消息

 6、测试下线

c1关闭,服务器马上知道了

 5、小结

基于mqttnet的组件搭建的mqtt服务器和客户端通信成功,发布和订阅都ko,ko,ko。

讲解不易,分析不易,原创不易,整理不易,伙伴们动动你的金手指,你的支持是我最大的动力。

 



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。