|
|
@@ -156,6 +156,7 @@ namespace YSAI.RabbitMQ
|
|
|
return Break("Off", false, "打开RabbitMQ连接异常:" + ex.Message);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
public void Dispose()
|
|
|
{
|
|
|
Off();
|
|
|
@@ -164,7 +165,7 @@ namespace YSAI.RabbitMQ
|
|
|
ThisObjList.Remove(this);
|
|
|
}
|
|
|
|
|
|
- public OperateResult Publish(string Topic, string Content, string Type = "topic", bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ public OperateResult Publish(string Content, string? Topic = null, string? Queue = null, string? RoutingKey = null, string Type = "topic", bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
{
|
|
|
//开始记录运行时间
|
|
|
Depart("Publish");
|
|
|
@@ -176,6 +177,19 @@ namespace YSAI.RabbitMQ
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ //主题不为空,则队列、路由设置为主题
|
|
|
+ if (!string.IsNullOrWhiteSpace(Topic))
|
|
|
+ {
|
|
|
+ Queue = Topic;
|
|
|
+ RoutingKey = Topic;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (string.IsNullOrWhiteSpace(Queue) || string.IsNullOrWhiteSpace(RoutingKey))
|
|
|
+ {
|
|
|
+ return Break("Publish", false, "Queue、RoutingKey 不能为空");
|
|
|
+ }
|
|
|
+ }
|
|
|
//判断是否存在此通道
|
|
|
if (!Channels.ContainsKey(basics.ExChangeName))
|
|
|
{
|
|
|
@@ -186,20 +200,21 @@ namespace YSAI.RabbitMQ
|
|
|
Channels.TryAdd(basics.ExChangeName, channel);
|
|
|
}
|
|
|
//判断队列与消息头是否存在
|
|
|
- if (!QueueAndBasicProperties.ContainsKey(Topic))
|
|
|
+ if (!QueueAndBasicProperties.ContainsKey(Queue))
|
|
|
{
|
|
|
//创建队列
|
|
|
- QueueDeclareOk queueDeclareOk = Channels[basics.ExChangeName].QueueDeclare(Topic, Durable, Exclusive, AutoDelete);
|
|
|
+ QueueDeclareOk queueDeclareOk = Channels[basics.ExChangeName].QueueDeclare(Queue, Durable, Exclusive, AutoDelete);
|
|
|
+
|
|
|
//绑定队列
|
|
|
- Channels[basics.ExChangeName].QueueBind(Topic, basics.ExChangeName, Topic);
|
|
|
+ Channels[basics.ExChangeName].QueueBind(Queue, basics.ExChangeName, RoutingKey);
|
|
|
//消息持久化
|
|
|
IBasicProperties basicProperties = Channels[basics.ExChangeName].CreateBasicProperties();
|
|
|
basicProperties.Persistent = true;
|
|
|
//添加消息头
|
|
|
- QueueAndBasicProperties.TryAdd(Topic, (queueDeclareOk, basicProperties));
|
|
|
+ QueueAndBasicProperties.TryAdd(Queue, (queueDeclareOk, basicProperties));
|
|
|
}
|
|
|
// 发布消息
|
|
|
- Channels[basics.ExChangeName].BasicPublish(basics.ExChangeName, Topic, QueueAndBasicProperties[Topic].BasicProperties, Encoding.UTF8.GetBytes(Content));
|
|
|
+ Channels[basics.ExChangeName].BasicPublish(basics.ExChangeName, RoutingKey, QueueAndBasicProperties[Queue].BasicProperties, Encoding.UTF8.GetBytes(Content));
|
|
|
//返回
|
|
|
return Break("Publish", true);
|
|
|
}
|
|
|
@@ -210,12 +225,12 @@ namespace YSAI.RabbitMQ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public Task<OperateResult> PublishAsync(string Topic, string Content, string Type = "topic", bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ public Task<OperateResult> PublishAsync(string Content, string? Topic = null, string? Queue = null, string? RoutingKey = null, string Type = "topic", bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
{
|
|
|
- return Task.Run(() => Publish(Topic, Content, Type, Durable, Exclusive, AutoDelete));
|
|
|
+ return Task.Run(() => Publish( Content,Topic,Queue,RoutingKey, Type, Durable, Exclusive, AutoDelete));
|
|
|
}
|
|
|
|
|
|
- public OperateResult Consume(string Topic, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ public OperateResult Consume(string? Topic = null, string? Queue = null, string? RoutingKey = null, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
{
|
|
|
//开始记录运行时间
|
|
|
Depart("Consume");
|
|
|
@@ -227,6 +242,19 @@ namespace YSAI.RabbitMQ
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ //主题不为空,则队列、路由设置为主题
|
|
|
+ if (!string.IsNullOrWhiteSpace(Topic))
|
|
|
+ {
|
|
|
+ Queue = Topic;
|
|
|
+ RoutingKey = Topic;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (string.IsNullOrWhiteSpace(Queue) || string.IsNullOrWhiteSpace(RoutingKey))
|
|
|
+ {
|
|
|
+ return Break("Consume", false, "Queue、RoutingKey 不能为空");
|
|
|
+ }
|
|
|
+ }
|
|
|
//判断是否存在此通道
|
|
|
if (!Channels.ContainsKey(basics.ExChangeName))
|
|
|
{
|
|
|
@@ -237,12 +265,13 @@ namespace YSAI.RabbitMQ
|
|
|
Channels.TryAdd(basics.ExChangeName, channel);
|
|
|
}
|
|
|
|
|
|
- if (!Queues.ContainsKey(Topic))
|
|
|
+ if (!Queues.ContainsKey(Queue))
|
|
|
{
|
|
|
//创建队列
|
|
|
- Channels[basics.ExChangeName].QueueDeclare(Topic, Durable, Exclusive, AutoDelete);
|
|
|
+ Channels[basics.ExChangeName].QueueDeclare(Queue, Durable, Exclusive, AutoDelete);
|
|
|
+
|
|
|
//绑定队列
|
|
|
- Channels[basics.ExChangeName].QueueBind(Topic, basics.ExChangeName, Topic);
|
|
|
+ Channels[basics.ExChangeName].QueueBind(Queue, basics.ExChangeName, RoutingKey);
|
|
|
|
|
|
EventingBasicConsumer consumer = new(Channels[basics.ExChangeName]);
|
|
|
consumer.Received += delegate (object? sender, BasicDeliverEventArgs e)
|
|
|
@@ -250,17 +279,13 @@ namespace YSAI.RabbitMQ
|
|
|
Consumer_Received(sender, e, AutoAck);
|
|
|
};
|
|
|
// 开启消费者与通道、队列关联
|
|
|
- Channels[basics.ExChangeName].BasicConsume(Topic, AutoAck, consumer);
|
|
|
+ Channels[basics.ExChangeName].BasicConsume(Queue, AutoAck, consumer);
|
|
|
//返回
|
|
|
return Break("Consume", true);
|
|
|
}
|
|
|
- if (OnEventHandler == null)
|
|
|
- {
|
|
|
- return Break("Consume", false, "已存在此消费,请注册事件返回参数");
|
|
|
- }
|
|
|
else
|
|
|
{
|
|
|
- return Break("Consume", false, "已存在此消费");
|
|
|
+ return Break("Consume", false, $"已存在 {Queue} 的消费");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -270,15 +295,15 @@ namespace YSAI.RabbitMQ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public Task<OperateResult> ConsumeAsync(string Topic, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
+ public Task<OperateResult> ConsumeAsync(string? Topic = null, string? Queue = null, string? RoutingKey = null, string Type = "topic", bool AutoAck = false, bool Durable = true, bool Exclusive = false, bool AutoDelete = false)
|
|
|
{
|
|
|
- return Task.Run(() => Consume(Topic, Type, AutoAck, Durable, Exclusive, AutoDelete));
|
|
|
+ return Task.Run(() => Consume(Topic, Queue, RoutingKey, Type, AutoAck, Durable, Exclusive, AutoDelete));
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// [消费]接收发布者消息
|
|
|
/// </summary>
|
|
|
- private void Consumer_Received(object? sender, BasicDeliverEventArgs e, bool AutoAck = false)
|
|
|
+ private void Consumer_Received(object? sender, BasicDeliverEventArgs e, bool AutoAck)
|
|
|
{
|
|
|
dynamic DynamicObj = new ExpandoObject();
|
|
|
DynamicObj.Content = Encoding.UTF8.GetString(e.Body.ToArray());
|
|
|
@@ -294,7 +319,7 @@ namespace YSAI.RabbitMQ
|
|
|
public OperateResult Produce(string Topic, string Content)
|
|
|
{
|
|
|
Depart("Produce");
|
|
|
- OperateResult operateResult = Publish(Topic,Content);
|
|
|
+ OperateResult operateResult = Publish(Content, Topic);
|
|
|
return Break("Produce", operateResult.State, operateResult.Message);
|
|
|
}
|
|
|
|
|
|
@@ -306,7 +331,7 @@ namespace YSAI.RabbitMQ
|
|
|
public OperateResult Subscribe(string Topic)
|
|
|
{
|
|
|
Depart("Subscribe");
|
|
|
- OperateResult operateResult = Consume(Topic);
|
|
|
+ OperateResult operateResult = Consume(Topic:Topic);
|
|
|
return Break("Subscribe", operateResult.State, operateResult.Message);
|
|
|
}
|
|
|
|
|
|
@@ -320,8 +345,15 @@ namespace YSAI.RabbitMQ
|
|
|
Depart("UnSubscribe");
|
|
|
try
|
|
|
{
|
|
|
- Channels[basics.ExChangeName].QueueDelete(Topic);
|
|
|
- return Break("UnSubscribe", true);
|
|
|
+ if (connection == null || !connection.IsOpen)
|
|
|
+ {
|
|
|
+ return Break("UnSubscribe", false, "RabbitMQ连接未打开");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Channels[basics.ExChangeName].QueueDelete(Topic);
|
|
|
+ return Break("UnSubscribe", true);
|
|
|
+ }
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|