|
|
@@ -6,6 +6,7 @@ using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
|
using System.Text;
|
|
|
using System.Threading.Tasks;
|
|
|
+using Ys.Scada.Core;
|
|
|
using Ys.Scada.Core.MQ;
|
|
|
|
|
|
namespace Ys.Scada.MQTT
|
|
|
@@ -17,9 +18,15 @@ namespace Ys.Scada.MQTT
|
|
|
public Func<TransportMessage, object?, Task>? OnMessageCallback { get; set; }
|
|
|
private MqttClient _consumerClient;
|
|
|
public BrokerAddress BrokerAddress => new("RabbitMQ", $"{_mqttOptions.Server}:{_mqttOptions.Port}");
|
|
|
- public MQTTConsumerClient(IOptions<MQTTOptions> options)
|
|
|
+ private readonly MemoryChannel<TransportMessage> _memoryChannel;
|
|
|
+ private int _id;
|
|
|
+ private readonly Producer<TransportMessage> _producer;
|
|
|
+ public MQTTConsumerClient(IOptions<MQTTOptions> options, MemoryChannel<TransportMessage> memoryChannel, int id)
|
|
|
{
|
|
|
_mqttOptions = options.Value;
|
|
|
+ _memoryChannel = memoryChannel;
|
|
|
+ _producer = new Producer<TransportMessage>(_memoryChannel.Channel.Writer);
|
|
|
+ _id = id;
|
|
|
}
|
|
|
public void Connect()
|
|
|
{
|
|
|
@@ -32,7 +39,7 @@ namespace Ys.Scada.MQTT
|
|
|
var mqttFactory = new MqttFactory();
|
|
|
_consumerClient = mqttFactory.CreateMqttClient() as MqttClient;
|
|
|
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(_mqttOptions.Server, _mqttOptions.Port).Build();
|
|
|
- _consumerClient!.ConnectAsync(mqttClientOptions, CancellationToken.None).GetAwaiter().GetResult();
|
|
|
+ _consumerClient!.ConnectAsync(mqttClientOptions, CancellationToken.None).GetAwaiter().GetResult();
|
|
|
}
|
|
|
}
|
|
|
finally
|
|
|
@@ -44,13 +51,20 @@ namespace Ys.Scada.MQTT
|
|
|
{
|
|
|
Connect();
|
|
|
_consumerClient.ApplicationMessageReceivedAsync += _consumerClient_ApplicationMessageReceivedAsync;
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ cancellationToken.WaitHandle.WaitOne(timeout);
|
|
|
+ }
|
|
|
}
|
|
|
private async Task _consumerClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
|
|
|
{
|
|
|
- var message = new TransportMessage();
|
|
|
- message.Headers.Add("topic",arg.ApplicationMessage.Topic);
|
|
|
- message.Headers.Add("clientid", arg.ClientId);
|
|
|
- await OnMessageCallback!(message,arg);
|
|
|
+ var headers = new Dictionary<string, string?>();
|
|
|
+ headers.Add("topic", arg.ApplicationMessage.Topic);
|
|
|
+ headers.Add("clientid", arg.ClientId);
|
|
|
+ var message = new TransportMessage(headers, arg.ApplicationMessage.Payload);
|
|
|
+
|
|
|
+ await _producer.PublishAsync(message);
|
|
|
}
|
|
|
public async void Subscribe(IEnumerable<string> topics)
|
|
|
{
|
|
|
@@ -60,6 +74,7 @@ namespace Ys.Scada.MQTT
|
|
|
}
|
|
|
public void Dispose()
|
|
|
{
|
|
|
+ _consumerClient?.Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|