Files
Automatic-Parking/Source/ProofOfConcept/Services/MQTTClient.cs
Szakáts Alpár Zsolt 22df381755
All checks were successful
Build, Push and Run Container / build (push) Successful in 27s
Enables location and gear testing via API
Adds query parameters for latitude and longitude to the `/TestStartParking` endpoint, allowing to simulate different locations.

Adds logic to set the gear to "P" if the received value is null to cover edge cases.

Ensures asynchronous message processing by awaiting the result of ProcessMessage to prevent potential race conditions.
2025-08-25 08:49:57 +02:00

87 lines
3.2 KiB
C#

using System.Text;
using Microsoft.Extensions.Options;
using MQTTnet;
namespace ProofOfConcept.Services;
public class MQTTClient : IHostedService
{
private readonly ILogger<MQTTClient> logger;
private readonly MQTTClientConfiguration configuration;
private readonly MQTTServerConfiguration serverConfiguration;
private readonly IMqttClient client;
public MQTTClient(ILogger<MQTTClient> logger, IOptions<MQTTClientConfiguration> options, IOptions<MQTTServerConfiguration> serverOptions, MessageProcessor messageProcessor)
{
this.logger = logger;
this.configuration = options.Value;
this.serverConfiguration = serverOptions.Value;
client = new MqttClientFactory().CreateMqttClient();
this.client.ApplicationMessageReceivedAsync += (e) =>
{
string topic = e.ApplicationMessage.Topic;
if (topic.IndexOf("/", StringComparison.Ordinal) > 0)
{
string[] parts = topic.Split('/'); //telemetry/5YJ3E7EB7KF291652/v/Location
if (parts is ["telemetry", _, "v", _])
{
string vin = parts[1];
string field = parts[3];
string? message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
logger.LogInformation("Message received: {Message}", message);
messageProcessor.ProcessMessage(vin, field.ToLowerInvariant(), message.StripQuotes()).GetAwaiter().GetResult();
}
else
logger.LogWarning("Topic not passed to message processor: {Topic}", topic);
}
else
logger.LogWarning("Topic not passed to message processor: {Topic}", topic);
return Task.CompletedTask;
};
}
public async Task StartAsync(CancellationToken cancellationToken)
{
this.logger.LogTrace("Stating...");
MqttClientOptions options = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", this.serverConfiguration.Port)
.WithClientId(this.serverConfiguration.LocalClient.ClientID)
.WithCredentials(this.serverConfiguration.LocalClient.Username, this.serverConfiguration.LocalClient.Password)
.Build();
await this.client.ConnectAsync(options, cancellationToken);
this.logger.LogTrace("Connected");
await this.client.SubscribeAsync("telemetry/#", cancellationToken: cancellationToken);
this.logger.LogTrace("Subscribed");
this.logger.LogInformation("Started");
}
public async Task StopAsync(CancellationToken cancellationToken)
{
logger.LogTrace("Stopping...");
await this.client.UnsubscribeAsync("telemetry", cancellationToken: cancellationToken);
this.logger.LogTrace("Unsubscribed");
await this.client.DisconnectAsync(cancellationToken: cancellationToken);
this.logger.LogTrace("Disconnected");
logger.LogInformation("Stopped");
}
}
public class MQTTClientConfiguration
{
}
file static class StringExtensions { public static string StripQuotes(this string value) => value.Trim('"'); }