Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

xamarin Mqtt

醉梦ai天涯 2019-10-20 14:22:00 阅读数:148 评论数:0 点赞数:0 收藏数:0

1 什么是MQTT?

mqtt (Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,有可能成为物联网的重要组成部分。MQTT 是基于二进制消息的发布/订阅编程模式的消息协议,如今已经成为 OASIS 规范,由于规范很简单,非常适合需要低功耗和网络带宽有限的 IoT 场景。

2 MQTTnet

MQTTnet 是一个基于MQTT协议高度专业的.net库,它同时提供MQTT client和MQTT server(broke),支持v3.1.0,v3.1.1和v5.0.0的标准MQTT协议.

3 MQTTnet支持范围

.Net Standard 1.3+

.Net Core 1.1+

.Net Core App 1.1+

.Net Framework 4.5.2+(x86,x64,AnyCPU)

Mono 5.2+

Universal Windows Platform(UWP) 10.0.1024+(x86,x64,ARM,AnyCPU,Windwos 10 IoT Core)

Xamarin.Android 7.5+

Xamarin.iOS 10.14+

4 创建服务器

MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

(1)接受来自客户的网络连接;

(2)接受客户发布的应用信息;

(3)处理来自客户端的订阅和退订请求;

(4)向订阅的客户转发应用程序消息。

服务器创建一个控制台应用,可选>>控制台应用(.NET Core)创建新项目MqttNetServer,代码如下:

 sing MQTTnet;
 using MQTTnet.Protocol;
 using MQTTnet.Server;
 using Newtonsoft.Json;
 using System;
 using System.Collections.Generic;
 using System.IO;
 using System.Reflection;
 using System.Security.Cryptography.X509Certificates;
 using System.Text;
 using System.Threading.Tasks;

namespace MqttServerTest
 {
 class Program
  {
 public static IMqttServer mqttServer;
 static void Main(string[] args)
  {
 mqttServer = new MQTTnet.MqttFactory().CreateMqttServer();
 mqttServer.UseClientConnectedHandler(e =>
 {
 Console.WriteLine("***new connect:" + e.ClientId);

 });
 mqttServer.UseClientDisconnectedHandler(e =>
 {
 Console.WriteLine("*** disconnect:" + e.ClientId);
  });

//var options = new MqttServerOptions();
 //await mqttServer.StartAsync(options);

//var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
 //var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"), "yourPassword", X509KeyStorageFlags.Exportable);
var optionsBuilder = new MqttServerOptionsBuilder()
 .WithConnectionBacklog()
 .WithDefaultEndpointPort()
 .WithConnectionValidator(c=> {
 //c.SessionItems.
 //if (c.ClientId.Length < 10)
 //{
 // c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
 // //c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
 // return;
 //}
 //if (c.Username != "mySecretUser")
 //{
 // c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
 // return;
 //}

//if (c.Password != "mySecretPassword")
 //{
 // c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
 // return;
 //}
c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
 Console.WriteLine("***connect validator:"+c.ClientId);
  })
 //.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
 //.WithEncryptionSslProtocol(SslProtocols.Tls12)
.WithApplicationMessageInterceptor(context=> {
 //if (context.ApplicationMessage.Topic == "my/custom/topic")
 //{
 // context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
 //}
//// It is possible to disallow the sending of messages for a certain client id like this:
//if (context.ClientId != "Someone")
 //{
 // context.AcceptPublish = false;
 // return;
 //}
 // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
 // This is useful when the IoT device has no own clock and the creation time of the message might be important.
context.AcceptPublish = true;
 Console.WriteLine("***Message:" + context.ApplicationMessage.Payload);
  })
 .WithSubscriptionInterceptor(context=>
 {
 //if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
 //{
 // context.AcceptSubscription = false;
 //}

//if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
 //{
 // context.AcceptSubscription = false;
 // context.CloseConnection = true;
 //}
context.AcceptSubscription = true;
 Console.WriteLine("***Subscript:" + context.TopicFilter);
  })
 //.WithStorage(new RetainedMessageHandler())
 ;
 var options = optionsBuilder.Build();

//// Setting the options
//options.Storage=new RetainedMessageHandler();
 StartServer(options);

Console.WriteLine("Press any key to exit.");
  Console.ReadLine();

//await mqttServer.StopAsync();
 }

public static async void StartServer(IMqttServerOptions options)
  {
 await mqttServer.StartAsync(options);
  }

 }

// The implementation of the storage:
 // This code uses the JSON library "Newtonsoft.Json".
public class RetainedMessageHandler : IMqttServerStorage
  {
 private const string Filename = "C:\\MQTT\\RetainedMessages.json";

public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
  {
  File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));
 return Task.FromResult();
  }

public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
  {
 IList<MqttApplicationMessage> retainedMessages;
 if (File.Exists(Filename))
  {
 var json = File.ReadAllText(Filename);
 retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
  }
 else
 {
 retainedMessages = new List<MqttApplicationMessage>();
  }

return Task.FromResult(retainedMessages);
  }
  }

}

代码直接运行起来,就是一个简单的Mqtt server。

5 创建xamarin APP

一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

(1)发布其他客户端可能会订阅的信息;

(2)订阅其它客户端发布的消息;

(3)退订或删除应用程序的消息;

(4)断开与服务器连接。

在VS中新建一个xamarin.Forms的移动应用,创建好后在Nuget上搜索mqttnet,添加对MQTTnet包的引用。更改代码如下:

 <?xml version="1.0" encoding="utf-8" ?>
<ContentPage xmlns="http://xamarin.com/schemas/2014/forms"
xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml"
xmlns:local="clr-namespace:CatShell"
x:Class="CatShell.MainPage">
<StackLayout>
<!-- Place new controls here -->
<Label Text="SubscribeTopic"/>
<Entry x:Name="txtSubTopic" Placeholder="Subscribe Topic" />
<Button Text="BtnSubscribe" Clicked="SubButton_Clicked"/>
<Entry x:Name="txtReceiveMessage"/>
<Label Text="PublishTopic"/>
<Entry x:Name="txtPubTopic"/>
<Entry x:Name="txtSendMessage" />
<Button Text="Publish" Clicked="PubButton_Clicked"/>
<Editor>
</Editor>
</StackLayout>
</ContentPage>
 using MQTTnet;
 using MQTTnet.Client;
 using MQTTnet.Client.Options;
 using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 using Xamarin.Forms;

namespace CatShell
 {
 public partial class MainPage : ContentPage
  {
 public IMqttClient mqttClient;
 public IMqttClientOptions options;
 public MainPage()
  {
  InitializeComponent();
  InitMqttClient();
  ConnectMqttServer();
  }

public void InitMqttClient()
  {
 // Create a new MQTT client.
var factory = new MqttFactory();
 mqttClient = factory.CreateMqttClient();

mqttClient.UseConnectedHandler(e => {

Device.BeginInvokeOnMainThread(() =>
 {
 txtReceiveMessage.Text = txtReceiveMessage.Text + $">> connect success." + Environment.NewLine;
  });
  });
 mqttClient.UseDisconnectedHandler(e =>
 {
 Device.BeginInvokeOnMainThread(() =>
 {
 txtReceiveMessage.Text = txtReceiveMessage.Text + $">> Disconnect." + Environment.NewLine;
  });
  });
 mqttClient.UseApplicationMessageReceivedHandler(e =>
 {
 Device.BeginInvokeOnMainThread(() =>
 {
 txtReceiveMessage.Text = $">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}" + Environment.NewLine;
  });
  });

// Create TCP based options using the builder.
options = new MqttClientOptionsBuilder()
 .WithClientId("Client4")
 .WithTcpServer("10.100.1.247", ) // Use TCP connection, Port is opptinal
 //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
 //.WithCredentials("bud", "%spencer%")
 //.WithTls()
 //.WithTls(new MqttClientOptionsBuilderTlsParameters
 //{
 // UseTls = true,
 // CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
 // {
 // // TODO: Check conditions of certificate by using above parameters.
 // return true;
 // }
 //})
 .WithCleanSession()
  .Build();

 }

public async void ConnectMqttServer()
  {
 await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
 }

private async void SubButton_Clicked(object sender, EventArgs e)
  {

string topic = txtSubTopic.Text.Trim();

if (string.IsNullOrEmpty(topic))
  {
 //MessageBox.Show("订阅主题不能为空!");
return;
  }

if (!mqttClient.IsConnected)
  {
 //MessageBox.Show("MQTT客户端尚未连接!");
return;
  }

// Subscribe to a topic
await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

txtReceiveMessage.Text = txtReceiveMessage.Text + $"已订阅[{topic}]主题" + Environment.NewLine;
 txtSubTopic.IsReadOnly = false;
 //BtnSubscribe.Enabled = false;
 }

private async void PubButton_Clicked(object sender, EventArgs e)
  {
 string topic = txtPubTopic.Text.Trim();

if (string.IsNullOrEmpty(topic))
  {
 //MessageBox.Show("发布主题不能为空!");
return;
  }

string inputString = txtSendMessage.Text.Trim();

 PublishMessages(topic, inputString);
  }

public async void PublishMessages(string topicMsg, string payloadMsg)
  {
 var message = new MqttApplicationMessageBuilder()
  .WithTopic(topicMsg)
  .WithPayload(payloadMsg)
  .WithExactlyOnceQoS()
  .WithRetainFlag()
  .Build();

await mqttClient.PublishAsync(message);
  }

 }
 }

代码运行起来,在APP上可以直接发信息。

6 创建winForm client(可选)

可以创建一个winForm来相互互动,在VS上新建一个windows窗体应用(.NET Framework),界面设计如下

后台代码如下:

 using MQTTnet;
 using MQTTnet.Client.Options;
 using MQTTnet.Client;
 using System;
 using System.Collections.Generic;
 using System.ComponentModel;
 using System.Data;
 using System.Drawing;
 using System.Linq;
 using System.Net.Security;
 using System.Security.Cryptography.X509Certificates;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Windows.Forms;

namespace MqttClientWin
 {
 public partial class Form1 : Form
  {
 public IMqttClient mqttClient;
 public IMqttClientOptions options;
 public Form1()
  {
  InitializeComponent();
  InitMqttClient();
  ConnectMqttServer();
  }

public void InitMqttClient()
  {
 // Create a new MQTT client.
var factory = new MqttFactory();
 mqttClient = factory.CreateMqttClient();

mqttClient.UseApplicationMessageReceivedHandler(e =>
 {
 Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
 Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
 Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
 Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
 Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
  Console.WriteLine();

this.Invoke(new Action(() =>
 {
 txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
  }));

//Task.Run(() => mqttClient.PublishAsync("hello/world"));
 });

mqttClient.UseConnectedHandler(async e =>
 {
 Console.WriteLine("### CONNECTED WITH SERVER ###");

//// Subscribe to a topic
//await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());

//Console.WriteLine("### SUBSCRIBED ###");
this.Invoke(new Action(() =>
 {
 txtReceiveMessage.AppendText($">> connect success.{Environment.NewLine}");
  }));
  });

mqttClient.UseDisconnectedHandler(e =>
 {
 this.Invoke(new Action(() =>
 {
 txtReceiveMessage.AppendText($">> Disconnect .{Environment.NewLine}");
  }));
  });

// Create TCP based options using the builder.
options = new MqttClientOptionsBuilder()
 .WithClientId("Client5")
 .WithTcpServer("10.100.1.247", ) // Use TCP connection, Port is opptinal
 //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
 //.WithCredentials("bud", "%spencer%")
 //.WithTls()
 //.WithTls(new MqttClientOptionsBuilderTlsParameters
 //{
 // UseTls = true,
 // CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
 // {
 // // TODO: Check conditions of certificate by using above parameters.
 // return true;
 // }
 //})
 .WithCleanSession()
  .Build();

 }

public async void ConnectMqttServer()
  {
 await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
 }

public void ReconnectMqttServer()
  {
 mqttClient.UseDisconnectedHandler(async e =>
 {
 Console.WriteLine("### DISCONNECTED FROM SERVER ###");
 await Task.Delay(TimeSpan.FromSeconds());

try
 {
 await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
 }
 catch
 {
 Console.WriteLine("### RECONNECTING FAILED ###");
  }
  });
  }

public async void PublishMessages(string topicMsg,string payloadMsg)
  {
 var message = new MqttApplicationMessageBuilder()
  .WithTopic(topicMsg)
  .WithPayload(payloadMsg)
  .WithExactlyOnceQoS()
  .WithRetainFlag()
  .Build();

await mqttClient.PublishAsync(message);
  }

private async void BtnSubscribe_Click(object sender, EventArgs e)
  {
 string topic = txtSubTopic.Text.Trim();

if (string.IsNullOrEmpty(topic))
  {
 MessageBox.Show("订阅主题不能为空!");
 return;
  }

if (!mqttClient.IsConnected)
  {
 MessageBox.Show("MQTT客户端尚未连接!");
 return;
  }

// Subscribe to a topic
await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

txtReceiveMessage.AppendText($"已订阅[{topic}]主题" + Environment.NewLine);
 txtSubTopic.Enabled = false;
 BtnSubscribe.Enabled = false;
  }

private void BtnPublish_Click(object sender, EventArgs e)
  {
 string topic = txtPubTopic.Text.Trim();

if (string.IsNullOrEmpty(topic))
  {
 MessageBox.Show("发布主题不能为空!");
 return;
  }

string inputString = txtSendMessage.Text.Trim();

 PublishMessages(topic, inputString);

 }
  }
 }

7 MQTT协议中的订阅、主题、会话

一、订阅(Subscription)

订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

二、会话(Session)

每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

三、主题名(Topic Name)

连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

四、主题筛选器(Topic Filter)

一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

五、负载(Payload)

消息订阅者所具体接收的内容。

8 MQTT协议中的方法

MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

(1)Connect。等待与服务器建立连接。

(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。

(3)Subscribe。等待完成订阅。

(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。

(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

版权声明
本文为[醉梦ai天涯]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/zuimengaitianya/p/11707490.html