|
|
@@ -1,53 +1,49 @@
|
|
|
-using System;
|
|
|
+using Newtonsoft.Json.Linq;
|
|
|
+using System;
|
|
|
using System.Collections.Concurrent;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
|
+using System.Reflection;
|
|
|
using System.Text;
|
|
|
using System.Threading.Tasks;
|
|
|
using YSAI.Core.data;
|
|
|
using YSAI.Core.@enum;
|
|
|
using YSAI.Core.@interface.unify;
|
|
|
using YSAI.Core.reflection;
|
|
|
-using YSAI.Kafka;
|
|
|
using YSAI.Log;
|
|
|
-using YSAI.Mqtt.client;
|
|
|
-using YSAI.RabbitMQ;
|
|
|
using YSAI.Unility;
|
|
|
|
|
|
namespace YSAI.RelayManage
|
|
|
{
|
|
|
/// <summary>
|
|
|
- /// 转发操作
|
|
|
+ /// 转发管理操作,
|
|
|
+ /// 库:*.dll,
|
|
|
+ /// 库配置:命名空间 + 类名.SN.Config.json
|
|
|
/// </summary>
|
|
|
- public class RelayManageOperate : IBaseAbstract, IRelay
|
|
|
+ public class RelayManageOperate : IBaseAbstract
|
|
|
{
|
|
|
protected override string LogHead => "[ RelayManageOperate 操作 ]";
|
|
|
protected override string ClassName => "RelayManageOperate";
|
|
|
/// <summary>
|
|
|
/// 文件夹
|
|
|
/// </summary>
|
|
|
- public static string ConfigDirectory = "config";
|
|
|
+ public readonly static string ConfigDirectory = "config";
|
|
|
/// <summary>
|
|
|
/// 文件
|
|
|
/// </summary>
|
|
|
- public static string ConfigFile = $"{AppDomain.CurrentDomain.BaseDirectory}{ConfigDirectory}//RelayManageConfig.json";
|
|
|
+ public readonly static string ConfigFile = $"{AppDomain.CurrentDomain.BaseDirectory}{ConfigDirectory}//RelayManageConfig.json";
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 统一返回
|
|
|
+ /// 配置数据
|
|
|
/// </summary>
|
|
|
- private OperateResult? operateResult = null;
|
|
|
- /// <summary>
|
|
|
- /// 基础数据
|
|
|
- /// </summary>
|
|
|
- private RelayManageData.Basics basics;
|
|
|
-
|
|
|
+ private RelayManageData.Basics? basics;
|
|
|
private static readonly object Lock = new object(); //锁
|
|
|
- private static ReflectionOperate? ThisObjList; //自身对象
|
|
|
+ private static RelayManageOperate? ThisObjList; //自身对象
|
|
|
/// <summary>
|
|
|
/// 单例模式
|
|
|
/// </summary>
|
|
|
/// <returns></returns>
|
|
|
- public static ReflectionOperate Instance()
|
|
|
+ public static RelayManageOperate Instance()
|
|
|
{
|
|
|
if (ThisObjList == null)
|
|
|
{
|
|
|
@@ -55,25 +51,82 @@ namespace YSAI.RelayManage
|
|
|
{
|
|
|
if (ThisObjList == null)
|
|
|
{
|
|
|
- ThisObjList = new ReflectionOperate();
|
|
|
+ ThisObjList = new RelayManageOperate();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return ThisObjList;
|
|
|
}
|
|
|
+ /// <summary>
|
|
|
+ /// 获取配置
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public RelayManageData.Basics? GetConfig()
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (File.Exists(ConfigFile)) //配置存在
|
|
|
+ {
|
|
|
+ return JsonTool.StringToJsonEntity<RelayManageData.Basics>(FileTool.FileToString(ConfigFile));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ FileTool.StringToFile(ConfigFile, new RelayManageData.Basics().ToJson().JsonFormatting());
|
|
|
+ return GetConfig();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ Console.WriteLine($"获取配置异常:{ex.Message}");
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
|
|
|
/// <summary>
|
|
|
- /// kafka生产者
|
|
|
+ /// 设置配置
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public bool SetConfig(RelayManageData.Basics basics)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //实体转JSON字符串
|
|
|
+ string json = JsonTool.JsonEntityToString(basics);
|
|
|
+ //写入文件
|
|
|
+ File.WriteAllText(ConfigFile, JsonTool.JsonFormatting(json));
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ Console.WriteLine($"设置配置异常:{ex.Message}");
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 构造函数处理配置
|
|
|
+ /// </summary>
|
|
|
+ public RelayManageOperate()
|
|
|
+ {
|
|
|
+ Monitor();
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 实例容器集合
|
|
|
+ /// </summary>
|
|
|
+ private ConcurrentDictionary<string, IRelay> InstanceIoc;
|
|
|
+ /// <summary>
|
|
|
+ /// 库类型容器
|
|
|
/// </summary>
|
|
|
- private ConcurrentDictionary<string, KafkaProducerOperate> KafkaProducerArray;
|
|
|
+ private ConcurrentDictionary<string, Type> TypeIoc;
|
|
|
/// <summary>
|
|
|
- /// MQTT
|
|
|
+ /// 文件夹监控
|
|
|
/// </summary>
|
|
|
- private ConcurrentDictionary<string, MqttClientOperate> MqttClientArray;
|
|
|
+ private FileSystemWatcher watcherLibFolder;
|
|
|
/// <summary>
|
|
|
- /// RabbitMQ
|
|
|
+ /// 文件夹监控
|
|
|
/// </summary>
|
|
|
- private ConcurrentDictionary<string, RabbitMQOperate> RabbitMQArray;
|
|
|
+ private FileSystemWatcher watcherLibConfigFolder;
|
|
|
/// <summary>
|
|
|
/// 任务集合
|
|
|
/// </summary>
|
|
|
@@ -96,250 +149,661 @@ namespace YSAI.RelayManage
|
|
|
/// </summary>
|
|
|
public string Content { get; set; }
|
|
|
}
|
|
|
-
|
|
|
/// <summary>
|
|
|
- /// 任务处理
|
|
|
+ /// 监控文件夹
|
|
|
/// </summary>
|
|
|
- /// <returns></returns>
|
|
|
- public Task TaskHandle()
|
|
|
+ public Task Monitor()
|
|
|
{
|
|
|
- string logName = "TaskHandle.log";
|
|
|
- //起个新线程处理
|
|
|
+ //起一个新线程
|
|
|
return Task.Factory.StartNew(() =>
|
|
|
{
|
|
|
- //循环
|
|
|
- while (TaskArray != null)
|
|
|
+ //实例容器实例化
|
|
|
+ if (InstanceIoc == null)
|
|
|
{
|
|
|
- try
|
|
|
+ InstanceIoc = new ConcurrentDictionary<string, IRelay>();
|
|
|
+ }
|
|
|
+ //程序集
|
|
|
+ if (TypeIoc == null)
|
|
|
+ {
|
|
|
+ TypeIoc = new ConcurrentDictionary<string, Type>();
|
|
|
+ }
|
|
|
+ //获取配置
|
|
|
+ if (basics == null)
|
|
|
+ {
|
|
|
+ this.basics = GetConfig();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (basics != null)
|
|
|
+ {
|
|
|
+ //检索
|
|
|
+ Search();
|
|
|
+
|
|
|
+ //文件夹监控
|
|
|
+ watcherLibFolder = new FileSystemWatcher(basics.LibFolder);
|
|
|
+ //监控的配置
|
|
|
+ watcherLibFolder.Filter = "*.dll";
|
|
|
+ //当文件夹中新增文件
|
|
|
+ watcherLibFolder.Created += delegate (object sender, FileSystemEventArgs e) { Watcher_Created(sender, e, 0); };
|
|
|
+ //当文件夹中删除文件
|
|
|
+ watcherLibFolder.Deleted += delegate (object sender, FileSystemEventArgs e) { Watcher_Deleted(sender, e, 0); };
|
|
|
+ //启动监听
|
|
|
+ watcherLibFolder.EnableRaisingEvents = true;
|
|
|
+
|
|
|
+
|
|
|
+ //文件夹监视
|
|
|
+ watcherLibConfigFolder = new FileSystemWatcher(basics.LibConfigFolder);
|
|
|
+ //监控的配置
|
|
|
+ watcherLibConfigFolder.Filter = "*.Config.json";
|
|
|
+ //当文件夹中新增文件
|
|
|
+ watcherLibConfigFolder.Created += delegate (object sender, FileSystemEventArgs e) { Watcher_Created(sender, e, 1); };
|
|
|
+ //当文件夹中删除文件
|
|
|
+ watcherLibConfigFolder.Deleted += delegate (object sender, FileSystemEventArgs e) { Watcher_Deleted(sender, e, 1); };
|
|
|
+ //启动监听
|
|
|
+ watcherLibConfigFolder.EnableRaisingEvents = true;
|
|
|
+
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"配置文件不存在"));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 当文件夹中删除文件
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="sender"></param>
|
|
|
+ /// <param name="e"></param>
|
|
|
+ /// <param name="Type">【0:库文件】【1:配置文件】</param>
|
|
|
+ private void Watcher_Deleted(object sender, FileSystemEventArgs e, int Type)
|
|
|
+ {
|
|
|
+ //程序集SN
|
|
|
+ string TypeSN = string.Empty;
|
|
|
+ switch (Type)
|
|
|
+ {
|
|
|
+ case 0:
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{e.Name} 文件被删除,移除此库程序集,并移除所有实例"));
|
|
|
+ //程序集SN
|
|
|
+ TypeSN = e.Name.Replace(".dll", string.Empty);
|
|
|
+
|
|
|
+ foreach (var item in InstanceIoc)
|
|
|
{
|
|
|
- //队列数据
|
|
|
- QueueData? queueData;
|
|
|
- //出列
|
|
|
- while (DataQueue.TryDequeue(out queueData))
|
|
|
+ if (item.Key.Contains(TypeSN))
|
|
|
{
|
|
|
- if (queueData != null)
|
|
|
+ InstanceIoc[item.Key].Dispose();
|
|
|
+ if (InstanceIoc.Remove(item.Key, out _))
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{e.Name} 移除配置实例 {item.Key} 成功"));
|
|
|
+ }
|
|
|
+ else
|
|
|
{
|
|
|
- if (KafkaProducerArray != null)
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"{e.Name} 移除配置实例 {item.Key} 失败"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ foreach (var item in TypeIoc)
|
|
|
+ {
|
|
|
+ if (item.Key.Contains(TypeSN))
|
|
|
+ {
|
|
|
+ if (TypeIoc.Remove(item.Key, out _))
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{e.Name} 移除程序集 {item.Key} 成功"));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"{e.Name} 移除程序集 {item.Key} 失败"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case 1:
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{e.Name} 文件被删除,移除对应配置实例"));
|
|
|
+ //程序集SN
|
|
|
+ TypeSN = e.Name.Split('-')[0].Replace(".Config", string.Empty);
|
|
|
+ //分割
|
|
|
+ string[] strs = e.Name.Split('-');
|
|
|
+ if (strs.Length > 1)
|
|
|
+ {
|
|
|
+ //获取配置实例SN
|
|
|
+ string SN = $"{TypeSN}:{e.Name.Split('-')[1].Split('.')[0]}";
|
|
|
+ if (InstanceIoc.ContainsKey(SN))
|
|
|
+ {
|
|
|
+ InstanceIoc[SN].Dispose();
|
|
|
+ if (InstanceIoc.Remove(SN, out _))
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{e.Name} 移除配置实例成功"));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"{e.Name} 移除配置实例失败"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"{e.Name} 移除配置实例失败 {TypeSN} 实例不存在"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 当文件夹中新增文件
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="sender"></param>
|
|
|
+ /// <param name="e"></param>
|
|
|
+ /// <param name="Type">【0:库文件】【1:配置文件】</param>
|
|
|
+ private void Watcher_Created(object sender, FileSystemEventArgs e, int Type)
|
|
|
+ {
|
|
|
+ switch (Type)
|
|
|
+ {
|
|
|
+ case 0:
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{e.Name} 文件新增,添加此库程序集"));
|
|
|
+ SearchType(e.FullPath);
|
|
|
+ break;
|
|
|
+ case 1:
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{e.Name} 文件新增,新增对应配置实例"));
|
|
|
+ #region 新增对应配置实例
|
|
|
+ int until = 5;
|
|
|
+ int i = 0;
|
|
|
+ bool success = false;
|
|
|
+ while (!success && i < until)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ string path = String.Format(e.FullPath);
|
|
|
+ string? filename = Path.GetFileName(path);
|
|
|
+ using (Stream fs = File.OpenRead(@path))
|
|
|
+ {
|
|
|
+ StreamReader srdPreview = new StreamReader(fs);
|
|
|
+ String temp = string.Empty;
|
|
|
+ while (srdPreview.Peek() > -1)
|
|
|
{
|
|
|
- foreach (var operate in KafkaProducerArray)
|
|
|
- {
|
|
|
- operateResult = operate.Value.Produce(queueData.Topic, queueData.Content);
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(operateResult.State, "请查看RData", operateResult, ResultType.OperateResult));
|
|
|
- }
|
|
|
+ String input = srdPreview.ReadLine();
|
|
|
+ temp += input;
|
|
|
}
|
|
|
- if (MqttClientArray != null)
|
|
|
+ srdPreview.Close();
|
|
|
+ srdPreview.Dispose();
|
|
|
+
|
|
|
+ //获取程序集SN
|
|
|
+ string TypeSN = e.Name.Split('-')[0].Replace(".Config", string.Empty);
|
|
|
+ //分割
|
|
|
+ string[] strs = e.Name.Split('-');
|
|
|
+ if (strs.Length > 1)
|
|
|
{
|
|
|
- foreach (var operate in MqttClientArray)
|
|
|
+ //获取配置实例SN
|
|
|
+ string SN = $"{TypeSN}:{strs[1].Split('.')[0]}";
|
|
|
+ //判断是否存在此程序集SN
|
|
|
+ if (TypeIoc.ContainsKey(TypeSN))
|
|
|
{
|
|
|
- operateResult = operate.Value.Produce(queueData.Topic, queueData.Content);
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(operateResult.State, "请查看RData", operateResult, ResultType.OperateResult));
|
|
|
+ if (!InstanceIoc.ContainsKey(SN))
|
|
|
+ {
|
|
|
+ ConfigCreateInstance(TypeIoc[TypeSN], temp);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $" {e.Name} 此配置实例已存在"));
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- if (RabbitMQArray != null)
|
|
|
- {
|
|
|
- foreach (var operate in RabbitMQArray)
|
|
|
+ else
|
|
|
{
|
|
|
- operateResult = operate.Value.Produce(queueData.Topic, queueData.Content);
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(operateResult.State, "请查看RData", operateResult, ResultType.OperateResult));
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $" {e.Name} 新增对应配置创建实例失败 {TypeSN} 程序集不存在"));
|
|
|
}
|
|
|
}
|
|
|
+ fs.Close();
|
|
|
+ fs.Dispose();
|
|
|
}
|
|
|
+ success = true;
|
|
|
+ }
|
|
|
+ catch
|
|
|
+ {
|
|
|
+ i++;
|
|
|
+ Thread.Sleep(TimeSpan.FromSeconds(1));
|
|
|
}
|
|
|
}
|
|
|
- catch (Exception ex)
|
|
|
- {
|
|
|
- LogHelper.Error($"任务处理异常:{ex.Message}", logName);
|
|
|
- }
|
|
|
- Thread.Sleep(basics.TaskHandleAccomplishSleepTime);
|
|
|
- }
|
|
|
- });
|
|
|
+ #endregion
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 无惨构造函数
|
|
|
+ /// 检索文件并创建实例
|
|
|
/// </summary>
|
|
|
- public RelayManageOperate()
|
|
|
+ private void Search()
|
|
|
{
|
|
|
- this.basics = GetConfig();
|
|
|
- Init();
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //库
|
|
|
+ List<string> libs = Directory.GetFiles(basics.LibFolder, "*.dll", SearchOption.AllDirectories).ToList();
|
|
|
+ //循环文件,添加程序集
|
|
|
+ foreach (var lib in libs)
|
|
|
+ {
|
|
|
+ SearchType(lib);
|
|
|
+ }
|
|
|
+ //获取配置
|
|
|
+ foreach (var type in TypeIoc)
|
|
|
+ {
|
|
|
+ TypeSearchConfig(type.Value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"检索异常:{ex.Message}"));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 获取配置
|
|
|
+ /// 通过DLL检索程序集
|
|
|
/// </summary>
|
|
|
- /// <returns></returns>
|
|
|
- private RelayManageData.Basics? GetConfig()
|
|
|
+ /// <param name="lib">库文件</param>
|
|
|
+ private void SearchType(string lib)
|
|
|
{
|
|
|
- try
|
|
|
+ //加载程序集
|
|
|
+ Assembly assembly = Assembly.LoadFrom(lib);
|
|
|
+ //获取所有类
|
|
|
+ Type[] types = assembly.GetExportedTypes();
|
|
|
+ //过滤器
|
|
|
+ TypeFilter typeFilter = new TypeFilter(InterfaceFilter);
|
|
|
+ //集合
|
|
|
+ List<Type> typesArray = new List<Type>();
|
|
|
+ //检索类是否继承接口
|
|
|
+ foreach (Type type in types)
|
|
|
{
|
|
|
- if (File.Exists(ConfigFile)) //配置存在
|
|
|
+ if (type.FindInterfaces(typeFilter, basics.InterfaceFullName).Count() > 0)
|
|
|
{
|
|
|
- return JsonTool.StringToJsonEntity<RelayManageData.Basics>(FileTool.FileToString(ConfigFile));
|
|
|
+ typesArray.Add(type);
|
|
|
}
|
|
|
}
|
|
|
- catch (Exception ex)
|
|
|
+ //添加至集合
|
|
|
+ foreach (Type type in typesArray)
|
|
|
{
|
|
|
- Console.WriteLine($"获取配置异常:{ex.Message}");
|
|
|
+ TypeIoc.TryAdd(type.FullName, type);
|
|
|
+ //抛出信息
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{type.FullName} 程序集添加成功"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 通过程序集检索配置
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="type">程序集</param>
|
|
|
+ private void TypeSearchConfig(Type type)
|
|
|
+ {
|
|
|
+ //组织配置文件
|
|
|
+ string configFile = string.Format(basics.ConfigFileNameFormat, type.FullName);
|
|
|
+ //目录信息
|
|
|
+ DirectoryInfo directoryInfo = new DirectoryInfo(basics.LibConfigFolder);
|
|
|
+ //文件检索得到文件信息
|
|
|
+ List<FileInfo> fieldInfos = directoryInfo.GetFiles(configFile, SearchOption.AllDirectories).ToList();
|
|
|
+ //如果文件信息为空则不创建实例
|
|
|
+ if (fieldInfos.Count > 0)
|
|
|
+ {
|
|
|
+ //循环检索配置文件信息
|
|
|
+ foreach (FileInfo fi in fieldInfos)
|
|
|
+ {
|
|
|
+ ConfigCreateInstance(type, FileTool.FileToString(fi.FullName));
|
|
|
+ }
|
|
|
}
|
|
|
- return null;
|
|
|
}
|
|
|
-
|
|
|
/// <summary>
|
|
|
- /// 初始化状态
|
|
|
+ /// 通过配置创建实例
|
|
|
/// </summary>
|
|
|
- private bool InitState = false;
|
|
|
+ /// <param name="type">程序集</param>
|
|
|
+ /// <param name="content">内容</param>
|
|
|
+ private void ConfigCreateInstance(Type type, string content)
|
|
|
+ {
|
|
|
+ //获取结构参数
|
|
|
+ JObject? jsonObject = Newtonsoft.Json.JsonConvert.DeserializeObject<JObject>(content);
|
|
|
+ //获取唯一标识符
|
|
|
+ string SN = $"{type.FullName}.{jsonObject[basics.LibConfigSNKey]}";
|
|
|
+ //获取实例
|
|
|
+ IRelay? instance = CreateInstance(type, new object[] { jsonObject }) as IRelay;
|
|
|
+ //实例不为空
|
|
|
+ if (instance != null)
|
|
|
+ {
|
|
|
+ //把这个实例添加到容器中
|
|
|
+ InstanceIoc.TryAdd(SN, instance);
|
|
|
+ //抛出信息
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"{SN} 实例创建成功"));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //抛出信息
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"{SN} 实例创建失败"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 创建实例
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="NamespaceAndClassNameType">类型</param>
|
|
|
+ /// <param name="ConstructorParam">构造函数入参</param>
|
|
|
+ /// <returns></returns>
|
|
|
+ private object? CreateInstance(Type? NamespaceAndClassNameType, object[]? ConstructorParam)
|
|
|
+ {
|
|
|
+ //转换后的参数
|
|
|
+ object[]? Param = null;
|
|
|
+
|
|
|
+ if (ConstructorParam != null)
|
|
|
+ {
|
|
|
+ //获取构造函数信息
|
|
|
+ ConstructorInfo? constructorInfo = NamespaceAndClassNameType?.GetConstructors().FirstOrDefault();
|
|
|
+ //数据类型转换
|
|
|
+ Param = ParamTypeConvert(ConstructorParam, constructorInfo);
|
|
|
+ }
|
|
|
+ //创建实例
|
|
|
+ return Activator.CreateInstance(NamespaceAndClassNameType, Param);
|
|
|
+ }
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 初始化
|
|
|
+ /// 参数类型转换
|
|
|
/// </summary>
|
|
|
- private void Init()
|
|
|
+ /// <returns></returns>
|
|
|
+ private object[]? ParamTypeConvert(object[] Data, object Info)
|
|
|
{
|
|
|
- if (basics != null)
|
|
|
+ if (Data != null)
|
|
|
{
|
|
|
- if (basics.RabbitMQDataArray != null)
|
|
|
+ ParameterInfo[]? ParamArray = null;
|
|
|
+ List<object> param = new List<object>();
|
|
|
+ if (Info.GetType().ToString().Contains("MethodInfo"))
|
|
|
{
|
|
|
- foreach (var configData in basics.RabbitMQDataArray)
|
|
|
+ ParamArray = (Info as MethodInfo).GetParameters();
|
|
|
+ if (ParamArray != null)
|
|
|
{
|
|
|
- RabbitMQOperate operate = RabbitMQOperate.Instance(configData);
|
|
|
- operateResult = operate.On();
|
|
|
- if (operateResult.State)
|
|
|
+ for (int i = 0; i < ParamArray.Count(); i++)
|
|
|
{
|
|
|
- if (RabbitMQArray == null)
|
|
|
+ if (string.IsNullOrEmpty(ParamArray[i].ParameterType.FullName))
|
|
|
{
|
|
|
- RabbitMQArray = new ConcurrentDictionary<string, RabbitMQOperate>();
|
|
|
+ param.Add(Data[i]);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ param.Add(Convert.ChangeType(Data[i], ParamArray[i].ParameterType));
|
|
|
}
|
|
|
- RabbitMQArray.TryAdd(configData.SN, operate);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- //通知到外部,让外部来觉得是否重新初始化
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(operateResult.State, "请查看RData", operateResult, ResultType.OperateResult));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- if (basics.MqttClientDataArray != null)
|
|
|
+ if (Info.GetType().ToString().Contains("ConstructorInfo"))
|
|
|
{
|
|
|
- foreach (var configData in basics.MqttClientDataArray)
|
|
|
+ ParamArray = (Info as ConstructorInfo).GetParameters();
|
|
|
+ if (ParamArray != null)
|
|
|
{
|
|
|
- MqttClientOperate operate = MqttClientOperate.Instance(configData);
|
|
|
- operateResult = operate.On();
|
|
|
- if (operateResult.State)
|
|
|
+ for (int i = 0; i < ParamArray.Count(); i++)
|
|
|
{
|
|
|
- if (MqttClientArray == null)
|
|
|
+ object? model = Activator.CreateInstance(ParamArray[i].ParameterType);
|
|
|
+ PropertyInfo[] properties = ParamArray[i].ParameterType.GetProperties();
|
|
|
+ foreach (var propertie in properties)
|
|
|
{
|
|
|
- MqttClientArray = new ConcurrentDictionary<string, MqttClientOperate>();
|
|
|
+ JObject? JsonObject = Newtonsoft.Json.JsonConvert.DeserializeObject<JObject>(Data[i].ToJson());
|
|
|
+ propertie.SetValue(model, Convert.ChangeType(JsonObject[propertie.Name], propertie.PropertyType));
|
|
|
+ }
|
|
|
+ if (model != null)
|
|
|
+ {
|
|
|
+ param.Add(model);
|
|
|
}
|
|
|
- MqttClientArray.TryAdd(configData.SN, operate);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- //通知到外部,让外部来觉得是否重新初始化
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(operateResult.State, "请查看RData", operateResult, ResultType.OperateResult));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- if (basics.KafkaProducerDataArray != null)
|
|
|
+ if (param.Count > 0)
|
|
|
{
|
|
|
- foreach (var configData in basics.KafkaProducerDataArray)
|
|
|
- {
|
|
|
- KafkaProducerOperate operate = KafkaProducerOperate.Instance(configData);
|
|
|
- if (KafkaProducerArray == null)
|
|
|
- {
|
|
|
- KafkaProducerArray = new ConcurrentDictionary<string, KafkaProducerOperate>();
|
|
|
- }
|
|
|
- KafkaProducerArray.TryAdd(configData.SN, operate);
|
|
|
- }
|
|
|
+ return param.ToArray();
|
|
|
}
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
|
|
|
- InitState = true;
|
|
|
+ /// <summary>
|
|
|
+ /// 接口过滤器
|
|
|
+ /// </summary>
|
|
|
+ private bool InterfaceFilter(Type typeObj, Object criteriaObj)
|
|
|
+ {
|
|
|
+ if (typeObj.ToString() == criteriaObj.ToString())
|
|
|
+ return true;
|
|
|
+ else
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 这是释放所有包含自身对象
|
|
|
+ /// </summary>
|
|
|
+ public void Dispose()
|
|
|
+ {
|
|
|
+ //容器实例释放
|
|
|
+ foreach (var item in InstanceIoc) { item.Value.Dispose(); }
|
|
|
+ if (TaskArray != null) { foreach (var item in TaskArray) { item.Value.Dispose(); } TaskArray = null; }
|
|
|
+ DataQueue.Clear();
|
|
|
+ DataQueue = null;
|
|
|
+ //清空
|
|
|
+ InstanceIoc.Clear();
|
|
|
+ InstanceIoc = null;
|
|
|
+ TypeIoc.Clear();
|
|
|
+ TypeIoc = null;
|
|
|
|
|
|
+ GC.Collect();
|
|
|
+ GC.SuppressFinalize(this);
|
|
|
+ ThisObjList = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 释放
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ public OperateResult Dispose(string SN)
|
|
|
+ {
|
|
|
+ Depart("Dispose");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (InstanceIoc.ContainsKey(SN))
|
|
|
+ {
|
|
|
+ InstanceIoc[SN].Dispose();
|
|
|
+ return Break("Dispose", true);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("Dispose", false, $"未找到 {SN} 的实例");
|
|
|
+ }
|
|
|
}
|
|
|
- else
|
|
|
+ catch (Exception ex)
|
|
|
{
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(false, "请查看RData", Break(Depart("Init"), false, "配置文件不存在"), ResultType.OperateResult));
|
|
|
+ return Break("Dispose", false, ex.Message);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public OperateResult Produce(string Topic, string Content)
|
|
|
+ /// <summary>
|
|
|
+ /// 释放
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public Task<OperateResult> DisposeAsync(string SN)
|
|
|
{
|
|
|
- Depart("Produce");
|
|
|
+ return Task.Run(() => Dispose(SN));
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 移除实例
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public OperateResult Remove(string SN)
|
|
|
+ {
|
|
|
+ Depart("Remove");
|
|
|
try
|
|
|
{
|
|
|
- //主题
|
|
|
- string topic = Topic;
|
|
|
- try
|
|
|
+ if (InstanceIoc.ContainsKey(SN))
|
|
|
{
|
|
|
- //如果包类型可以被转换,直接走对应配置,如果不能,则走传入的主题
|
|
|
- PacketType? packetType = (PacketType)Enum.Parse(typeof(PacketType), Topic);
|
|
|
- if (packetType != null)
|
|
|
+ if (InstanceIoc.Remove(SN, out _))
|
|
|
{
|
|
|
- switch (packetType)
|
|
|
- {
|
|
|
- case PacketType.Message:
|
|
|
- topic = basics.MessageTopic;
|
|
|
- break;
|
|
|
- case PacketType.Status:
|
|
|
- topic = basics.StatusTopic;
|
|
|
- break;
|
|
|
- }
|
|
|
+ return Break("Remove", true);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("Remove", false, $"{SN} 的实例移除失败");
|
|
|
}
|
|
|
}
|
|
|
- catch { }
|
|
|
- if (!InitState)
|
|
|
+ else
|
|
|
{
|
|
|
- return Break("Produce", false, "尚未初始化");
|
|
|
+ return Break("Remove", false, $"未找到 {SN} 的实例");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Remove", false, ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 移除实例
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public Task<OperateResult> RemoveAsync(string SN)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Remove(SN));
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 打开
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public Task<OperateResult> OnAsync(string SN)
|
|
|
+ {
|
|
|
+ return Task.Run(() => On(SN));
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 打开
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public OperateResult On(string SN)
|
|
|
+ {
|
|
|
+ Depart("On");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (InstanceIoc.ContainsKey(SN))
|
|
|
+ {
|
|
|
+ return InstanceIoc[SN].On();
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- //当队列为空,初始化队列
|
|
|
- if (DataQueue == null)
|
|
|
- {
|
|
|
- DataQueue = new ConcurrentQueue<QueueData>();
|
|
|
- TaskArray = new ConcurrentDictionary<Guid, Task>();
|
|
|
- //启动
|
|
|
- for (int i = 0; i < basics.TaskHandleCount; i++)
|
|
|
- {
|
|
|
- TaskArray.TryAdd(Guid.NewGuid(), TaskHandle());
|
|
|
- }
|
|
|
- }
|
|
|
- //入列
|
|
|
- DataQueue.Enqueue(new QueueData() { Topic = Topic, Content = Content });
|
|
|
+ return Break("On", false, $"未找到 {SN} 的实例");
|
|
|
}
|
|
|
- return Break("Produce", true);
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- return Break("Produce", false, ex.Message);
|
|
|
+ return Break("On", false, ex.Message);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public Task<OperateResult> ProduceAsync(string Topic, string Content)
|
|
|
+ /// <summary>
|
|
|
+ /// 关闭
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public Task<OperateResult> OffAsync(string SN)
|
|
|
{
|
|
|
- return Task.Run(() => Produce(Topic, Content));
|
|
|
+ return Task.Run(() => Off(SN));
|
|
|
}
|
|
|
-
|
|
|
- public OperateResult Consume(string Topic)
|
|
|
+ /// <summary>
|
|
|
+ /// 关闭
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="SN">实例的唯一标识符</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public OperateResult Off(string SN)
|
|
|
{
|
|
|
- return Break(Depart("Consume"), false, "目前暂不支持消费");
|
|
|
+ Depart("Off");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (InstanceIoc.ContainsKey(SN))
|
|
|
+ {
|
|
|
+ return InstanceIoc[SN].Off();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("Off", false, $"未找到 {SN} 的实例");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Off", false, ex.Message);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public Task<OperateResult> ConsumeAsync(string Topic)
|
|
|
+ /// <summary>
|
|
|
+ /// 任务处理
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ private Task TaskHandle()
|
|
|
{
|
|
|
- return Task.Run(() => Consume(Topic));
|
|
|
+ string logName = "TaskHandle.log";
|
|
|
+ //起个新线程处理
|
|
|
+ return Task.Factory.StartNew(() =>
|
|
|
+ {
|
|
|
+ //循环
|
|
|
+ while (TaskArray != null)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //队列数据
|
|
|
+ QueueData? queueData;
|
|
|
+ //出列
|
|
|
+ while (DataQueue.TryDequeue(out queueData))
|
|
|
+ {
|
|
|
+ if (queueData != null)
|
|
|
+ {
|
|
|
+ foreach (var item in InstanceIoc)
|
|
|
+ {
|
|
|
+ OperateResult operateResult = item.Value.Produce(queueData.Topic, queueData.Content);
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(operateResult.State, operateResult.Message, operateResult.RData, operateResult.RType));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ LogHelper.Error($"任务处理异常:{ex.Message}", logName);
|
|
|
+ }
|
|
|
+ Thread.Sleep(basics.TaskHandleAccomplishSleepTime);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- public void Dispose()
|
|
|
+ /// <summary>
|
|
|
+ /// 生产
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="Topic">主题</param>
|
|
|
+ /// <param name="Content">内容</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public OperateResult Produce(string Topic, string Content)
|
|
|
{
|
|
|
- if (KafkaProducerArray != null) { foreach (var item in KafkaProducerArray) { item.Value.Dispose(); } KafkaProducerArray = null; }
|
|
|
- if (MqttClientArray != null) { foreach (var item in MqttClientArray) { item.Value.Dispose(); } MqttClientArray = null; }
|
|
|
- if (RabbitMQArray != null) { foreach (var item in RabbitMQArray) { item.Value.Dispose(); } RabbitMQArray = null; }
|
|
|
- if (TaskArray != null) { foreach (var item in TaskArray) { item.Value.Dispose(); } TaskArray = null; }
|
|
|
- DataQueue.Clear();
|
|
|
- ThisObjList = null;
|
|
|
- GC.Collect();
|
|
|
- GC.SuppressFinalize(this);
|
|
|
+ Depart("Produce");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //当队列为空,初始化队列
|
|
|
+ if (DataQueue == null)
|
|
|
+ {
|
|
|
+ DataQueue = new ConcurrentQueue<QueueData>();
|
|
|
+ }
|
|
|
+ if (TaskArray == null)
|
|
|
+ {
|
|
|
+ TaskArray = new ConcurrentDictionary<Guid, Task>();
|
|
|
+ //创建任务
|
|
|
+ for (int i = 0; i < basics.TaskHandleCount; i++)
|
|
|
+ {
|
|
|
+ TaskArray.TryAdd(Guid.NewGuid(), TaskHandle());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //入列
|
|
|
+ DataQueue.Enqueue(new QueueData() { Topic = Topic, Content = Content });
|
|
|
+ return Break("Produce", true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Produce", false, ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 生产
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="Topic">主题</param>
|
|
|
+ /// <param name="Content">内容</param>
|
|
|
+ /// <returns>统一出参</returns>
|
|
|
+ public Task<OperateResult> ProduceAsync(string Topic, string Content)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Produce(Topic, Content));
|
|
|
}
|
|
|
}
|
|
|
}
|