All checks were successful
Build, Push and Run Container / build (push) Successful in 27s
Adds latitude and longitude query parameters to the `/TestStartParking` endpoint so that different locations can be simulated. Sets the gear to "P" when the received value is null, to cover that edge case. Awaits the result of ProcessMessage so that messages are processed asynchronously without potential race conditions.
87 lines
3.2 KiB
C#
87 lines
3.2 KiB
C#
using System.Text;
|
|
using Microsoft.Extensions.Options;
|
|
using MQTTnet;
|
|
|
|
namespace ProofOfConcept.Services;
|
|
|
|
/// <summary>
/// Hosted service that connects to the MQTT broker on <c>localhost</c>, subscribes to the
/// telemetry topic tree and hands every message published on a
/// <c>telemetry/{vin}/v/{field}</c> topic to the <see cref="MessageProcessor"/>.
/// </summary>
public class MQTTClient : IHostedService
{
    /// <summary>
    /// Topic filter for all telemetry messages. The same filter must be used for subscribing
    /// and unsubscribing, otherwise the broker does not remove the subscription.
    /// </summary>
    private const string TelemetryTopicFilter = "telemetry/#";

    private readonly ILogger<MQTTClient> logger;
    private readonly MQTTClientConfiguration configuration;
    private readonly MQTTServerConfiguration serverConfiguration;

    private readonly IMqttClient client;

    /// <summary>
    /// Creates the underlying MQTT client and registers the handler for incoming messages.
    /// No connection is opened here; that happens in <see cref="StartAsync"/>.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and per-message diagnostics.</param>
    /// <param name="options">Client configuration (stored, currently not read by this class).</param>
    /// <param name="serverOptions">Broker port and the credentials of the local client.</param>
    /// <param name="messageProcessor">Receiver of the VIN, lower-cased field name and payload of each telemetry message.</param>
    public MQTTClient(ILogger<MQTTClient> logger, IOptions<MQTTClientConfiguration> options, IOptions<MQTTServerConfiguration> serverOptions, MessageProcessor messageProcessor)
    {
        this.logger = logger;
        this.configuration = options.Value;
        this.serverConfiguration = serverOptions.Value;

        this.client = new MqttClientFactory().CreateMqttClient();

        // The handler is a genuinely asynchronous lambda: awaiting ProcessMessage keeps the
        // completion (and any exception) tied to the returned task without blocking a thread.
        this.client.ApplicationMessageReceivedAsync += async e =>
        {
            string topic = e.ApplicationMessage.Topic;

            // Expected shape: telemetry/5YJ3E7EB7KF291652/v/Location
            // Topics without a slash or with a different segment layout fail the pattern.
            if (topic.Split('/') is ["telemetry", var vin, "v", var field])
            {
                string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                logger.LogInformation("Message received: {Message}", message);

                await messageProcessor.ProcessMessage(vin, field.ToLowerInvariant(), message.StripQuotes());
            }
            else
            {
                logger.LogWarning("Topic not passed to message processor: {Topic}", topic);
            }
        };
    }

    /// <summary>
    /// Connects to the broker on <c>localhost</c> using the configured port, client id and
    /// credentials, then subscribes to the telemetry topic filter.
    /// </summary>
    /// <param name="cancellationToken">Token passed on to the connect and subscribe calls.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        this.logger.LogTrace("Starting...");

        MqttClientOptions options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", this.serverConfiguration.Port)
            .WithClientId(this.serverConfiguration.LocalClient.ClientID)
            .WithCredentials(this.serverConfiguration.LocalClient.Username, this.serverConfiguration.LocalClient.Password)
            .Build();

        await this.client.ConnectAsync(options, cancellationToken);
        this.logger.LogTrace("Connected");

        await this.client.SubscribeAsync(TelemetryTopicFilter, cancellationToken: cancellationToken);
        this.logger.LogTrace("Subscribed");

        this.logger.LogInformation("Started");
    }

    /// <summary>
    /// Removes the telemetry subscription and disconnects from the broker.
    /// </summary>
    /// <param name="cancellationToken">Token passed on to the unsubscribe and disconnect calls.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        this.logger.LogTrace("Stopping...");

        // Must be the exact filter that was subscribed in StartAsync.
        await this.client.UnsubscribeAsync(TelemetryTopicFilter, cancellationToken: cancellationToken);
        this.logger.LogTrace("Unsubscribed");

        await this.client.DisconnectAsync(cancellationToken: cancellationToken);
        this.logger.LogTrace("Disconnected");

        this.logger.LogInformation("Stopped");
    }
}
|
|
|
|
/// <summary>
/// Options type bound for the MQTT client. It currently declares no settings.
/// </summary>
public class MQTTClientConfiguration
{
}
|
|
|
|
/// <summary>
/// File-local string helpers.
/// </summary>
file static class StringExtensions
{
    /// <summary>
    /// Removes all double-quote characters from the start and the end of
    /// <paramref name="value"/>; quotes inside the text are left untouched.
    /// </summary>
    /// <param name="value">Text to trim.</param>
    /// <returns>The text without leading or trailing double quotes.</returns>
    public static string StripQuotes(this string value)
    {
        const char Quote = '"';
        return value.Trim(Quote);
    }
}