|
|
@@ -1,5 +1,7 @@
|
|
|
using System.Collections.Concurrent;
|
|
|
+using System.Runtime.CompilerServices;
|
|
|
using YSAI.Core.data;
|
|
|
+using YSAI.Core.@enum;
|
|
|
using YSAI.Core.@interface;
|
|
|
using YSAI.Unility;
|
|
|
|
|
|
@@ -73,120 +75,6 @@ namespace YSAI.Core.subscription
|
|
|
/// </summary>
|
|
|
CancellationTokenSource MonitorSwitch;
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// 轮询任务处理
|
|
|
- /// </summary>
|
|
|
- Task PollTaskHandle;
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 别的操作进来是否继续轮询
|
|
|
- /// </summary>
|
|
|
- bool GoOn = true;
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 上一次的数据
|
|
|
- /// </summary>
|
|
|
- ConcurrentDictionary<string, AddressValue> UpParam;
|
|
|
- /// <summary>
|
|
|
- /// 上一次的数据
|
|
|
- /// </summary>
|
|
|
- List<ConcurrentDictionary<string, AddressValue>> UpParamArray;
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 现场安全字典比对,是否一致
|
|
|
- /// </summary>
|
|
|
- /// <param name="Param1"></param>
|
|
|
- /// <param name="Param2"></param>
|
|
|
- /// <returns></returns>
|
|
|
- bool ConcurrentDictionaryEquals(ConcurrentDictionary<string, AddressValue> Param1, ConcurrentDictionary<string, AddressValue> Param2)
|
|
|
- {
|
|
|
- bool equal = false;
|
|
|
- if (Param1.Count == Param2.Count)
|
|
|
- {
|
|
|
- equal = true;
|
|
|
- foreach (var pair in Param1)
|
|
|
- {
|
|
|
- AddressValue value;
|
|
|
- if (Param2.TryGetValue(pair.Key, out value))
|
|
|
- {
|
|
|
- if (!value.Equals(pair.Value))
|
|
|
- {
|
|
|
- equal = false;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- equal = false;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return equal;
|
|
|
- }
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 现场安全字典比对,是否一致
|
|
|
- /// </summary>
|
|
|
- /// <param name="Param1"></param>
|
|
|
- /// <param name="Param2"></param>
|
|
|
- /// <returns></returns>
|
|
|
- bool ConcurrentDictionaryEquals(List<ConcurrentDictionary<string, AddressValue>> Param1, List<ConcurrentDictionary<string, AddressValue>> Param2)
|
|
|
- {
|
|
|
- foreach (ConcurrentDictionary<string, AddressValue> a in Param2)
|
|
|
- {
|
|
|
- if (!Param1.Exists(b => ConcurrentDictionaryEquals(a, b)))
|
|
|
- {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 执行轮询
|
|
|
- /// </summary>
|
|
|
- Task Polling(CancellationTokenSource token)
|
|
|
- {
|
|
|
- //起个新线程处理
|
|
|
- return Task.Factory.StartNew(() =>
|
|
|
- {
|
|
|
- while (!token.IsCancellationRequested)
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- if (!GoOn) continue;
|
|
|
- lock (basics.Address) //锁住不让其他操作
|
|
|
- {
|
|
|
- //数据必须大于0才进行
|
|
|
- if (basics.Address.AddressArray.Count > 0)
|
|
|
- {
|
|
|
- //执行委托
|
|
|
- OperateResult value = basics.Function?.Invoke(basics.Address);
|
|
|
- //判断一下
|
|
|
- if (!token.IsCancellationRequested)
|
|
|
- {
|
|
|
- //数据入列
|
|
|
- DataQueue.Enqueue(new QueueData() { value = value });
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- catch (Exception ex)
|
|
|
- {
|
|
|
- OnEventHandler(this, new EventResult(false, $"自定义订阅轮询异常:{ex.Message}"));
|
|
|
- }
|
|
|
- Thread.Sleep(basics.HandleInterval);
|
|
|
- }
|
|
|
- }, token.Token);
|
|
|
- }
|
|
|
/// <summary>
|
|
|
/// 释放
|
|
|
/// </summary>
|
|
|
@@ -284,7 +172,7 @@ namespace YSAI.Core.subscription
|
|
|
//当队列为空,初始化队列
|
|
|
if (DataQueue == null)
|
|
|
{
|
|
|
- DataQueue = new ConcurrentQueue<QueueData>();
|
|
|
+ DataQueue = new ConcurrentQueue<AddressValue>();
|
|
|
}
|
|
|
|
|
|
//任务为空创建任务
|
|
|
@@ -368,17 +256,6 @@ namespace YSAI.Core.subscription
|
|
|
}
|
|
|
|
|
|
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 队列数据
|
|
|
- /// </summary>
|
|
|
- private class QueueData
|
|
|
- {
|
|
|
- /// <summary>
|
|
|
- /// 操作结果
|
|
|
- /// </summary>
|
|
|
- public OperateResult value { get; set; }
|
|
|
- }
|
|
|
/// <summary>
|
|
|
/// 任务集合
|
|
|
/// </summary>
|
|
|
@@ -386,101 +263,170 @@ namespace YSAI.Core.subscription
|
|
|
/// <summary>
|
|
|
/// 数据队列
|
|
|
/// </summary>
|
|
|
- private ConcurrentQueue<QueueData> DataQueue;
|
|
|
+ private ConcurrentQueue<AddressValue> DataQueue;
|
|
|
+ /// <summary>
|
|
|
+ /// 轮询任务处理
|
|
|
+ /// </summary>
|
|
|
+ Task PollTaskHandle;
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 任务处理
|
|
|
+ /// 别的操作进来是否继续轮询
|
|
|
/// </summary>
|
|
|
- /// <param name="token">任务令牌</param>
|
|
|
- /// <returns>任务</returns>
|
|
|
- private Task TaskHandle(CancellationTokenSource token)
|
|
|
+ bool GoOn = true;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 数据缓存池
|
|
|
+ /// </summary>
|
|
|
+ ConcurrentDictionary<string, AddressValue> DataCachePool = new ConcurrentDictionary<string, AddressValue>();
|
|
|
+ /// <summary>
|
|
|
+ /// 多组数据缓存池
|
|
|
+ /// </summary>
|
|
|
+ List<ConcurrentDictionary<string, AddressValue>> DataCachePoolArray = new List<ConcurrentDictionary<string, AddressValue>>();
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 执行轮询
|
|
|
+ /// </summary>
|
|
|
+ Task Polling(CancellationTokenSource token)
|
|
|
{
|
|
|
//起个新线程处理
|
|
|
return Task.Factory.StartNew(() =>
|
|
|
{
|
|
|
while (!token.IsCancellationRequested)
|
|
|
{
|
|
|
- //队列数据
|
|
|
- QueueData? queueData;
|
|
|
- while (DataQueue.TryDequeue(out queueData))
|
|
|
+ try
|
|
|
{
|
|
|
- if (queueData != null && !token.IsCancellationRequested)
|
|
|
+ if (!GoOn) continue;
|
|
|
+ lock (basics.Address) //锁住不让其他操作
|
|
|
{
|
|
|
- //状态判断
|
|
|
- if (queueData.value.State)
|
|
|
+ //数据必须大于0才进行
|
|
|
+ if (basics.Address.AddressArray.Count > 0)
|
|
|
{
|
|
|
- //判断数据类型
|
|
|
- switch (queueData.value.RType)
|
|
|
+ //执行委托
|
|
|
+ OperateResult value = basics.Function?.Invoke(basics.Address);
|
|
|
+ if (value.State)
|
|
|
{
|
|
|
- case @enum.ResultType.KeyValue:
|
|
|
- ConcurrentDictionary<string, AddressValue>? RData = queueData.value.RData as ConcurrentDictionary<string, AddressValue>;
|
|
|
- //做流程处理
|
|
|
- if (basics.ChangeOut)
|
|
|
+ //判断一下
|
|
|
+ if (!token.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ switch (value.RType)
|
|
|
{
|
|
|
- //实例化对象
|
|
|
- if (UpParam == null)
|
|
|
- {
|
|
|
- UpParam = new ConcurrentDictionary<string, AddressValue>();
|
|
|
- }
|
|
|
- //空数据不做处理
|
|
|
- if (RData != null && RData.Count > 0)
|
|
|
- {
|
|
|
- //获取到第一次数据,准备节点检测数据是否变化
|
|
|
- if (!ConcurrentDictionaryEquals(UpParam, RData))
|
|
|
+ case ResultType.KeyValue:
|
|
|
+
|
|
|
+ ConcurrentDictionary<string, AddressValue> resultData = value.RData as ConcurrentDictionary<string, AddressValue>;
|
|
|
+ //空数据不做处理
|
|
|
+ if (resultData != null && resultData.Count > 0)
|
|
|
{
|
|
|
- //当节点数据变化,有一项数据未变化,也把这未变项与变化项一同抛出,在特殊用途中,确保此批点位数据都存在
|
|
|
- if (basics.AllOut)
|
|
|
+ if (basics.ChangeOut)
|
|
|
{
|
|
|
- //抛出差异数据
|
|
|
- OnEventHandler(this, new EventResult(true, "存在变化数据", RData, queueData.value.RType));
|
|
|
+ if (!ConcurrentDictionaryEquals(DataCachePool, resultData))
|
|
|
+ {
|
|
|
+ if (basics.AllOut)
|
|
|
+ {
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler(this, new EventResult(true, "存在变化数据", resultData, ResultType.KeyValue));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //数据入列
|
|
|
+ foreach (var item in resultData)
|
|
|
+ {
|
|
|
+ DataQueue.Enqueue(item.Value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ DataCachePool = resultData;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- //如果不一致,就找寻里面不一样的 键、值
|
|
|
- ConcurrentDictionary<string, AddressValue> data = new ConcurrentDictionary<string, AddressValue>(RData.Except(UpParam).ToDictionary(x => x.Key, x => x.Value));
|
|
|
//抛出差异数据
|
|
|
- OnEventHandler(this, new EventResult(true, "变化数据", data, queueData.value.RType));
|
|
|
+ OnEventHandler(this, new EventResult(true, "实时数据", resultData, ResultType.KeyValue));
|
|
|
}
|
|
|
- //把这次新数据 赋值到历史数据中
|
|
|
- UpParam = RData;
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- OnEventHandler(this, new EventResult(true, "实时数据", queueData.value.RData, queueData.value.RType));
|
|
|
- }
|
|
|
- break;
|
|
|
- case @enum.ResultType.KeyValueArray:
|
|
|
- List<ConcurrentDictionary<string, AddressValue>>? RDataArray = queueData.value.RData as List<ConcurrentDictionary<string, AddressValue>>;
|
|
|
- //实例化对象
|
|
|
- if (UpParamArray == null)
|
|
|
- {
|
|
|
- UpParamArray = new List<ConcurrentDictionary<string, AddressValue>>();
|
|
|
- }
|
|
|
- //空数据不做处理
|
|
|
- if (RDataArray != null && RDataArray.Count > 0)
|
|
|
- {
|
|
|
- //获取到第一次数据,准备节点检测数据是否变化
|
|
|
- if (!ConcurrentDictionaryEquals(UpParamArray, RDataArray))
|
|
|
- {
|
|
|
- foreach (var item in RDataArray)
|
|
|
+ break;
|
|
|
+ case ResultType.KeyValueArray:
|
|
|
+ List<ConcurrentDictionary<string, AddressValue>>? resultDataArray = value.RData as List<ConcurrentDictionary<string, AddressValue>>;
|
|
|
+ //空数据不做处理
|
|
|
+ if (resultDataArray != null && resultDataArray.Count > 0)
|
|
|
{
|
|
|
- //抛出差异数据
|
|
|
- OnEventHandler(this, new EventResult(true, "存在变化数据", RDataArray, queueData.value.RType));
|
|
|
+ if (basics.ChangeOut)
|
|
|
+ {
|
|
|
+ if (!ConcurrentDictionaryEquals(DataCachePoolArray, resultDataArray))
|
|
|
+ {
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler(this, new EventResult(true, "存在变化数据", resultDataArray, ResultType.KeyValueArray));
|
|
|
+ }
|
|
|
+ DataCachePoolArray = resultDataArray;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler(this, new EventResult(true, "实时数据", resultDataArray, ResultType.KeyValueArray));
|
|
|
+ }
|
|
|
}
|
|
|
- //把这次新数据 赋值到历史数据中
|
|
|
- UpParamArray = RDataArray;
|
|
|
- }
|
|
|
+ break;
|
|
|
}
|
|
|
- break;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //当状态为false 说明读取失败,写入日志
|
|
|
+ OnEventHandler(this, new EventResult(false, $"自定义订阅轮询异常:{value.Message}"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ OnEventHandler(this, new EventResult(false, $"自定义订阅轮询异常:{ex.Message}"));
|
|
|
+ }
|
|
|
+ Thread.Sleep(basics.HandleInterval);
|
|
|
+ }
|
|
|
+ }, token.Token);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 数据缓存
|
|
|
+ /// </summary>
|
|
|
+ ConcurrentDictionary<string, AddressValue> DataCache = new ConcurrentDictionary<string, AddressValue>();
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 任务处理
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="token">任务令牌</param>
|
|
|
+ /// <returns>任务</returns>
|
|
|
+ private Task TaskHandle(CancellationTokenSource token)
|
|
|
+ {
|
|
|
+ //起个新线程处理
|
|
|
+ return Task.Factory.StartNew(() =>
|
|
|
+ {
|
|
|
+ while (!token.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ //队列数据
|
|
|
+ AddressValue? queueData;
|
|
|
+ while (DataQueue.TryDequeue(out queueData))
|
|
|
+ {
|
|
|
+ if (queueData != null && !token.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ if (DataCache.ContainsKey(queueData.AddressName))
|
|
|
+ {
|
|
|
+ if (!DataCache[queueData.AddressName].Equals(queueData))
|
|
|
+ {
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler(this, new EventResult(true, "变化数据", new ConcurrentDictionary<string, AddressValue> { [queueData.AddressName] = queueData }, RType: ResultType.KeyValue));
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- //当状态为false 说明读取失败,写入日志
|
|
|
- OnEventHandler(this, new EventResult(false, $"自定义订阅轮询异常:{queueData.value.Message}"));
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler(this, new EventResult(true, "变化数据", new ConcurrentDictionary<string, AddressValue> { [queueData.AddressName] = queueData }, RType: ResultType.KeyValue));
|
|
|
}
|
|
|
+ DataCachePool.AddOrUpdate(queueData.AddressName, queueData, (k, v) => queueData);
|
|
|
}
|
|
|
}
|
|
|
//队列里面的数据处理完休息一下
|
|
|
@@ -488,5 +434,58 @@ namespace YSAI.Core.subscription
|
|
|
}
|
|
|
}, token.Token);
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 线程安全字典比对,是否一致
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="Param1"></param>
|
|
|
+ /// <param name="Param2"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ bool ConcurrentDictionaryEquals(ConcurrentDictionary<string, AddressValue> Param1, ConcurrentDictionary<string, AddressValue> Param2)
|
|
|
+ {
|
|
|
+ bool equal = false;
|
|
|
+ if (Param1.Count == Param2.Count)
|
|
|
+ {
|
|
|
+ equal = true;
|
|
|
+ foreach (var pair in Param1)
|
|
|
+ {
|
|
|
+ AddressValue value;
|
|
|
+ if (Param2.TryGetValue(pair.Key, out value))
|
|
|
+ {
|
|
|
+ if (!value.Equals(pair.Value))
|
|
|
+ {
|
|
|
+ equal = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ equal = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return equal;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 线程安全字典比对,是否一致
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="Param1"></param>
|
|
|
+ /// <param name="Param2"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ bool ConcurrentDictionaryEquals(List<ConcurrentDictionary<string, AddressValue>> Param1, List<ConcurrentDictionary<string, AddressValue>> Param2)
|
|
|
+ {
|
|
|
+ foreach (ConcurrentDictionary<string, AddressValue> a in Param2)
|
|
|
+ {
|
|
|
+ if (!Param1.Exists(b => ConcurrentDictionaryEquals(a, b)))
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
}
|