|
@@ -0,0 +1,235 @@
|
|
|
|
|
+using RabbitMQ.Client;
|
|
|
|
|
+using RabbitMQ.Client.Events;
|
|
|
|
|
+using System;
|
|
|
|
|
+using System.Collections.Concurrent;
|
|
|
|
|
+using System.Collections.Generic;
|
|
|
|
|
+using System.Linq;
|
|
|
|
|
+using System.Text;
|
|
|
|
|
+using System.Threading.Channels;
|
|
|
|
|
+using System.Threading.Tasks;
|
|
|
|
|
+using YSAI.Core.data;
|
|
|
|
|
+using YSAI.Core.@interface.only;
|
|
|
|
|
+using YSAI.Core.@interface.unify;
|
|
|
|
|
+using YSAI.Unility;
|
|
|
|
|
+
|
|
|
|
|
+namespace YSAI.RabbitMQ
|
|
|
|
|
+{
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 消费者操作
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ public class RabbitMQConsumerOperate : IBase<RabbitMQData.Event>, IRabbitMQConsumer
|
|
|
|
|
+ {
|
|
|
|
|
+ public override string LogHead => "[ RabbitMQConsumerOperate 操作 ]";
|
|
|
|
|
+ public override string ClassName => "RabbitMQConsumerOperate";
|
|
|
|
|
+
|
|
|
|
|
+ private static readonly object Lock = new object(); //锁
|
|
|
|
|
+ private static List<RabbitMQConsumerOperate> ThisObjList = new List<RabbitMQConsumerOperate>(); //自身对象集合
|
|
|
|
|
+
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 单例模式
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ /// <returns></returns>
|
|
|
|
|
+ public static RabbitMQConsumerOperate Instance(RabbitMQData.Basics basics)
|
|
|
|
|
+ {
|
|
|
|
|
+ RabbitMQConsumerOperate? exp = ThisObjList.FirstOrDefault(c => c.basics.Equals(basics));
|
|
|
|
|
+ if (exp == null)
|
|
|
|
|
+ {
|
|
|
|
|
+ lock (Lock)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (ThisObjList.Count(c => c.basics.Equals(basics)) > 0)
|
|
|
|
|
+ {
|
|
|
|
|
+ return ThisObjList.First(c => c.basics.Equals(basics));
|
|
|
|
|
+ }
|
|
|
|
|
+ else
|
|
|
|
|
+ {
|
|
|
|
|
+ RabbitMQConsumerOperate exp2 = new RabbitMQConsumerOperate(basics);
|
|
|
|
|
+ ThisObjList.Add(exp2);
|
|
|
|
|
+ return exp2;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return exp;
|
|
|
|
|
+ }
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 基础数据
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ private RabbitMQData.Basics basics { get; set; }
|
|
|
|
|
+
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 构造函数
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ public RabbitMQConsumerOperate(RabbitMQData.Basics basics)
|
|
|
|
|
+ {
|
|
|
|
|
+ this.basics = basics;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 连接对象
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ private IConnection connection { get; set; }
|
|
|
|
|
+
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 通道集合
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ private ConcurrentDictionary<string, IModel> Channels = new ConcurrentDictionary<string, IModel>();
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 队列集合
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ private ConcurrentDictionary<string, string> Queues = new ConcurrentDictionary<string, string>();
|
|
|
|
|
+
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// [消费]接收发布者消息
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ private void Consumer_Received(object? sender, BasicDeliverEventArgs e, bool AutoAck = false)
|
|
|
|
|
+ {
|
|
|
|
|
+ OnEventHandler?.Invoke(this, new RabbitMQData.Event() { MessageContent = Encoding.UTF8.GetString(e.Body.ToArray()), MessageHead = e.RoutingKey });
|
|
|
|
|
+ if (!AutoAck)
|
|
|
|
|
+ {
|
|
|
|
|
+ //当自动确认为false,得手动确认消息
|
|
|
|
|
+ Channels[e.RoutingKey].BasicAck(e.DeliveryTag, false);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public Task<OperateResult> OnAsync()
|
|
|
|
|
+ {
|
|
|
|
|
+ return Task.Run(() => On());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public OperateResult On()
|
|
|
|
|
+ {
|
|
|
|
|
+ //开始记录运行时间
|
|
|
|
|
+ RunTimeTool.Instance($"{ClassName}.On").StartRecord();
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ if (connection == null || !connection.IsOpen)
|
|
|
|
|
+ {
|
|
|
|
|
+ //连接工厂实例
|
|
|
|
|
+ ConnectionFactory factory = new();
|
|
|
|
|
+ //设置服务端参数
|
|
|
|
|
+ factory.HostName = basics.HostName;
|
|
|
|
|
+ factory.Port = basics.Port;
|
|
|
|
|
+ factory.UserName = basics.UserName;
|
|
|
|
|
+ factory.Password = basics.Password;
|
|
|
|
|
+ //创建连接
|
|
|
|
|
+ connection = factory.CreateConnection();
|
|
|
|
|
+ //判断是否连接
|
|
|
|
|
+ if (connection.IsOpen)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("On", true);
|
|
|
|
|
+ }
|
|
|
|
|
+ return Break("On", false, "RabbitMQ连接打开失败");
|
|
|
|
|
+ }
|
|
|
|
|
+ else
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("On", false, "RabbitMQ连接已打开");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("On", false, "打开RabbitMQ连接异常:" + ex.Message);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public Task<OperateResult> OffAsync()
|
|
|
|
|
+ {
|
|
|
|
|
+ return Task.Run(() => Off());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public OperateResult Off()
|
|
|
|
|
+ {
|
|
|
|
|
+ //开始记录运行时间
|
|
|
|
|
+ RunTimeTool.Instance($"{ClassName}.Off").StartRecord();
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ if (connection == null || !connection.IsOpen)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("Off", false, "RabbitMQ连接未打开");
|
|
|
|
|
+ }
|
|
|
|
|
+ else
|
|
|
|
|
+ {
|
|
|
|
|
+ //关闭
|
|
|
|
|
+ connection.Close();
|
|
|
|
|
+ //释放
|
|
|
|
|
+ connection.Dispose();
|
|
|
|
|
+ //清空通道集合
|
|
|
|
|
+ Channels.Clear();
|
|
|
|
|
+ //队列清空
|
|
|
|
|
+ Queues.Clear();
|
|
|
|
|
+ return Break("Off", true);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("Off", false, "打开RabbitMQ连接异常:" + ex.Message);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public void Dispose()
|
|
|
|
|
+ {
|
|
|
|
|
+ Off();
|
|
|
|
|
+ GC.Collect();
|
|
|
|
|
+ GC.SuppressFinalize(this);
|
|
|
|
|
+ ThisObjList.Remove(this);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public OperateResult Consume(string MessageHead, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
|
|
+ {
|
|
|
|
|
+ //开始记录运行时间
|
|
|
|
|
+ RunTimeTool.Instance($"{ClassName}.Consume").StartRecord();
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ if (connection == null || !connection.IsOpen)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("Consume", false, "RabbitMQ连接未打开");
|
|
|
|
|
+ }
|
|
|
|
|
+ else
|
|
|
|
|
+ {
|
|
|
|
|
+ //判断是否存在此通道
|
|
|
|
|
+ if (!Channels.ContainsKey(basics.ExChangeName))
|
|
|
|
|
+ {
|
|
|
|
|
+ IModel channel = connection.CreateModel();
|
|
|
|
|
+ //创建交换机
|
|
|
|
|
+ channel.ExchangeDeclare(basics.ExChangeName, Type, Durable, AutoDelete);
|
|
|
|
|
+ //添加通道
|
|
|
|
|
+ Channels.TryAdd(basics.ExChangeName, channel);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (!Queues.ContainsKey(MessageHead))
|
|
|
|
|
+ {
|
|
|
|
|
+ //创建队列
|
|
|
|
|
+ Channels[basics.ExChangeName].QueueDeclare(MessageHead, Durable, Exclusive, AutoDelete);
|
|
|
|
|
+ //绑定队列
|
|
|
|
|
+ Channels[basics.ExChangeName].QueueBind(MessageHead, basics.ExChangeName, MessageHead);
|
|
|
|
|
+
|
|
|
|
|
+ EventingBasicConsumer consumer = new(Channels[basics.ExChangeName]);
|
|
|
|
|
+ consumer.Received += delegate (object? sender, BasicDeliverEventArgs e)
|
|
|
|
|
+ {
|
|
|
|
|
+ Consumer_Received(sender, e, AutoAck);
|
|
|
|
|
+ };
|
|
|
|
|
+ // 开启消费者与通道、队列关联
|
|
|
|
|
+ Channels[basics.ExChangeName].BasicConsume(MessageHead, AutoAck, consumer);
|
|
|
|
|
+ //返回
|
|
|
|
|
+ return Break("Consume", true);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (OnEventHandler == null)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("Consume", false, "已存在此消费,请注册事件返回参数");
|
|
|
|
|
+ }
|
|
|
|
|
+ else
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("Consume", false, "已存在此消费");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Break("Consume", false, "RabbitMQ消费异常:" + ex.Message);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public Task<OperateResult> ConsumeAsync(string MessageHead, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Task.Run(() => Consume(MessageHead, Type, AutoAck, Durable, Exclusive, AutoDelete));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|