Popular blog tags

MQTTnet: a production-ready third-party class library

Published

Create a third-party class library using MQTTnet 5.x. It exposes `public partial class MqttClientService : IMqttClientService`, which publishes messages, receives messages on subscribed topics, reports connection events, etc., and can be used by a web app or Blazor Server.

Folder Structure (Class Library)

YourProject.Mqtt/
│
├── IMqttClientService.cs
├── MqttClientService.cs     (partial class)
├── MqttClientService.Events.cs
└── MqttClientService.Options.cs

 

IMqttClientService.cs

using MQTTnet;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Nop.Data
{
    /// <summary>
    /// Contract for the MQTT client service consumed by the web application:
    /// publishing, subscribing and starting/stopping the client.
    /// </summary>
    public partial interface IMqttClientService
    {
        /// <summary>Publishes a text payload to an MQTT topic.</summary>
        /// <param name="topic">Topic the message is published to.</param>
        /// <param name="payload">Message body as text.</param>
        Task PublishAsync(string topic, string payload);
        /// <summary>Subscribes the client to an MQTT topic.</summary>
        /// <param name="topic">Topic requested by the caller.</param>
        Task SubscribeAsync(string topic);


        /// <summary>Starts the service.</summary>
        /// <param name="cancellationToken">Token that can cancel the start operation.</param>
        Task StartAsync(CancellationToken cancellationToken = default);
        /// <summary>Stops the service.</summary>
        /// <param name="cancellationToken">Token that can cancel the stop operation.</param>
        Task StopAsync(CancellationToken cancellationToken = default);

    }
}

 

MqttClientService.cs

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;



namespace Nop.Data
{
    public partial class MqttClientService : BackgroundService, IMqttClientService
    {
        #region Fields
        private readonly IConfiguration _configuration;
        private readonly ILogger<MqttClientService> _logger;
        private readonly IMqttClient _mqttClient;
        private readonly MqttClientOptions _mqttClientOptions;
        private Dictionary<string, Func<string, Task>> _registrations = new();
        #endregion

        /// <summary>
        /// Creates the MQTT client, builds its connection options and attaches the
        /// connected / message-received / disconnected handlers.
        /// </summary>
        /// <param name="configuration">
        /// Application configuration. The optional keys <c>Mqtt:Host</c> and
        /// <c>Mqtt:Port</c> override the built-in broker endpoint.
        /// </param>
        /// <param name="logger">Logger used for connection diagnostics.</param>
        /// <remarks>
        /// The constructor does not open the connection. Blocking on an asynchronous
        /// connect inside a constructor stalls dependency-injection resolution, and it
        /// leaves the client already connected when
        /// <see cref="StartAsync(CancellationToken)"/> later calls <c>ConnectAsync</c>.
        /// Connect through <see cref="StartAsync(CancellationToken)"/> (or the
        /// background loop in <c>ExecuteAsync</c>) instead.
        /// </remarks>
        public MqttClientService(IConfiguration configuration,
            ILogger<MqttClientService> logger)
        {
            const string defaultHost = "192.168.2.125";
            const int defaultPort = 1883;

            _configuration = configuration;
            _logger = logger;

            // Build the MQTT client.
            var factory = new MqttClientFactory();
            _mqttClient = factory.CreateMqttClient();

            // Resolve the broker endpoint: configuration first, original defaults as fallback.
            var host = configuration?["Mqtt:Host"];
            if (string.IsNullOrWhiteSpace(host))
            {
                host = defaultHost;
            }

            if (!int.TryParse(configuration?["Mqtt:Port"], out var port) || port <= 0 || port > 65535)
            {
                port = defaultPort;
            }

            // Configure the MQTT client options (broker address and port).
            _mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(host, port).Build();

            _mqttClient.ConnectedAsync += _ =>
            {
                _logger.LogInformation($"{DateTime.Now} Connected to mqtt broker.");
                return Task.CompletedTask;
            };

            _mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                Console.WriteLine($"MQTT Received: {e.ApplicationMessage.Topic} -> {e.ApplicationMessage.ConvertPayloadToString()}");
                return Task.CompletedTask;
            };

            _mqttClient.DisconnectedAsync += async e =>
            {
                _logger.LogInformation($"Disconnected from MQTT Broker. Reason: {e.Reason}");
                _logger.LogInformation($"Disconnected from MQTT Broker. ReasonString: {e.ReasonString}");

                // Only these two reasons trigger a single delayed reconnect attempt.
                if (e.Reason == MqttClientDisconnectReason.KeepAliveTimeout ||
                    e.Reason == MqttClientDisconnectReason.ServerBusy)
                {
                    await Task.Delay(TimeSpan.FromSeconds(5));
                    try
                    {
                        await _mqttClient.ConnectAsync(_mqttClientOptions);
                    }
                    catch (Exception ex)
                    {
                        // Best effort: a failed reconnect is logged, not rethrown.
                        _logger.LogError($"Reconnect failed: {ex.Message}");
                    }
                }
                else
                {
                    _logger.LogWarning($"Unexpected disconnection reason: {e.Reason}. No reconnection attempt.");
                }
            };
        }


        /// <summary>
        /// Background loop that keeps the client connected: every five seconds it
        /// checks the connection and reconnects if it has dropped.
        /// </summary>
        /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
        /// <returns>A task that completes once shutdown has been requested.</returns>
        /// <remarks>
        /// Cancellation during shutdown is treated as a normal exit and is not
        /// logged as a connection failure.
        /// </remarks>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var options = new MqttClientOptionsBuilder()
                .WithTcpServer("192.168.2.125", 1883)
                .WithClientId("aspnet_mvc")
                .Build();

            _mqttClient.ApplicationMessageReceivedAsync += args =>
            {
                _logger.LogInformation($"Received: {args.ApplicationMessage.Topic}");
                return Task.CompletedTask;
            };

            try
            {
                // Auto reconnect loop
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        if (!_mqttClient.IsConnected)
                        {
                            await _mqttClient.ConnectAsync(options, stoppingToken);
                            _logger.LogInformation($"{DateTime.Now}BackgroundService ->Auto reconnect loop:MQTT connected.");
                        }
                    }
                    catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                    {
                        // Genuine connection failure: log it and retry on the next tick.
                        _logger.LogError(ex, "MQTT connection failed.");
                    }

                    await Task.Delay(5000, stoppingToken); // reconnect interval
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested while connecting or waiting; leave the loop quietly.
            }
        }


        // True once the logging handler below has been attached, so repeated calls
        // do not stack duplicate handlers on the client.
        private bool _messageLoggingAttached;

        /// <summary>
        /// Attaches a handler that logs the topic and text payload of every
        /// received application message.
        /// </summary>
        /// <param name="broker">Currently unused; kept for signature compatibility.</param>
        /// <param name="port">Currently unused; kept for signature compatibility.</param>
        /// <param name="topic">Currently unused; kept for signature compatibility.</param>
        /// <returns>A completed task; no asynchronous work is performed.</returns>
        /// <remarks>
        /// This overload neither connects nor subscribes. The previous version declared
        /// a lambda local named <c>topic</c>, which clashes with the parameter of the
        /// same name, and registered two overlapping handlers on every call.
        /// </remarks>
        public Task StartAsync(string broker, int port, string topic)
        {
            if (_messageLoggingAttached)
            {
                return Task.CompletedTask;
            }

            _messageLoggingAttached = true;

            _mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                var receivedTopic = e.ApplicationMessage.Topic;
                // Decode the payload as text rather than printing the raw buffer object.
                var payload = e.ApplicationMessage.ConvertPayloadToString();

                Console.WriteLine($"Received on {receivedTopic}: {payload}");
                _logger.LogInformation($"Message Received topic:{receivedTopic}: payload{payload}");

                return Task.CompletedTask;
            };

            return Task.CompletedTask;
        }





        /// <summary>
        /// Dispatches a received message to the callback registered for its exact
        /// topic, passing the payload as text. Messages without a registration are ignored.
        /// </summary>
        /// <param name="arg">Event data of the received application message.</param>
        private async Task ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            // Single lookup instead of ContainsKey followed by the indexer.
            if (_registrations.TryGetValue(arg.ApplicationMessage.Topic, out var handler))
            {
                await handler.Invoke(arg.ApplicationMessage.ConvertPayloadToString());
            }
        }


        /// <summary>
        /// Tries to connect the client with the options built in the constructor.
        /// Success and failure are both logged; a failure is not rethrown.
        /// </summary>
        private async Task Connect()
        {
            try
            {
                // The connect result is not needed here, only the outcome.
                _ = await _mqttClient.ConnectAsync(_mqttClientOptions);
                _logger.LogInformation("MQTT connected.");
            }
            catch (Exception connectError)
            {
                _logger.LogError(connectError, "MQTT connect failed");
            }
        }


        /// <summary>
        /// Publishes a text payload to the given topic as a retained message with
        /// at-least-once delivery.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="payload">Message body as text.</param>
        public async Task PublishAsync(string topic, string payload)
        {
            var builder = new MqttApplicationMessageBuilder();
            builder = builder.WithTopic(topic);
            builder = builder.WithPayload(payload);
            builder = builder.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce);
            builder = builder.WithRetainFlag(true);

            var outgoing = builder.Build();
            await _mqttClient.PublishAsync(outgoing);
        }


        /// <summary>
        /// Publishes the switch state as a retained message on the Home Assistant
        /// state topic of the <c>my_csharp_switch</c> entity.
        /// </summary>
        /// <param name="state">State text to publish (the callers in this class send "ON" or "OFF").</param>
        public async Task PublishStateAsync( string state)
        {
            var uniqueId = "my_csharp_switch";
            var stateTopic = $"homeassistant/switch/{uniqueId}/state";

            var builder = new MqttApplicationMessageBuilder();
            builder = builder.WithTopic(stateTopic);
            builder = builder.WithPayload(state);
            builder = builder.WithRetainFlag(true);

            await _mqttClient.PublishAsync(builder.Build());
        }


        // True once the Home Assistant command handler has been attached, so repeated
        // SubscribeAsync calls do not register it again.
        private bool _switchCommandHandlerAttached;

        /// <summary>
        /// Subscribes the client to <paramref name="topic"/> and makes sure the Home
        /// Assistant switch command handler is attached exactly once.
        /// </summary>
        /// <param name="topic">Topic (filter) to subscribe to.</param>
        /// <exception cref="ArgumentException"><paramref name="topic"/> is null, empty or whitespace.</exception>
        /// <remarks>
        /// The command handler reacts only to messages on
        /// <c>homeassistant/switch/my_csharp_switch/set</c>: an "ON" or "OFF" payload is
        /// echoed back through <see cref="PublishStateAsync(string)"/>. Pass that topic
        /// to receive those commands.
        /// </remarks>
        public async Task SubscribeAsync(string topic)
        {
            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentException("An MQTT topic is required.", nameof(topic));
            }

            var uniqueId = "my_csharp_switch";
            var commandTopic = $"homeassistant/switch/{uniqueId}/set";

            // Subscribe to the topic the caller asked for, not a fixed one.
            await _mqttClient.SubscribeAsync(topic);

            _logger.LogInformation("{0} SubscribeAsync command from HA: {1} .", DateTime.Now.ToString(), topic);

            if (_switchCommandHandlerAttached)
            {
                return;
            }

            _switchCommandHandlerAttached = true;

            _mqttClient.ApplicationMessageReceivedAsync += async e =>
            {
                if (e.ApplicationMessage.Topic != commandTopic)
                {
                    return;
                }

                string command = e.ApplicationMessage.ConvertPayloadToString();
                Console.WriteLine($"Received command from HA: {command}");

                _logger.LogInformation("{0} Received command from HA: {1} successful.", DateTime.Now.ToString(), command);

                if (command == "ON")
                {
                    // Your device logic to turn on goes here
                    await PublishStateAsync("ON");
                }
                else if (command == "OFF")
                {
                    // Your device logic to turn off goes here
                    await PublishStateAsync("OFF");
                }
            };
        }



        /// <summary>
        /// Connected callback that writes a fixed confirmation line to the console.
        /// </summary>
        /// <param name="arg">Connection event data (not read).</param>
        /// <returns>An already completed task.</returns>
        private static Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            const string notice = "Connected to EMQX.";
            Console.WriteLine(notice);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Connected callback that writes a fixed notice to the console.
        /// </summary>
        /// <param name="arg">Connection event data (not read).</param>
        /// <returns>An already completed task.</returns>
        private Task ClientConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            const string notice = "MqttBrokerService Client connected: ";
            Console.WriteLine(notice);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Message callback that logs the topic and text payload of a received message.
        /// </summary>
        /// <param name="e">Event data carrying the received application message.</param>
        /// <returns>An already completed task.</returns>
        private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            var received = e.ApplicationMessage;
            var body = received.ConvertPayloadToString();

            _logger.LogInformation($"MQTT RECEIVED: {received.Topic} -> {body}");
            return Task.CompletedTask;
        }


        #region
        /// <summary>
        /// Connects the client to the broker using the options built in the
        /// constructor. Does nothing when the client is already connected.
        /// </summary>
        /// <param name="cancellationToken">Token that can cancel the connect attempt.</param>
        /// <remarks>
        /// The connected check matters because the client rejects a second
        /// <c>ConnectAsync</c> on an established connection, which would otherwise
        /// fail host startup whenever something else has already connected it.
        /// </remarks>
        public async Task StartAsync(CancellationToken cancellationToken = default)
        {
            if (_mqttClient.IsConnected)
            {
                _logger.LogInformation("MQTT client is already connected; skipping connect.");
                return;
            }

            _logger.LogInformation("Connecting to MQTT broker...");
            await _mqttClient.ConnectAsync(_mqttClientOptions, cancellationToken);
        }

        /// <summary>
        /// Logs the shutdown and disconnects the client from the broker.
        /// </summary>
        /// <param name="cancellationToken">Token that can cancel the disconnect.</param>
        public async Task StopAsync(CancellationToken cancellationToken = default)
        {
            _logger.LogInformation("Disconnecting MQTT client...");

            var disconnect = _mqttClient.DisconnectAsync(cancellationToken: cancellationToken);
            await disconnect;
        }
        #endregion

    }
}

 

Register in Program.cs (MVC / WebAPI / Blazor Server)

// Register MqttClientService as the singleton implementation of IMqttClientService.
builder.Services.AddSingleton<IMqttClientService, MqttClientService>();
// MqttHostedService forwards the host's start/stop calls to that service.
builder.Services.AddHostedService<MqttHostedService>();

MqttHostedService

/// <summary>
/// Hosted service that ties the MQTT client's lifetime to the application host by
/// forwarding start and stop requests to <see cref="IMqttClientService"/>.
/// </summary>
public class MqttHostedService : IHostedService
{
    private readonly IMqttClientService _mqtt;

    /// <summary>Stores the MQTT service that start/stop calls are forwarded to.</summary>
    /// <param name="mqtt">MQTT client service resolved from the container.</param>
    public MqttHostedService(IMqttClientService mqtt) => _mqtt = mqtt;

    /// <summary>Forwards the host's start request to the MQTT service.</summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        return _mqtt.StartAsync(cancellationToken);
    }

    /// <summary>Forwards the host's stop request to the MQTT service.</summary>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return _mqtt.StopAsync(cancellationToken);
    }
}

 

Use in MVC Controller

/// <summary>
/// Sample MVC controller showing how to publish an MQTT message through the
/// injected <see cref="IMqttClientService"/>.
/// </summary>
public class HomeController : Controller
{
    private readonly IMqttClientService _mqtt;

    /// <summary>Stores the injected MQTT service.</summary>
    /// <param name="mqtt">MQTT client service resolved from the container.</param>
    public HomeController(IMqttClientService mqtt) => _mqtt = mqtt;

    /// <summary>
    /// Publishes a fixed test message to <c>web/test</c> and returns a plain
    /// confirmation text.
    /// </summary>
    public async Task<IActionResult> Publish()
    {
        const string topic = "web/test";
        const string payload = "Hello MVC!";

        await _mqtt.PublishAsync(topic, payload);

        IActionResult confirmation = Content("Sent!");
        return confirmation;
    }
}

 

 

Useful links

https://github.com/rafiulgits/mqtt-client-dotnet-core/blob/master/Extensions/ServiceCollectionExtension.cs#L12