|
|
@@ -68,9 +68,9 @@ namespace YSAI.Core.subscription
|
|
|
SubscribeData.Basics basics;
|
|
|
|
|
|
/// <summary>
|
|
|
- /// 线程
|
|
|
+ /// 轮询任务处理
|
|
|
/// </summary>
|
|
|
- Thread thread;
|
|
|
+ Task PollTaskHandle;
|
|
|
|
|
|
/// <summary>
|
|
|
/// 别的操作进来是否继续轮询
|
|
|
@@ -144,106 +144,37 @@ namespace YSAI.Core.subscription
|
|
|
/// <summary>
|
|
|
/// 执行轮询
|
|
|
/// </summary>
|
|
|
- void Polling()
|
|
|
+ Task Polling()
|
|
|
{
|
|
|
- while (!MonitorSwitch.IsCancellationRequested)
|
|
|
+ //起个新线程处理
|
|
|
+ return Task.Factory.StartNew(() =>
|
|
|
{
|
|
|
- try
|
|
|
+ while (!MonitorSwitch.IsCancellationRequested)
|
|
|
{
|
|
|
- if (!GoOn) continue;
|
|
|
- lock (basics.Address) //锁住不让其他操作
|
|
|
+ try
|
|
|
{
|
|
|
- //数据必须大于0才进行
|
|
|
- if (basics.Address.AddressArray.Count > 0)
|
|
|
+ if (!GoOn) continue;
|
|
|
+ lock (basics.Address) //锁住不让其他操作
|
|
|
{
|
|
|
- //执行委托
|
|
|
- OperateResult value = basics.Function?.Invoke(basics.Address);
|
|
|
- //状态判断
|
|
|
- if (value.State)
|
|
|
+ //数据必须大于0才进行
|
|
|
+ if (basics.Address.AddressArray.Count > 0)
|
|
|
{
|
|
|
- //判断数据类型
|
|
|
- switch (value.RType)
|
|
|
- {
|
|
|
- case @enum.ResultType.KeyValue:
|
|
|
- ConcurrentDictionary<string, AddressValue>? RData = value.RData as ConcurrentDictionary<string, AddressValue>;
|
|
|
- //做流程处理
|
|
|
- if (basics.DataChangeOut)
|
|
|
- {
|
|
|
- //实例化对象
|
|
|
- if (UpParam == null)
|
|
|
- {
|
|
|
- UpParam = new ConcurrentDictionary<string, AddressValue>();
|
|
|
- }
|
|
|
- //空数据不做处理
|
|
|
- if (RData != null && RData.Count > 0)
|
|
|
- {
|
|
|
- //获取到第一次数据,准备节点检测数据是否变化
|
|
|
- if (!ConcurrentDictionaryEquals(UpParam, RData))
|
|
|
- {
|
|
|
- //当节点数据变化,有一项数据未变化,也把这未变项与变化项一同抛出,在特殊用途中,确保此批点位数据都存在
|
|
|
- if (basics.SameDataOut)
|
|
|
- {
|
|
|
- //抛出差异数据
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(true, "存在变化数据", RData, value.RType));
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- //如果不一致,就找寻里面不一样的 键、值
|
|
|
- ConcurrentDictionary<string, AddressValue> data = new ConcurrentDictionary<string, AddressValue>(RData.Except(UpParam).ToDictionary(x => x.Key, x => x.Value));
|
|
|
- //抛出差异数据
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(true, "变化数据", data, value.RType));
|
|
|
- }
|
|
|
- //把这次新数据 赋值到历史数据中
|
|
|
- UpParam = RData;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(true, "实时数据", value.RData, value.RType));
|
|
|
- }
|
|
|
- break;
|
|
|
- case @enum.ResultType.KeyValueArray:
|
|
|
- List<ConcurrentDictionary<string, AddressValue>>? RDataArray = value.RData as List<ConcurrentDictionary<string, AddressValue>>;
|
|
|
- //实例化对象
|
|
|
- if (UpParamArray == null)
|
|
|
- {
|
|
|
- UpParamArray = new List<ConcurrentDictionary<string, AddressValue>>();
|
|
|
- }
|
|
|
- //空数据不做处理
|
|
|
- if (RDataArray != null && RDataArray.Count > 0)
|
|
|
- {
|
|
|
- //获取到第一次数据,准备节点检测数据是否变化
|
|
|
- if (!ConcurrentDictionaryEquals(UpParamArray, RDataArray))
|
|
|
- {
|
|
|
- foreach (var item in RDataArray)
|
|
|
- {
|
|
|
- //抛出差异数据
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(true, "存在变化数据", RDataArray, value.RType));
|
|
|
- }
|
|
|
- //把这次新数据 赋值到历史数据中
|
|
|
- UpParamArray = RDataArray;
|
|
|
- }
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- //当状态为false 说明读取失败,写入日志
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(false, $"自定义订阅轮询异常:{value.Message}"));
|
|
|
+ //执行委托
|
|
|
+ OperateResult value = basics.Function?.Invoke(basics.Address);
|
|
|
+
|
|
|
+ //数据入列
|
|
|
+ DataQueue.Enqueue(new QueueData() { value = value});
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"自定义订阅轮询异常:{ex.Message}"));
|
|
|
+ }
|
|
|
+ Thread.Sleep(basics.SleepTime);
|
|
|
}
|
|
|
- catch (Exception ex)
|
|
|
- {
|
|
|
- OnEventHandler?.Invoke(this, new EventResult(false, $"自定义订阅轮询异常:{ex.Message}"));
|
|
|
- }
|
|
|
- Thread.Sleep(basics.SleepTime);
|
|
|
- }
|
|
|
+ });
|
|
|
}
|
|
|
-
|
|
|
/// <summary>
|
|
|
/// 释放
|
|
|
/// </summary>
|
|
|
@@ -331,12 +262,29 @@ namespace YSAI.Core.subscription
|
|
|
Depart("On");
|
|
|
try
|
|
|
{
|
|
|
- if (thread == null)
|
|
|
+ if (PollTaskHandle == null)
|
|
|
{
|
|
|
MonitorSwitch = new CancellationTokenSource();
|
|
|
- thread = new Thread(Polling);
|
|
|
- thread.IsBackground = true;
|
|
|
- thread.Start();
|
|
|
+
|
|
|
+ //轮询执行动作任务
|
|
|
+ PollTaskHandle = Polling();
|
|
|
+
|
|
|
+ //当队列为空,初始化队列
|
|
|
+ if (DataQueue == null)
|
|
|
+ {
|
|
|
+ DataQueue = new ConcurrentQueue<QueueData>();
|
|
|
+ }
|
|
|
+
|
|
|
+ //任务为空创建任务
|
|
|
+ if (TaskArray == null)
|
|
|
+ {
|
|
|
+ TaskArray = new ConcurrentDictionary<Guid, Task>();
|
|
|
+ //创建任务
|
|
|
+ for (int i = 0; i < basics.TaskHandleCount; i++)
|
|
|
+ {
|
|
|
+ TaskArray.TryAdd(Guid.NewGuid(), TaskHandle());
|
|
|
+ }
|
|
|
+ }
|
|
|
return Break("On", true);
|
|
|
}
|
|
|
else
|
|
|
@@ -369,7 +317,25 @@ namespace YSAI.Core.subscription
|
|
|
//立马停止轮询
|
|
|
GoOn = false;
|
|
|
//线程结束
|
|
|
- thread?.Interrupt();
|
|
|
+ PollTaskHandle.Dispose();
|
|
|
+
|
|
|
+ //任务清空
|
|
|
+ if (TaskArray != null)
|
|
|
+ {
|
|
|
+ foreach (var item in TaskArray)
|
|
|
+ {
|
|
|
+ item.Value.Dispose();
|
|
|
+ }
|
|
|
+ TaskArray.Clear();
|
|
|
+ TaskArray = null;
|
|
|
+ }
|
|
|
+ //队列清空
|
|
|
+ if (DataQueue != null)
|
|
|
+ {
|
|
|
+ DataQueue.Clear();
|
|
|
+ DataQueue = null;
|
|
|
+ }
|
|
|
+
|
|
|
return Break("Off", true);
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
@@ -377,5 +343,127 @@ namespace YSAI.Core.subscription
|
|
|
return Break("Off", false, ex.Message, Exc: ex);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 队列数据
|
|
|
+ /// </summary>
|
|
|
+ private class QueueData
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 操作结果
|
|
|
+ /// </summary>
|
|
|
+ public OperateResult value { get; set; }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 任务集合
|
|
|
+ /// </summary>
|
|
|
+ private ConcurrentDictionary<Guid, Task> TaskArray;
|
|
|
+ /// <summary>
|
|
|
+ /// 数据队列
|
|
|
+ /// </summary>
|
|
|
+ private ConcurrentQueue<QueueData> DataQueue;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 任务处理
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ private Task TaskHandle()
|
|
|
+ {
|
|
|
+ //起个新线程处理
|
|
|
+ return Task.Factory.StartNew(() =>
|
|
|
+ {
|
|
|
+ //循环
|
|
|
+ while (TaskArray != null)
|
|
|
+ {
|
|
|
+ //队列数据
|
|
|
+ QueueData? queueData;
|
|
|
+ while (DataQueue.TryDequeue(out queueData))
|
|
|
+ {
|
|
|
+ if (queueData != null)
|
|
|
+ {
|
|
|
+ //状态判断
|
|
|
+ if (queueData.value.State)
|
|
|
+ {
|
|
|
+ //判断数据类型
|
|
|
+ switch (queueData.value.RType)
|
|
|
+ {
|
|
|
+ case @enum.ResultType.KeyValue:
|
|
|
+ ConcurrentDictionary<string, AddressValue>? RData = queueData.value.RData as ConcurrentDictionary<string, AddressValue>;
|
|
|
+ //做流程处理
|
|
|
+ if (basics.DataChangeOut)
|
|
|
+ {
|
|
|
+ //实例化对象
|
|
|
+ if (UpParam == null)
|
|
|
+ {
|
|
|
+ UpParam = new ConcurrentDictionary<string, AddressValue>();
|
|
|
+ }
|
|
|
+ //空数据不做处理
|
|
|
+ if (RData != null && RData.Count > 0)
|
|
|
+ {
|
|
|
+ //获取到第一次数据,准备节点检测数据是否变化
|
|
|
+ if (!ConcurrentDictionaryEquals(UpParam, RData))
|
|
|
+ {
|
|
|
+ //当节点数据变化,有一项数据未变化,也把这未变项与变化项一同抛出,在特殊用途中,确保此批点位数据都存在
|
|
|
+ if (basics.SameDataOut)
|
|
|
+ {
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, "存在变化数据", RData, queueData.value.RType));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //如果不一致,就找寻里面不一样的 键、值
|
|
|
+ ConcurrentDictionary<string, AddressValue> data = new ConcurrentDictionary<string, AddressValue>(RData.Except(UpParam).ToDictionary(x => x.Key, x => x.Value));
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, "变化数据", data, queueData.value.RType));
|
|
|
+ }
|
|
|
+ //把这次新数据 赋值到历史数据中
|
|
|
+ UpParam = RData;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, "实时数据", queueData.value.RData, queueData.value.RType));
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case @enum.ResultType.KeyValueArray:
|
|
|
+ List<ConcurrentDictionary<string, AddressValue>>? RDataArray = queueData.value.RData as List<ConcurrentDictionary<string, AddressValue>>;
|
|
|
+ //实例化对象
|
|
|
+ if (UpParamArray == null)
|
|
|
+ {
|
|
|
+ UpParamArray = new List<ConcurrentDictionary<string, AddressValue>>();
|
|
|
+ }
|
|
|
+ //空数据不做处理
|
|
|
+ if (RDataArray != null && RDataArray.Count > 0)
|
|
|
+ {
|
|
|
+ //获取到第一次数据,准备节点检测数据是否变化
|
|
|
+ if (!ConcurrentDictionaryEquals(UpParamArray, RDataArray))
|
|
|
+ {
|
|
|
+ foreach (var item in RDataArray)
|
|
|
+ {
|
|
|
+ //抛出差异数据
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, "存在变化数据", RDataArray, queueData.value.RType));
|
|
|
+ }
|
|
|
+ //把这次新数据 赋值到历史数据中
|
|
|
+ UpParamArray = RDataArray;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //当状态为false 说明读取失败,写入日志
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"自定义订阅轮询异常:{queueData.value.Message}"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //队列里面的数据处理完休息一下
|
|
|
+ Thread.Sleep(basics.TaskHandleAccomplishSleepTime);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
}
|