|
|
@@ -5,111 +5,327 @@ using System.Linq;
|
|
|
using System.Text;
|
|
|
using System.Threading.Tasks;
|
|
|
using YSAI.Core.data;
|
|
|
+using YSAI.Core.@enum;
|
|
|
using YSAI.Core.@interface.unify;
|
|
|
+using YSAI.Core.reflection;
|
|
|
using YSAI.Kafka;
|
|
|
+using YSAI.Log;
|
|
|
using YSAI.Mqtt.client;
|
|
|
using YSAI.RabbitMQ;
|
|
|
+using YSAI.Unility;
|
|
|
|
|
|
namespace YSAI.Relay
|
|
|
{
|
|
|
/// <summary>
|
|
|
/// 转发操作
|
|
|
/// </summary>
|
|
|
- public class RelayOperate:IBaseAbstract,IRelay
|
|
|
+ public class RelayOperate : IBaseAbstract<object>, IRelay
|
|
|
{
|
|
|
protected override string LogHead => "[ RelayOperate 操作 ]";
|
|
|
protected override string ClassName => "RelayOperate";
|
|
|
+ /// <summary>
|
|
|
+ /// 文件夹
|
|
|
+ /// </summary>
|
|
|
+ public static string ConfigDirectory = "config";
|
|
|
+ /// <summary>
|
|
|
+ /// 文件
|
|
|
+ /// </summary>
|
|
|
+ public static string ConfigFile = $"{AppDomain.CurrentDomain.BaseDirectory}{ConfigDirectory}//RelayConfig.json";
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 统一返回
|
|
|
+ /// </summary>
|
|
|
+ private OperateResult? operateResult = null;
|
|
|
+ /// <summary>
|
|
|
+ /// 基础数据
|
|
|
+ /// </summary>
|
|
|
+ private RelayData.Basics basics;
|
|
|
|
|
|
private static readonly object Lock = new object(); //锁
|
|
|
- private static List<RelayOperate> ThisObjList = new List<RelayOperate>(); //自身对象集合
|
|
|
+ private static ReflectionOperate? ThisObjList; //自身对象
|
|
|
/// <summary>
|
|
|
/// 单例模式
|
|
|
/// </summary>
|
|
|
/// <returns></returns>
|
|
|
- public static RelayOperate Instance(RelayData.Basics basics)
|
|
|
+ public static ReflectionOperate Instance()
|
|
|
{
|
|
|
- RelayOperate? exp = ThisObjList.FirstOrDefault(c => c.basics.Equals(basics));
|
|
|
- if (exp == null)
|
|
|
+ if (ThisObjList == null)
|
|
|
{
|
|
|
lock (Lock)
|
|
|
{
|
|
|
- if (ThisObjList.Count(c => c.basics.Equals(basics)) > 0)
|
|
|
+ if (ThisObjList == null)
|
|
|
{
|
|
|
- return ThisObjList.First(c => c.basics.Equals(basics));
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- RelayOperate exp2 = new RelayOperate(basics);
|
|
|
- ThisObjList.Add(exp2);
|
|
|
- return exp2;
|
|
|
+ ThisObjList = new ReflectionOperate();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return exp;
|
|
|
+ return ThisObjList;
|
|
|
}
|
|
|
- /// <summary>
|
|
|
- /// kafka消费者
|
|
|
- /// </summary>
|
|
|
- public ConcurrentDictionary<string, KafkaConsumerOperate> KafkaConsumerArray;
|
|
|
+
|
|
|
/// <summary>
|
|
|
/// kafka生产者
|
|
|
/// </summary>
|
|
|
- public ConcurrentDictionary<string, KafkaProducerOperate> KafkaProducerArray;
|
|
|
+ private ConcurrentDictionary<string, KafkaProducerOperate> KafkaProducerArray;
|
|
|
/// <summary>
|
|
|
/// MQTT
|
|
|
/// </summary>
|
|
|
- public ConcurrentDictionary<string, MqttClientOperate> MqttClientArray;
|
|
|
+ private ConcurrentDictionary<string, MqttClientOperate> MqttClientArray;
|
|
|
/// <summary>
|
|
|
/// RabbitMQ
|
|
|
/// </summary>
|
|
|
- public ConcurrentDictionary<string, RabbitMQOperate> RabbitMQArray;
|
|
|
-
|
|
|
-
|
|
|
+ private ConcurrentDictionary<string, RabbitMQOperate> RabbitMQArray;
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// 任务集合
|
|
|
+ /// </summary>
|
|
|
+ public ConcurrentDictionary<string, Task> TaskArray;
|
|
|
+ /// <summary>
|
|
|
+ /// 数据队列
|
|
|
+ /// </summary>
|
|
|
+ private ConcurrentQueue<QueueData> DataQueue;
|
|
|
+ /// <summary>
|
|
|
+ /// 队列里面的数据
|
|
|
+ /// </summary>
|
|
|
+ public class QueueData
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 主题
|
|
|
+ /// </summary>
|
|
|
+ public string Topic { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 内容
|
|
|
+ /// </summary>
|
|
|
+ public string Content { get; set; }
|
|
|
+ }
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// 任务处理
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public Task TaskHandle()
|
|
|
+ {
|
|
|
+ string logName = "TaskHandle.log";
|
|
|
+ //起个新线程处理
|
|
|
+ return Task.Factory.StartNew(() =>
|
|
|
+ {
|
|
|
+ //循环
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //队列数据
|
|
|
+ QueueData? queueData;
|
|
|
+ //出列
|
|
|
+ while (DataQueue.TryDequeue(out queueData))
|
|
|
+ {
|
|
|
+ if (queueData != null)
|
|
|
+ {
|
|
|
+ if (KafkaProducerArray != null)
|
|
|
+ {
|
|
|
+ foreach (var operate in KafkaProducerArray)
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, operate.Value.Produce(queueData.Topic, queueData.Content));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (MqttClientArray != null)
|
|
|
+ {
|
|
|
+ foreach (var operate in MqttClientArray)
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, operate.Value.Produce(queueData.Topic, queueData.Content));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (RabbitMQArray != null)
|
|
|
+ {
|
|
|
+ foreach (var operate in RabbitMQArray)
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, operate.Value.Produce(queueData.Topic, queueData.Content));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ LogHelper.Error($"任务处理异常:{ex.Message}", logName);
|
|
|
+ }
|
|
|
+ Thread.Sleep(basics.TaskHandleAccomplishSleepTime);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 构造函数
|
|
|
+ /// 无惨构造函数
|
|
|
/// </summary>
|
|
|
- /// <param name="basics"></param>
|
|
|
- public RelayOperate(RelayData.Basics basics)
|
|
|
+ public RelayOperate()
|
|
|
{
|
|
|
- this.basics = basics;
|
|
|
- //当进入构造函数直接初始化
|
|
|
+ this.basics = GetConfig();
|
|
|
+ Init();
|
|
|
+ }
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// 获取配置
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ private RelayData.Basics? GetConfig()
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (File.Exists(ConfigFile)) //配置存在
|
|
|
+ {
|
|
|
+ return JsonTool.StringToJsonEntity<RelayData.Basics>(FileTool.FileToString(ConfigFile));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ Console.WriteLine($"获取配置异常:{ex.Message}");
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 基础数据
|
|
|
+ /// 初始化状态
|
|
|
/// </summary>
|
|
|
- private RelayData.Basics basics;
|
|
|
+ private bool InitState = false;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 初始化
|
|
|
+ /// </summary>
|
|
|
+ private void Init()
|
|
|
+ {
|
|
|
+ if (basics != null)
|
|
|
+ {
|
|
|
+
|
|
|
+ if (basics.RabbitMQDataArray != null)
|
|
|
+ {
|
|
|
+ foreach (var configData in basics.RabbitMQDataArray)
|
|
|
+ {
|
|
|
+ RabbitMQOperate operate = RabbitMQOperate.Instance(configData);
|
|
|
+ operateResult = operate.On();
|
|
|
+ if (operateResult.State)
|
|
|
+ {
|
|
|
+ RabbitMQArray.TryAdd(configData.SN, operate);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //通知到外部,让外部来觉得是否重新初始化
|
|
|
+ OnEventHandler?.Invoke(this, operateResult);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (basics.MqttClientDataArray != null)
|
|
|
+ {
|
|
|
+ foreach (var configData in basics.MqttClientDataArray)
|
|
|
+ {
|
|
|
+ MqttClientOperate operate = MqttClientOperate.Instance(configData);
|
|
|
+ operateResult = operate.On();
|
|
|
+ if (operateResult.State)
|
|
|
+ {
|
|
|
+ MqttClientArray.TryAdd(configData.SN, operate);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //通知到外部,让外部来觉得是否重新初始化
|
|
|
+ OnEventHandler?.Invoke(this, operateResult);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (basics.KafkaProducerDataArray != null)
|
|
|
+ {
|
|
|
+ foreach (var configData in basics.KafkaProducerDataArray)
|
|
|
+ {
|
|
|
+ KafkaProducerOperate operate = KafkaProducerOperate.Instance(configData);
|
|
|
+ KafkaProducerArray.TryAdd(configData.SN, operate);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ InitState = true;
|
|
|
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, Break(Depart("Init"), false, "配置文件不存在"));
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public OperateResult Produce(string Topic, string Content)
|
|
|
{
|
|
|
- throw new NotImplementedException();
|
|
|
+ Depart("Produce");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //主题
|
|
|
+ string topic = Topic;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //如果包类型可以被转换,直接走对应配置,如果不能,则走传入的主题
|
|
|
+ PacketType? packetType = (PacketType)Enum.Parse(typeof(PacketType), Topic);
|
|
|
+ if (packetType != null)
|
|
|
+ {
|
|
|
+ switch (packetType)
|
|
|
+ {
|
|
|
+ case PacketType.Message:
|
|
|
+ topic = basics.MessageTopic;
|
|
|
+ break;
|
|
|
+ case PacketType.Status:
|
|
|
+ topic = basics.StatusTopic;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch { }
|
|
|
+ if (!InitState)
|
|
|
+ {
|
|
|
+ return Break("Produce", false, "尚未初始化");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //当队列为空,初始化队列
|
|
|
+ if (DataQueue == null)
|
|
|
+ {
|
|
|
+ DataQueue = new ConcurrentQueue<QueueData>();
|
|
|
+ TaskArray = new ConcurrentDictionary<string, Task>();
|
|
|
+ //启动
|
|
|
+ for (int i = 0; i < basics.TaskHandleCount; i++)
|
|
|
+ {
|
|
|
+ TaskArray.TryAdd(Guid.NewGuid().ToString(), TaskHandle());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //入列
|
|
|
+ DataQueue.Enqueue(new QueueData() { Topic = Topic, Content = Content });
|
|
|
+ }
|
|
|
+ return Break("Produce", true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Produce", false, ex.Message);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public Task<OperateResult> ProduceAsync(string Topic, string Content)
|
|
|
{
|
|
|
- throw new NotImplementedException();
|
|
|
+ return Task.Run(() => Produce(Topic, Content));
|
|
|
}
|
|
|
|
|
|
public OperateResult Consume(string Topic)
|
|
|
{
|
|
|
- throw new NotImplementedException();
|
|
|
+ return Break(Depart("Consume"), false, "目前暂不支持消费");
|
|
|
}
|
|
|
|
|
|
public Task<OperateResult> ConsumeAsync(string Topic)
|
|
|
{
|
|
|
- throw new NotImplementedException();
|
|
|
+ return Task.Run(() => Consume(Topic));
|
|
|
}
|
|
|
|
|
|
public void Dispose()
|
|
|
{
|
|
|
- throw new NotImplementedException();
|
|
|
+ if (KafkaProducerArray != null) { foreach (var item in KafkaProducerArray) { item.Value.Dispose(); } KafkaProducerArray = null; }
|
|
|
+ if (MqttClientArray != null) { foreach (var item in MqttClientArray) { item.Value.Dispose(); } MqttClientArray = null; }
|
|
|
+ if (RabbitMQArray != null) { foreach (var item in RabbitMQArray) { item.Value.Dispose(); } RabbitMQArray = null; }
|
|
|
+ if (TaskArray != null) { foreach (var item in TaskArray) { item.Value.Dispose(); } TaskArray = null; }
|
|
|
+ ThisObjList = null;
|
|
|
+ GC.Collect();
|
|
|
+ GC.SuppressFinalize(this);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}
|