using System.Text;
using Microsoft.Extensions.Options;
using MQTTnet;

namespace ProofOfConcept.Services;

/// <summary>
/// Hosted service that connects an MQTT client to a broker on the local machine,
/// subscribes to the telemetry topic and hands every received payload (decoded as
/// UTF-8 text) to an <see cref="IMessageProcessor"/>.
/// </summary>
public class MQTTClient : IHostedService, IDisposable
{
    /// <summary>Host name of the broker this client connects to.</summary>
    private const string BrokerHost = "localhost";

    /// <summary>Topic subscribed to on start and unsubscribed from on stop.</summary>
    private const string TelemetryTopic = "telemetry";

    private readonly ILogger<MQTTClient> logger;
    private readonly MQTTClientConfiguration configuration;
    private readonly MQTTServerConfiguration serverConfiguration;
    private readonly IMqttClient client;

    /// <summary>
    /// Creates the underlying MQTT client and wires the message-received handler.
    /// No network activity happens here; the connection is opened in <see cref="StartAsync"/>.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and message tracing.</param>
    /// <param name="options">Client configuration (currently has no members).</param>
    /// <param name="serverOptions">Broker configuration: port and local client identity/credentials.</param>
    /// <param name="messageProcessor">Receiver of every decoded message payload.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public MQTTClient(
        ILogger<MQTTClient> logger,
        IOptions<MQTTClientConfiguration> options,
        IOptions<MQTTServerConfiguration> serverOptions,
        IMessageProcessor messageProcessor)
    {
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(serverOptions);
        ArgumentNullException.ThrowIfNull(messageProcessor);

        this.logger = logger;
        this.configuration = options.Value;
        this.serverConfiguration = serverOptions.Value;
        this.client = new MqttClientFactory().CreateMqttClient();

        this.client.ApplicationMessageReceivedAsync += (e) =>
        {
            string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            this.logger.LogInformation("Message received: {Message}", message);
            messageProcessor.ProcessMessage(message);
            return Task.CompletedTask;
        };
    }

    /// <summary>
    /// Connects to the broker with the configured client id and credentials,
    /// then subscribes to the telemetry topic.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the connect and subscribe calls.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        this.logger.LogTrace("Starting...");

        MqttClientOptions options = new MqttClientOptionsBuilder()
            .WithTcpServer(BrokerHost, this.serverConfiguration.Port)
            .WithClientId(this.serverConfiguration.LocalClient.ClientID)
            .WithCredentials(
                this.serverConfiguration.LocalClient.Username,
                this.serverConfiguration.LocalClient.Password)
            .Build();

        await this.client.ConnectAsync(options, cancellationToken);
        this.logger.LogTrace("Connected");

        await this.client.SubscribeAsync(TelemetryTopic, cancellationToken: cancellationToken);
        this.logger.LogTrace("Subscribed");

        this.logger.LogInformation("Started");
    }

    /// <summary>
    /// Unsubscribes from the telemetry topic and disconnects from the broker.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the unsubscribe and disconnect calls.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        this.logger.LogTrace("Stopping...");

        // Skip the broker round-trips when there is no live connection (start failed
        // or the connection was lost) so that host shutdown does not fail on them.
        if (this.client.IsConnected)
        {
            await this.client.UnsubscribeAsync(TelemetryTopic, cancellationToken: cancellationToken);
            this.logger.LogTrace("Unsubscribed");

            await this.client.DisconnectAsync(cancellationToken: cancellationToken);
            this.logger.LogTrace("Disconnected");
        }

        this.logger.LogInformation("Stopped");
    }

    /// <summary>Releases the underlying MQTT client.</summary>
    public void Dispose()
    {
        this.client.Dispose();
        GC.SuppressFinalize(this);
    }
}

/// <summary>
/// Options type for <see cref="MQTTClient"/>. It currently declares no settings.
/// </summary>
public class MQTTClientConfiguration
{
}