|
|
@@ -15,23 +15,23 @@ using YSAI.Unility;
|
|
|
namespace YSAI.RabbitMQ
|
|
|
{
|
|
|
/// <summary>
|
|
|
- /// 消费者操作
|
|
|
+ /// 发布者操作
|
|
|
/// </summary>
|
|
|
- public sealed class RabbitMQConsumerOperate : IBaseAbstract<RabbitMQData.Event>, IRabbitMQConsumer
|
|
|
+ public sealed class RabbitMQOperate : IBaseAbstract<RabbitMQData.Event>, IRabbitMQ
|
|
|
{
|
|
|
- protected override string LogHead => "[ RabbitMQConsumerOperate 操作 ]";
|
|
|
- protected override string ClassName => "RabbitMQConsumerOperate";
|
|
|
+ protected override string LogHead => "[ RabbitMQOperate 操作 ]";
|
|
|
+ protected override string ClassName => "RabbitMQOperate";
|
|
|
|
|
|
private static readonly object Lock = new object(); //锁
|
|
|
- private static List<RabbitMQConsumerOperate> ThisObjList = new List<RabbitMQConsumerOperate>(); //自身对象集合
|
|
|
+ private static List<RabbitMQOperate> ThisObjList = new List<RabbitMQOperate>(); //自身对象集合
|
|
|
|
|
|
/// <summary>
|
|
|
/// 单例模式
|
|
|
/// </summary>
|
|
|
/// <returns></returns>
|
|
|
- public static RabbitMQConsumerOperate Instance(RabbitMQData.Basics basics)
|
|
|
+ public static RabbitMQOperate Instance(RabbitMQData.Basics basics)
|
|
|
{
|
|
|
- RabbitMQConsumerOperate? exp = ThisObjList.FirstOrDefault(c => c.basics.Equals(basics));
|
|
|
+ RabbitMQOperate? exp = ThisObjList.FirstOrDefault(c => c.basics.Equals(basics));
|
|
|
if (exp == null)
|
|
|
{
|
|
|
lock (Lock)
|
|
|
@@ -42,7 +42,7 @@ namespace YSAI.RabbitMQ
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- RabbitMQConsumerOperate exp2 = new RabbitMQConsumerOperate(basics);
|
|
|
+ RabbitMQOperate exp2 = new RabbitMQOperate(basics);
|
|
|
ThisObjList.Add(exp2);
|
|
|
return exp2;
|
|
|
}
|
|
|
@@ -50,6 +50,7 @@ namespace YSAI.RabbitMQ
|
|
|
}
|
|
|
return exp;
|
|
|
}
|
|
|
+
|
|
|
/// <summary>
|
|
|
/// 基础数据
|
|
|
/// </summary>
|
|
|
@@ -58,7 +59,7 @@ namespace YSAI.RabbitMQ
|
|
|
/// <summary>
|
|
|
/// 构造函数
|
|
|
/// </summary>
|
|
|
- public RabbitMQConsumerOperate(RabbitMQData.Basics basics)
|
|
|
+ public RabbitMQOperate(RabbitMQData.Basics basics)
|
|
|
{
|
|
|
this.basics = basics;
|
|
|
}
|
|
|
@@ -76,19 +77,10 @@ namespace YSAI.RabbitMQ
|
|
|
/// 队列集合
|
|
|
/// </summary>
|
|
|
private ConcurrentDictionary<string, string> Queues = new ConcurrentDictionary<string, string>();
|
|
|
-
|
|
|
/// <summary>
|
|
|
- /// [消费]接收发布者消息
|
|
|
+ /// 队列与基础数据
|
|
|
/// </summary>
|
|
|
- private void Consumer_Received(object? sender, BasicDeliverEventArgs e, bool AutoAck = false)
|
|
|
- {
|
|
|
- OnEventHandler?.Invoke(this, new RabbitMQData.Event() { MessageContent = Encoding.UTF8.GetString(e.Body.ToArray()), MessageHead = e.RoutingKey });
|
|
|
- if (!AutoAck)
|
|
|
- {
|
|
|
- //当自动确认为false,得手动确认消息
|
|
|
- Channels[e.RoutingKey].BasicAck(e.DeliveryTag, false);
|
|
|
- }
|
|
|
- }
|
|
|
+ private ConcurrentDictionary<string, (QueueDeclareOk QueueDeclareOk, IBasicProperties BasicProperties)> QueueAndBasicProperties = new ConcurrentDictionary<string, (QueueDeclareOk QueueDeclareOk, IBasicProperties BasicProperties)>();
|
|
|
|
|
|
public Task<OperateResult> OnAsync()
|
|
|
{
|
|
|
@@ -153,8 +145,8 @@ namespace YSAI.RabbitMQ
|
|
|
connection.Dispose();
|
|
|
//清空通道集合
|
|
|
Channels.Clear();
|
|
|
- //队列清空
|
|
|
- Queues.Clear();
|
|
|
+ //清空基础数据
|
|
|
+ QueueAndBasicProperties.Clear();
|
|
|
return Break("Off", true);
|
|
|
}
|
|
|
}
|
|
|
@@ -163,7 +155,6 @@ namespace YSAI.RabbitMQ
|
|
|
return Break("Off", false, "打开RabbitMQ连接异常:" + ex.Message);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
public void Dispose()
|
|
|
{
|
|
|
Off();
|
|
|
@@ -172,7 +163,58 @@ namespace YSAI.RabbitMQ
|
|
|
ThisObjList.Remove(this);
|
|
|
}
|
|
|
|
|
|
- public OperateResult Consume(string MessageHead, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ public OperateResult Publish(string Topic, string Content, string Type = "topic", bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ {
|
|
|
+ //开始记录运行时间
|
|
|
+ Depart("Publish");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (connection == null || !connection.IsOpen)
|
|
|
+ {
|
|
|
+ return Break("Publish", false, "RabbitMQ连接未打开");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //判断是否存在此通道
|
|
|
+ if (!Channels.ContainsKey(basics.ExChangeName))
|
|
|
+ {
|
|
|
+ IModel channel = connection.CreateModel();
|
|
|
+ //创建交换机
|
|
|
+ channel.ExchangeDeclare(basics.ExChangeName, Type, Durable, AutoDelete);
|
|
|
+ //添加通道
|
|
|
+ Channels.TryAdd(basics.ExChangeName, channel);
|
|
|
+ }
|
|
|
+ //判断队列与消息头是否存在
|
|
|
+ if (!QueueAndBasicProperties.ContainsKey(Topic))
|
|
|
+ {
|
|
|
+ //创建队列
|
|
|
+ QueueDeclareOk queueDeclareOk = Channels[basics.ExChangeName].QueueDeclare(Topic, Durable, Exclusive, AutoDelete);
|
|
|
+ //绑定队列
|
|
|
+ Channels[basics.ExChangeName].QueueBind(Topic, basics.ExChangeName, Topic);
|
|
|
+ //消息持久化
|
|
|
+ IBasicProperties basicProperties = Channels[basics.ExChangeName].CreateBasicProperties();
|
|
|
+ basicProperties.Persistent = true;
|
|
|
+ //添加消息头
|
|
|
+ QueueAndBasicProperties.TryAdd(Topic, (queueDeclareOk, basicProperties));
|
|
|
+ }
|
|
|
+ // 发布消息
|
|
|
+ Channels[basics.ExChangeName].BasicPublish(basics.ExChangeName, Topic, QueueAndBasicProperties[Topic].BasicProperties, Encoding.UTF8.GetBytes(Content));
|
|
|
+ //返回
|
|
|
+ return Break("Publish", true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Publish", false, "RabbitMQ发布异常:" + ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> PublishAsync(string Topic, string Content, string Type = "topic", bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Publish(Topic, Content, Type, Durable, Exclusive, AutoDelete));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Consume(string Topic, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
{
|
|
|
//开始记录运行时间
|
|
|
Depart("Consume");
|
|
|
@@ -194,12 +236,12 @@ namespace YSAI.RabbitMQ
|
|
|
Channels.TryAdd(basics.ExChangeName, channel);
|
|
|
}
|
|
|
|
|
|
- if (!Queues.ContainsKey(MessageHead))
|
|
|
+ if (!Queues.ContainsKey(Topic))
|
|
|
{
|
|
|
//创建队列
|
|
|
- Channels[basics.ExChangeName].QueueDeclare(MessageHead, Durable, Exclusive, AutoDelete);
|
|
|
+ Channels[basics.ExChangeName].QueueDeclare(Topic, Durable, Exclusive, AutoDelete);
|
|
|
//绑定队列
|
|
|
- Channels[basics.ExChangeName].QueueBind(MessageHead, basics.ExChangeName, MessageHead);
|
|
|
+ Channels[basics.ExChangeName].QueueBind(Topic, basics.ExChangeName, Topic);
|
|
|
|
|
|
EventingBasicConsumer consumer = new(Channels[basics.ExChangeName]);
|
|
|
consumer.Received += delegate (object? sender, BasicDeliverEventArgs e)
|
|
|
@@ -207,7 +249,7 @@ namespace YSAI.RabbitMQ
|
|
|
Consumer_Received(sender, e, AutoAck);
|
|
|
};
|
|
|
// 开启消费者与通道、队列关联
|
|
|
- Channels[basics.ExChangeName].BasicConsume(MessageHead, AutoAck, consumer);
|
|
|
+ Channels[basics.ExChangeName].BasicConsume(Topic, AutoAck, consumer);
|
|
|
//返回
|
|
|
return Break("Consume", true);
|
|
|
}
|
|
|
@@ -227,9 +269,22 @@ namespace YSAI.RabbitMQ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public Task<OperateResult> ConsumeAsync(string MessageHead, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ public Task<OperateResult> ConsumeAsync(string Topic, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Consume(Topic, Type, AutoAck, Durable, Exclusive, AutoDelete));
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// [消费]接收发布者消息
|
|
|
+ /// </summary>
|
|
|
+ private void Consumer_Received(object? sender, BasicDeliverEventArgs e, bool AutoAck = false)
|
|
|
{
|
|
|
- return Task.Run(() => Consume(MessageHead, Type, AutoAck, Durable, Exclusive, AutoDelete));
|
|
|
+ OnEventHandler?.Invoke(this, new RabbitMQData.Event() { MessageContent = Encoding.UTF8.GetString(e.Body.ToArray()), MessageHead = e.RoutingKey });
|
|
|
+ if (!AutoAck)
|
|
|
+ {
|
|
|
+ //当自动确认为false,得手动确认消息
|
|
|
+ Channels[e.RoutingKey].BasicAck(e.DeliveryTag, false);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|