All checks were successful
Build, Push and Run Container / build (push) Successful in 37s
87 lines
3.2 KiB
C#
87 lines
3.2 KiB
C#
using System.Text;
|
|
using Microsoft.Extensions.Options;
|
|
using MQTTnet;
|
|
|
|
namespace ProofOfConcept.Services;
|
|
|
|
/// <summary>
/// Hosted service that connects an MQTT client to a broker on <c>localhost</c>, subscribes to the
/// telemetry topic tree and forwards matching messages to an <see cref="IMessageProcessor"/>.
/// </summary>
public class MQTTClient : IHostedService, IDisposable
{
    /// <summary>
    /// Topic filter used for both subscribing and unsubscribing. The two calls must use the
    /// identical filter string, otherwise the unsubscribe does not match the subscription.
    /// </summary>
    private const string TelemetryTopicFilter = "telemetry/#";

    private readonly ILogger<MQTTClient> logger;
    // Stored from the injected options; not read anywhere in this class at present.
    private readonly MQTTClientConfiguration configuration;
    private readonly MQTTServerConfiguration serverConfiguration;

    private readonly IMqttClient client;

    /// <summary>
    /// Creates the underlying MQTT client and registers the handler for received messages.
    /// No network activity happens here; the connection is opened in <see cref="StartAsync"/>.
    /// </summary>
    /// <param name="logger">Logger for lifecycle and message diagnostics.</param>
    /// <param name="options">Client options; the value is stored but currently unused.</param>
    /// <param name="serverOptions">Server options supplying the port and local client credentials.</param>
    /// <param name="messageProcessor">Receiver of the VIN, lower-cased field name and payload of each accepted message.</param>
    public MQTTClient(ILogger<MQTTClient> logger, IOptions<MQTTClientConfiguration> options, IOptions<MQTTServerConfiguration> serverOptions, IMessageProcessor messageProcessor)
    {
        this.logger = logger;
        this.configuration = options.Value;
        this.serverConfiguration = serverOptions.Value;

        this.client = new MqttClientFactory().CreateMqttClient();

        this.client.ApplicationMessageReceivedAsync += (e) =>
        {
            string topic = e.ApplicationMessage.Topic;

            // Accepted shape: telemetry/<vin>/v/<field>, e.g. telemetry/5YJ3E7EB7KF291652/v/Location.
            // A topic without any '/' splits into a single element and therefore fails the
            // four-element pattern, so no separate slash check is needed.
            if (topic.Split('/') is ["telemetry", var vin, "v", var field])
            {
                string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                logger.LogInformation("Message received: {Message}", payload);
                messageProcessor.ProcessMessage(vin, field.ToLowerInvariant(), payload.StripQuotes());
            }
            else
            {
                logger.LogWarning("Topic not passed to message processor: {Topic}", topic);
            }

            return Task.CompletedTask;
        };
    }

    /// <summary>
    /// Connects to the broker on <c>localhost</c> using the configured port, client id and
    /// credentials, then subscribes to <c>telemetry/#</c>.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the connect and subscribe calls.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        this.logger.LogTrace("Starting...");

        MqttClientOptions options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", this.serverConfiguration.Port)
            .WithClientId(this.serverConfiguration.LocalClient.ClientID)
            .WithCredentials(this.serverConfiguration.LocalClient.Username, this.serverConfiguration.LocalClient.Password)
            .Build();
        await this.client.ConnectAsync(options, cancellationToken);
        this.logger.LogTrace("Connected");

        await this.client.SubscribeAsync(TelemetryTopicFilter, cancellationToken: cancellationToken);
        this.logger.LogTrace("Subscribed");

        this.logger.LogInformation("Started");
    }

    /// <summary>
    /// Unsubscribes from the telemetry topic filter and disconnects from the broker.
    /// When the client is not connected (start failed or the connection was lost) both steps
    /// are skipped so that host shutdown does not fail on a client that has nothing to tear down.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the unsubscribe and disconnect calls.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        this.logger.LogTrace("Stopping...");

        if (this.client.IsConnected)
        {
            await this.client.UnsubscribeAsync(TelemetryTopicFilter, cancellationToken: cancellationToken);
            this.logger.LogTrace("Unsubscribed");

            await this.client.DisconnectAsync(cancellationToken: cancellationToken);
            this.logger.LogTrace("Disconnected");
        }
        else
        {
            this.logger.LogTrace("Not connected; skipping unsubscribe and disconnect");
        }

        this.logger.LogInformation("Stopped");
    }

    /// <summary>
    /// Disposes the underlying MQTT client created in the constructor.
    /// </summary>
    public void Dispose()
    {
        this.client.Dispose();
        GC.SuppressFinalize(this);
    }
}
|
|
|
|
/// <summary>
/// Options type injected into <c>MQTTClient</c> through <c>IOptions&lt;MQTTClientConfiguration&gt;</c>.
/// It declares no settings at present.
/// </summary>
public class MQTTClientConfiguration { }
|
|
|
|
/// <summary>
/// File-local string helpers.
/// </summary>
file static class StringExtensions
{
    /// <summary>
    /// Removes every leading and trailing double-quote character from <paramref name="value"/>.
    /// Quotes inside the string are left untouched.
    /// </summary>
    /// <param name="value">The text to trim.</param>
    /// <returns>The text without surrounding double quotes.</returns>
    public static string StripQuotes(this string value)
    {
        const char quote = '"';
        return value.Trim(quote);
    }
}