create third liabray use Mqttnet5.x, public partial class MqttClientService : IMqttClientService,which publish ,recieve topic,display connected event,ect used by web app or blazor server
Table of Contents
Folder Structure (Class Library)
YourProject.Mqtt/
│
├── IMqttClientService.cs
├── MqttClientService.cs (partial class)
├── MqttClientService.Events.cs
└── MqttClientService.Options.cs
IMqttClientService.cs
using MQTTnet;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Nop.Data
{
public partial interface IMqttClientService
{
Task PublishAsync(string topic, string payload);
Task SubscribeAsync(string topic);
Task StartAsync(CancellationToken cancellationToken = default);
Task StopAsync(CancellationToken cancellationToken = default);
}
}
MqttClientService.cs
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace Nop.Data
{
public partial class MqttClientService : BackgroundService, IMqttClientService
{
#region Fields
private readonly IConfiguration _configuration;
private readonly ILogger<MqttClientService> _logger;
private readonly IMqttClient _mqttClient;
private readonly MqttClientOptions _mqttClientOptions;
private Dictionary<string, Func<string, Task>> _registrations = new();
#endregion
public MqttClientService(IConfiguration configuration,
ILogger<MqttClientService> logger)
{
this._configuration = configuration;
this._logger = logger;
// Build MQTT client
var factory = new MqttClientFactory();
_mqttClient = factory.CreateMqttClient();
// 配置 MQTT 客户端选项
_mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer("192.168.2.125", 1883).Build(); // 指定 MQTT 服务器地址和端口
// 连接到 MQTT 服务器
//var response = _mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
Console.WriteLine($"MQTT Received:");
_mqttClient.ConnectedAsync += async e =>
{
_logger.LogInformation($"{DateTime.Now} Connected to mqtt broker.");
};
_mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"MQTT Received: {e.ApplicationMessage.Topic} -> {e.ApplicationMessage.ConvertPayloadToString()}");
return Task.CompletedTask;
};
_mqttClient.DisconnectedAsync += async e =>
{
_logger.LogInformation($"Disconnected from MQTT Broker. Reason: {e.Reason}");
_logger.LogInformation($"Disconnected from MQTT Broker. ReasonString: {e.ReasonString}");
if (e.Reason == MqttClientDisconnectReason.KeepAliveTimeout ||
e.Reason == MqttClientDisconnectReason.ServerBusy)
{
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await _mqttClient.ConnectAsync(_mqttClientOptions);
}
catch (Exception ex)
{
_logger.LogError($"Reconnect failed: {ex.Message}");
}
}
else
{
_logger.LogWarning($"Unexpected disconnection reason: {e.Reason}. No reconnection attempt.");
}
};
// 连接到 MQTT 服务器
// var response = _mqttClient.ConnectAsync(_mqttClientOptions);
// The response contains additional data sent by the server after subscribing.
// response.DumpToConsole();
Connect().Wait();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var options = new MqttClientOptionsBuilder()
.WithTcpServer("192.168.2.125", 1883)
.WithClientId("aspnet_mvc")
.Build();
_mqttClient.ApplicationMessageReceivedAsync += args =>
{
_logger.LogInformation($"Received: {args.ApplicationMessage.Topic}");
return Task.CompletedTask;
};
// Auto reconnect loop
while (!stoppingToken.IsCancellationRequested)
{
try
{
if (!_mqttClient.IsConnected)
{
await _mqttClient.ConnectAsync(options, stoppingToken);
_logger.LogInformation($"{DateTime.Now}BackgroundService ->Auto reconnect loop:MQTT connected.");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT connection failed.");
}
await Task.Delay(5000, stoppingToken); // reconnect interval
}
}
public async Task StartAsync(string broker, int port, string topic)
{
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
this._logger.LogInformation($"Message Received{ e.ApplicationMessage.Topic }");
//if (e.ApplicationMessage.Topic == $"homeassistant/switch/{uniqueId}/set")
//{
// string command = e.ApplicationMessage.ConvertPayloadToString();
// Console.WriteLine($"Received command from HA: {command}");
// // _logger.LogInformation("{0} Received command from HA: {1} successful.", DateTime.Now.ToString(), command);
// if (command == "ON")
// {
// // Your device logic to turn on goes here
// await PublishStateAsync(mqttClient, "ON");
// }
// else if (command == "OFF")
// {
// // Your device logic to turn off goes here
// await PublishStateAsync(mqttClient, "OFF");
// }
//}
};
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
// 根据 QoS 级别可定制 ACK 行为
if (e.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
// 手动确认
// await e.AcknowledgeAsync(CancellationToken.None);
}
Console.WriteLine($"Received on {topic}: {payload}");
Console.WriteLine($"Received message: {e.ApplicationMessage.Payload}");
this._logger.LogInformation($"Message Received topic:{topic}: payload{payload}");
};
}
private async Task ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
if (_registrations.ContainsKey(arg.ApplicationMessage.Topic))
{
await _registrations[arg.ApplicationMessage.Topic].Invoke(arg.ApplicationMessage.ConvertPayloadToString());
}
}
private async Task Connect()
{
try
{
await _mqttClient.ConnectAsync(_mqttClientOptions);
_logger.LogInformation("MQTT connected.");
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT connect failed");
}
}
public async Task PublishAsync(string topic, string payload)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true)
.Build();
await _mqttClient.PublishAsync(message);
}
public async Task PublishStateAsync( string state)
{
var uniqueId = "my_csharp_switch";
var msg = new MqttApplicationMessageBuilder()
.WithTopic($"homeassistant/switch/{uniqueId}/state")
.WithPayload(state)
.WithRetainFlag(true)
.Build();
await _mqttClient.PublishAsync(msg);
}
public async Task SubscribeAsync(string topic)
{
var uniqueId = "my_csharp_switch";
await _mqttClient.SubscribeAsync($"homeassistant/switch/{uniqueId}/set");
_logger.LogInformation("{0} SubscribeAsync command from HA: {1} .", DateTime.Now.ToString(), "homeassistant/switch/{uniqueId}/command");
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
if (e.ApplicationMessage.Topic == $"homeassistant/switch/{uniqueId}/set")
{
string command = e.ApplicationMessage.ConvertPayloadToString();
Console.WriteLine($"Received command from HA: {command}");
_logger.LogInformation("{0} Received command from HA: {1} successful.", DateTime.Now.ToString(), command);
if (command == "ON")
{
// Your device logic to turn on goes here
await PublishStateAsync( "ON");
}
else if (command == "OFF")
{
// Your device logic to turn off goes here
await PublishStateAsync( "OFF");
}
}
};
}
private static Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine("Connected to EMQX.");
return Task.CompletedTask;
}
private Task ClientConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine("MqttBrokerService Client connected: ");
return Task.CompletedTask;
}
private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
string payload = e.ApplicationMessage.ConvertPayloadToString();
_logger.LogInformation($"MQTT RECEIVED: {e.ApplicationMessage.Topic} -> {payload}");
return Task.CompletedTask;
}
#region
public async Task StartAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("Connecting to MQTT broker...");
await _mqttClient.ConnectAsync(_mqttClientOptions, cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("Disconnecting MQTT client...");
await _mqttClient.DisconnectAsync(cancellationToken: cancellationToken);
}
#endregion
}
}
Register in Program.cs (MVC / WebAPI / Blazor Server)
builder.Services.AddSingleton<IMqttClientService, MqttClientService>();
builder.Services.AddHostedService<MqttHostedService>();
MqttHostedService
public class MqttHostedService : IHostedService
{
private readonly IMqttClientService _mqtt;
public MqttHostedService(IMqttClientService mqtt)
{
_mqtt = mqtt;
}
public Task StartAsync(CancellationToken cancellationToken)
=> _mqtt.StartAsync(cancellationToken);
public Task StopAsync(CancellationToken cancellationToken)
=> _mqtt.StopAsync(cancellationToken);
}
Use in MVC Controller
public class HomeController : Controller
{
private readonly IMqttClientService _mqtt;
public HomeController(IMqttClientService mqtt)
{
_mqtt = mqtt;
}
public async Task<IActionResult> Publish()
{
await _mqtt.PublishAsync("web/test", "Hello MVC!");
return Content("Sent!");
}
}
useful links
https://github.com/rafiulgits/mqtt-client-dotnet-core/blob/master/Extensions/ServiceCollectionExtension.cs#L12
