快速搭建MQTT服务器(MQTTnet和Apache Apollo)

摘要:
前言MQTT协议是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分,http://mqtt.org/。MQTTnetisa.NETlibraryforMQTTbasedcommunication.ItprovidesaMQTTclientandaMQTTserver.Theimplementationisbasedonthedocumentationfromhttp://mqtt.org/.通过Nuget搜索MQTT找到了MQTTnet,它不是下载量最多的,也不在官方推荐列表中,主要是因为同时支持客户端和服务端,所以开始下载试用,结果证明有坑,源码在vs2015中不能打开,客户端示例接收不到消息。开源地址:https://github.com/chkr1011/MQTTnet首先把官方的控制台程序改成winform的,界面如下:publicpartialclassForm1:Form{privateMqttServermqttServer=null;privateMqttClientmqttClient=null;publicForm1(){InitializeComponent();}privatevoidbutton_启动服务端_Click{MqttTrace.TraceMessagePublished+=MqttTrace_TraceMessagePublished;if{try{varoptions=newMqttServerOptions{ConnectionValidator=p=>{if{if(p.Username!="PASS"){returnMqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;}}returnMqttConnectReturnCode.ConnectionAccepted;}};mqttServer=newMqttServerFactory().CreateMqttServer;}catch{MessageBox.Show;return;}}mqttServer.Start();this.txt_服务器.AppendText;}privatevoidMqttTrace_TraceMessagePublished{this.Invoke(newAction(()=>{this.txt_服务器.AppendText($">>[{e.ThreadId}][{e.Source}][{e.Level}]:{e.Message}"+Environment.NewLine);if(e.Exception!
前言

MQTT协议是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分,http://mqtt.org/。

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For example, it has been used in sensors communicating to a broker via satellite link, over occasional dial-up connections with healthcare providers, and in a range of home automation and small device scenarios. It is also ideal for mobile applications because of its small size, low power usage, minimised data packets, and efficient distribution of information to one or many receivers

通过https://github.com/mqtt/mqtt.github.io/wiki/servers 找到官方推荐的服务端软件,比如:Apache Apollo,

通过https://github.com/mqtt/mqtt.github.io/wiki/libraries可以找到推荐的客户端类库,比如:Eclipse Paho Java

MQTTnet

MQTTnet 是MQTT协议的.NET开源类库。

MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation fromhttp://mqtt.org/.

通过Nuget搜索MQTT找到了MQTTnet,它不是下载量最多的,也不在官方推荐列表中,主要是因为同时支持客户端和服务端,所以开始下载试用,结果证明有坑,源码在vs2015中不能打开,客户端示例接收不到消息。

开源地址:https://github.com/chkr1011/MQTTnet

首先把官方的控制台程序改成winform的,界面如下:

快速搭建MQTT服务器(MQTTnet和Apache Apollo)第1张

快速搭建MQTT服务器(MQTTnet和Apache Apollo)第2张
复制代码
 public partial class Form1 : Form
    {
        private MqttServer mqttServer = null;
        private MqttClient mqttClient = null;
        public Form1()
        {
            InitializeComponent();
        }
        private void button_启动服务端_Click(object sender, EventArgs e)
        {
            MqttTrace.TraceMessagePublished += MqttTrace_TraceMessagePublished;
            if (this.mqttServer == null)
            {
                try
                {
                    var options = new MqttServerOptions
                    {
                        ConnectionValidator = p =>
                        {
                            if (p.ClientId == "SpecialClient")
                            {
                                if (p.Username != "USER" || p.Password != "PASS")
                                {
                                    return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                                }
                            }
                            return MqttConnectReturnCode.ConnectionAccepted;
                        }
                    };
                    mqttServer = new MqttServerFactory().CreateMqttServer(options);
                }
                catch (Exception ex)
                {
                    MessageBox.Show(ex.Message);
                    return;
                }
            }
            mqttServer.Start();
            this.txt_服务器.AppendText( $">> 启动成功..." + Environment.NewLine);
        }
        private void MqttTrace_TraceMessagePublished(object sender, MqttTraceMessagePublishedEventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_服务器.AppendText($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}" + Environment.NewLine);
                if (e.Exception != null)
                {
                    this.txt_服务器.AppendText( e.Exception + Environment.NewLine);
                }
            }));
        }
        private void button_停止服务端_Click(object sender, EventArgs e)
        {
            if (mqttServer != null)
            {
                mqttServer.Stop();
            }
            this.txt_服务器.AppendText( $">> 停止成功" + Environment.NewLine);
        }
        private async void button_启动客户端_Click(object sender, EventArgs e)
        {
            if (this.mqttClient == null)
            {
                var options = new MqttClientOptions
                {
                    Server = "192.168.2.54",
                    ClientId = "zbl",
                    CleanSession = true
                };
                this.mqttClient = new MqttClientFactory().CreateMqttClient(options);
                this.mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                this.mqttClient.Connected += MqttClient_Connected;
                this.mqttClient.Disconnected += MqttClient_Disconnected;
            }
            try
            {
                await this.mqttClient.ConnectAsync();
            }
            catch(Exception ex)
            {
                this.txt_客户端.AppendText( $"### CONNECTING FAILED ###" + Environment.NewLine);
            }
        }
        private void MqttClient_Connected(object sender, EventArgs e)
        {
            this.Invoke(new Action( async () =>
            {
                this.txt_客户端.AppendText( $"### CONNECTED WITH SERVER ###" + Environment.NewLine);
                await this.mqttClient.SubscribeAsync(new List<TopicFilter>{
                        new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
                    });
                this.txt_客户端.AppendText($"### SUBSCRIBED  ###" + Environment.NewLine);
            }));
        }
        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_客户端.AppendText( $"### DISCONNECTED FROM SERVER ###" + Environment.NewLine);
            }));
        }
        private void MqttClient_ApplicationMessageReceived(object sender, MQTTnet.Core.MqttApplicationMessageReceivedEventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_客户端.AppendText( $">> Topic:{e.ApplicationMessage.Topic} Payload:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} QoS:{e.ApplicationMessage.QualityOfServiceLevel}   Retain:{e.ApplicationMessage.Retain}" + Environment.NewLine);
            }));
        }
        private async void button_停止客户端_Click(object sender, EventArgs e)
        {
            if (this.mqttClient != null)
            {
                await this.mqttClient.DisconnectAsync();
            }
            this.txt_客户端.AppendText( $">> 停止成功" + Environment.NewLine);
        }
        private async void button_发送_Click(object sender, EventArgs e)
        {
            //var options = new MqttClientOptions
            //{
            //    Server = "localhost"
            //};
            //var client = new MqttClientFactory().CreateMqttClient(options);
            //await client.ConnectAsync();
            var applicationMessage = new MqttApplicationMessage(this.txt_topic.Text,
                Encoding.UTF8.GetBytes(this.txt_message.Text), MqttQualityOfServiceLevel.AtMostOnce, false);
            await this.mqttClient.PublishAsync(applicationMessage);
        }
        private void button_清空服务端_Click(object sender, EventArgs e)
        {
            this.txt_服务器.Text = "";
        }
    }
复制代码

需要注意的是按照作者说明是

1
2
3
4
5
6
7
8
9
10
11
12
13
while(true)
{
Console.ReadLine();
varapplicationMessage =newMqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes("Hello World"),
MqttQualityOfServiceLevel.AtLeastOnce,
false
);
await client.PublishAsync(applicationMessage);
}

客户端死活都收不到消息,改成MqttQualityOfServiceLevel.AtMostOnce 就可以了,找问题时尝试下载源码调试因为vs2015打不开项目也折腾了一会。这个问题提交了issues,期待作者回复。

Apache Apollo

1.下载Apollo服务器,我这里用的是Binaries for Windows。下载后解压到一个文件夹,注意路径不要包含中文,安装手册

2.创建Broker Instance,命令行cd到bin目录,执行/bin/apollo create mybroker,执行后就会在bin目录下创建mybroker文件夹。

3.运行Broker Instance,命令行cd到mybroker/bin目录,执行mybroker/bin/apollo-broker.cmd run

4.Web Administrator,地址http://127.0.0.1:61680/orhttps://127.0.0.1:61681/,默认账号 admin,密码 password

快速搭建MQTT服务器(MQTTnet和Apache Apollo)第5张

参考

搭建了MQTT服务端之后需要在Esp8266模块和手机App中分别实现客户端功能,稍后待续。。。。

分类:IoT

免责声明:文章转载自《快速搭建MQTT服务器(MQTTnet和Apache Apollo)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Java 计时器HBase里配置SNAPPY压缩以后regionserver启动不了的问题下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

MQTT服务器的搭建(Windows平台)

人工智能、智能家居越来越火,在服务器和多个终端进行通信的过程中使用传统的请求/回答(Request/Response)模式已经过时,伴随而来的是发布/订阅(Publish/Subscribe)模式------MQTT(Message Queuing Telemetry Transport)协议。由于本文是讲解如何在windows平台上搭建MQTT服务,所以...

如何使用 MQTT 报文实现发布订阅功能

MQTT 协议通过交换预定义的MQTT控制报文来通信。下面以 MQTTX 为例,展示如何通过 MQTT 报文实现发布订阅功能。 Connect 连接 MQTT 协议基于 TCP/IP 协议,MQTT Broker 和 Client 都有需要有 TCP/IP 地址。 Broker 如果你暂时没有一个可用的 MQTT Broker,EMQ X 提供了一个公共...

MQTT是IBM开发的一个即时通讯协议,构建于TCP/IP协议上,是物联网IoT的订阅协议,借助消息推送功能,可以更好地实现远程控制

最近一直做物联网方面的开发,以下内容关于使用MQTT过程中遇到问题的记录以及需要掌握的机制原理,主要讲解理论。 背景 MQTT是IBM开发的一个即时通讯协议。MQTT构建于TCP/IP协议上,面向M2M和物联网IoT的连接协议,采用轻量级发布和订阅消息传输机制。Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支...

php windows环境 安装 Apache-apollo + phpMQTT 实现发送 MQTT

首先安装Apache-apollo 原文链接:http://blog.csdn.net/marrn/article/details/71141122?utm_source=itdadao&utm_medium=referral 1.首先从http://activemq.apache.org/apollo/download.html官网上下载wind...

MQTT-Client-FrameWork使用整理

作者: wbl MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议 MQTT特点 MQTT协议是为大量计算...

ESA2GJK1DH1K升级篇: 阿里云物联网平台 OTA: 关于阿里云物联网平台 OTA 的升级流程说明

前言   鉴于有些用户直接想使用现成的物联网平台实现 OTA 远程升级   我就写一写这系列的文章   首先大家需要学习完这部分   https://www.cnblogs.com/yangfengwu/p/11828777.html    现在说一下具体流程 新增固件   一,选择物联网平台里面的          固件名称: 随意     所属产品:...