lixun 2 éve
szülő
commit
17cde73cd7

+ 1 - 1
src/YSAI.DAQ/YSAI.Core/interface/only/IDB.cs

@@ -10,5 +10,5 @@ namespace YSAI.Core.@interface.only
     /// <summary>
     /// 数据库接口
     /// </summary>
-    public interface IDB : IRead, IDisposable { }
+    public interface IDB : IDaq, IDisposable { }
 }

+ 4 - 8
src/YSAI.DAQ/YSAI.Core/interface/only/IOpc.cs

@@ -19,17 +19,15 @@ namespace YSAI.Core.@interface.only
         /// </summary>
         /// <param name="GroupName">组名</param>
         /// <param name="IsSubscribed">是否需要订阅</param>
-        /// <param name="UpdateRate">更新频率</param>
         /// <returns>统一出参</returns>
-        OperateResult AddGroup(string GroupName, bool IsSubscribed = false, int UpdateRate = 200);
+        OperateResult AddGroup(string GroupName, bool IsSubscribed = false);
         /// <summary>
         /// 添加组
         /// </summary>
         /// <param name="GroupName">组名</param>
         /// <param name="IsSubscribed">是否需要订阅</param>
-        /// <param name="UpdateRate">更新频率</param>
         /// <returns>统一出参</returns>
-        Task<OperateResult> AddGroupAsync(string GroupName, bool IsSubscribed = false, int UpdateRate = 200);
+        Task<OperateResult> AddGroupAsync(string GroupName, bool IsSubscribed = false);
 
         /// <summary>
         /// 添加节点,重复的地址直接过滤不重复添加
@@ -37,18 +35,16 @@ namespace YSAI.Core.@interface.only
         /// <param name="address">节点</param>
         /// <param name="GroupName">组名,组为空时,自动添加默认组</param>
         /// <param name="IsSubscribed">是否订阅组,当为true 时,请绑定事件</param>
-        /// <param name="UpdateRate">更新频率</param>
         /// <returns>统一出参</returns>
-        OperateResult AddNode(Address address, string GroupName, bool IsSubscribed = false, int UpdateRate = 200);
+        OperateResult AddNode(Address address, string GroupName, bool IsSubscribed = false);
         /// <summary>
         /// 添加节点,重复的地址直接过滤不重复添加
         /// </summary>
         /// <param name="address">节点</param>
         /// <param name="GroupName">组名,组为空时,自动添加默认组</param>
         /// <param name="IsSubscribed">是否订阅组,当为true 时,请绑定事件</param>
-        /// <param name="UpdateRate">更新频率</param>
         /// <returns>统一出参</returns>
-        Task<OperateResult> AddNodeAsync(Address address, string GroupName, bool IsSubscribed = false, int UpdateRate = 200);
+        Task<OperateResult> AddNodeAsync(Address address, string GroupName, bool IsSubscribed = false);
 
         /// <summary>
         /// 修改指定组的订阅状态

+ 2 - 2
src/YSAI.DAQ/YSAI.Core/interface/only/ISubscribe.cs

@@ -14,14 +14,14 @@ namespace YSAI.Core.@interface.only
     public interface ISubscribe : IOn, IOff, IDisposable
     {
         /// <summary>
-        /// 添加自定义订阅
+        /// 订阅
         /// </summary>
         /// <param name="address">节点数据</param>
         /// <returns>统一出参</returns>
         OperateResult Subscribe(Address address);
 
         /// <summary>
-        /// 添加自定义订阅
+        /// 订阅
         /// </summary>
         /// <param name="address">节点数据</param>
         /// <returns>统一出参</returns>

+ 2 - 1
src/YSAI.DAQ/YSAI.Core/interface/unify/IDaq.cs

@@ -3,13 +3,14 @@ using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
+using YSAI.Core.@interface.only;
 
 namespace YSAI.Core.@interface.unify
 {
     /// <summary>
     /// 数采接口
     /// </summary>
-    public interface IDaq : IOn, IOff, IRead, IWrite, IDisposable
+    public interface IDaq : IOn, IOff, IRead, IWrite, ISubscribe, IDisposable
     {
 
     }

+ 1 - 0
src/YSAI.DAQ/YSAI.Core/reflection/ReflectionOperate.cs

@@ -271,6 +271,7 @@ namespace YSAI.Core.reflection
             return Activator.CreateInstance(NamespaceAndClassNameType, Param);
         }
 
+
         /// <summary>
         /// 无参初始化
         /// </summary>

+ 54 - 13
src/YSAI.DAQ/YSAI.Core/subscribe/SubscribeData.cs

@@ -1,4 +1,5 @@
 using System.Collections.Concurrent;
+using System.ComponentModel;
 using YSAI.Core.data;
 
 namespace YSAI.Core.subscription
@@ -11,20 +12,16 @@ namespace YSAI.Core.subscription
         /// <summary>
         /// 基础数据
         /// </summary>
-        public class Basics
+        public class Basics : SCData
         {
-            /// <summary>
-            /// 点位
-            /// </summary>
-            public Address? Address { get; set; }
             /// <summary>
             /// 唯一标识符
             /// </summary>
             public string? SN { get; set; }
             /// <summary>
-            /// 休眠时间(毫秒)
+            /// 点位 可为空,可后期赋值
             /// </summary>
-            public int SleepTime { get; set; }
+            public Address? Address { get; set; }
             /// <summary>
             /// 执行方法的委托,读取方法,每个通信设备都应该存在
             /// Address:请求参数
@@ -32,17 +29,64 @@ namespace YSAI.Core.subscription
             /// </summary>
             public Func<Address, OperateResult>? Function { get; set; }
             /// <summary>
+            /// 重写基类中的Equals方法
+            /// </summary>
+            /// <param name="obj"></param>
+            /// <returns></returns>
+            public override bool Equals(object obj)
+            {
+                if (obj == null)
+                {
+                    return false;
+                }
+                Basics? Obj = obj as Basics;
+                if (Obj == null)
+                {
+                    return false;
+                }
+                else
+                {
+                    if (SN == Obj.SN &&
+                        Function == Obj.Function &&
+                        Address == Obj.Address &&
+                        SleepTime == Obj.SleepTime &&
+                        SameDataOut == Obj.SameDataOut &&
+                        DataChangeOut == Obj.DataChangeOut)
+                    {
+                        return true;
+                    }
+                    else
+                    {
+                        return false;
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// 订阅核心数据
+        /// </summary>
+        public class SCData
+        {
+            /// <summary>
+            /// 休眠时间(毫秒)
+            /// </summary>
+            [Description("休眠时间")]
+            public int SleepTime { get; set; }
+            /// <summary>
             /// 数据变化抛出 false则为实时数据
             /// 只抛出变化项
             /// 数据库查询,此项无效
             /// </summary>
-            public bool DataChangeOut { get; set; }
+            [Description("数据变化抛出")]
+            public bool DataChangeOut { get; set; } = true;
             /// <summary>
             /// 当节点数据变化,有一项数据未变化,也把这未变项与变化项一同抛出,在特殊用途中,确保此批点位数据都存在
             /// 当 DataChangeOut 为 true 此项生效
             /// 数据库查询,此项无效
             /// </summary>
-            public bool SameDataOut { get; set; }
+            [Description("未变项与变化项一同抛出")]
+            public bool SameDataOut { get; set; } = false;
             /// <summary>
             /// 重写基类中的Equals方法
             /// </summary>
@@ -61,10 +105,7 @@ namespace YSAI.Core.subscription
                 }
                 else
                 {
-                    if (SN == Obj.SN &&
-                        Function == Obj.Function &&
-                        Address == Obj.Address &&
-                        SleepTime == Obj.SleepTime &&
+                    if (SleepTime == Obj.SleepTime &&
                         SameDataOut == Obj.SameDataOut &&
                         DataChangeOut == Obj.DataChangeOut)
                     {

+ 3 - 1
src/YSAI.DAQ/YSAI.Core/subscribe/SubscribeOperate.cs

@@ -141,6 +141,7 @@ namespace YSAI.Core.subscription
             }
             return true;
         }
+
         /// <summary>
         /// 执行轮询
         /// </summary>
@@ -239,6 +240,7 @@ namespace YSAI.Core.subscription
                 Thread.Sleep(basics.SleepTime);
             }
         }
+
         /// <summary>
         /// 释放
         /// </summary>
@@ -260,7 +262,7 @@ namespace YSAI.Core.subscription
                 {
                     //添加新节点
                     basics.Address.AddressArray.AddRange(address.AddressArray);
-                    //去重
+                    //Distinct()去重
                     basics.Address.AddressArray = basics.Address.AddressArray.Distinct().ToList();
                 }
                 GoOn = true;

+ 2 - 2
src/YSAI.DAQ/YSAI.DAQ.sln

@@ -55,9 +55,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "YSAI.WindowMessage", "YSAI.
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "YSAI.Unility.Windows", "YSAI.Unility.Windows\YSAI.Unility.Windows.csproj", "{257F1474-B220-4C61-88C6-5B83BEF7B3A7}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "YSAI.Tool.Windows", "YSAI.Tool.Windows\YSAI.Tool.Windows.csproj", "{49133ADB-D3BF-4682-AA5A-CC1CF1917D45}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "YSAI.Tool.Windows", "YSAI.Tool.Windows\YSAI.Tool.Windows.csproj", "{49133ADB-D3BF-4682-AA5A-CC1CF1917D45}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "YSAI.Manage.Windows", "YSAI.Manage.Windows\YSAI.Manage.Windows.csproj", "{2221CE78-FA24-40E3-8453-9C66E32F9C0C}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "YSAI.Manage.Windows", "YSAI.Manage.Windows\YSAI.Manage.Windows.csproj", "{2221CE78-FA24-40E3-8453-9C66E32F9C0C}"
 EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution

+ 6 - 2
src/YSAI.DAQ/YSAI.DB/DBData.cs

@@ -6,6 +6,7 @@ using System.ComponentModel;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
+using YSAI.Core.subscription;
 
 namespace YSAI.DB
 {
@@ -14,7 +15,7 @@ namespace YSAI.DB
         /// <summary>
         /// 基础数据
         /// </summary>
-        public class Basics
+        public class Basics : SubscribeData.SCData
         {
             /// <summary>
             /// 唯一标识符
@@ -50,7 +51,10 @@ namespace YSAI.DB
                 }
                 else
                 {
-                    if (ConnectStr == Obj.ConnectStr && DBType == Obj.DBType && SN == Obj.SN)
+                    if (SleepTime == Obj.SleepTime &&
+                    SameDataOut == Obj.SameDataOut &&
+                    DataChangeOut == Obj.DataChangeOut && 
+                    ConnectStr == Obj.ConnectStr && DBType == Obj.DBType && SN == Obj.SN)
                     {
                         return true;
                     }

+ 109 - 1
src/YSAI.DAQ/YSAI.DB/DBOperate.cs

@@ -1,4 +1,5 @@
 using Dapper;
+using Google.Protobuf.WellKnownTypes;
 using MySql.Data.MySqlClient;
 using MySqlX.XDevAPI.Relational;
 using Newtonsoft.Json.Linq;
@@ -11,6 +12,7 @@ using System.Data.SQLite;
 using YSAI.Core.data;
 using YSAI.Core.@interface.only;
 using YSAI.Core.@interface.unify;
+using YSAI.Core.subscription;
 using YSAI.Core.virtualAddress;
 using YSAI.Log;
 using YSAI.Unility;
@@ -22,7 +24,7 @@ namespace YSAI.DB
     /// <summary>
     /// 数据库操作
     /// </summary>
-    public sealed class DBOperate : IBaseAbstract,IDB
+    public sealed class DBOperate : IBaseAbstract<SubscribeData.EventParam>,IDB
     {
         private static readonly object Lock = new object();  //锁
         private static List<DBOperate> ThisObjList = new List<DBOperate>(); //自身对象集合
@@ -689,5 +691,111 @@ namespace YSAI.DB
                 return Break("Read", false, ex.Message);
             }
         }
+
+        /// <summary>
+        /// 实现订阅功能
+        /// </summary>
+        private SubscribeOperate subscribeOperate;
+
+        public OperateResult Subscribe(Address address)
+        {
+            Depart("Subscribe");
+            try
+            {
+                if (subscribeOperate == null)
+                {
+                    subscribeOperate = SubscribeOperate.Instance(new SubscribeData.Basics()
+                    {
+                        Address = address,
+                        DataChangeOut = basics.DataChangeOut,
+                        Function = Read,
+                        SameDataOut = basics.SameDataOut,
+                        SleepTime = basics.SleepTime
+                    });
+                    subscribeOperate.OnEvent += SubscribeOperate_OnEvent;
+                    OperateResult operateResult = subscribeOperate.On();
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    OperateResult operateResult = subscribeOperate.Subscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("Subscribe", false, ex.Message);
+            }
+        }
+        public Task<OperateResult> SubscribeAsync(Address address)
+        {
+            return Task.Run(() => Subscribe(address));
+        }
+
+        public OperateResult UnSubscribe(Address address)
+        {
+            Depart("UnSubscribe");
+            try
+            {
+                if (subscribeOperate != null)
+                {
+                    OperateResult operateResult = subscribeOperate.UnSubscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    return Break("UnSubscribe", false, "当前尚未订阅");
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("UnSubscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> UnSubscribeAsync(Address address)
+        {
+            return Task.Run(() => UnSubscribe(address));
+        }
+
+        /// <summary>
+        /// 事件抛出
+        /// </summary>
+        /// <param name="sender">自定义订阅对象</param>
+        /// <param name="e">返回的参数</param>
+        private void SubscribeOperate_OnEvent(object? sender, SubscribeData.EventParam e)
+        {
+            OnEventHandler?.Invoke(this, e);
+        }
+
+        public Task<OperateResult> WriteAsync<V>(ConcurrentDictionary<string, V> Values)
+        {
+            return Task.Run(()=> Write(Values));
+        }
+
+        public OperateResult Write<V>(ConcurrentDictionary<string, V> Values)
+        {
+            return Break(Depart("Write"),false, "不支持 Write 操作");
+        }
+
+        public Task<OperateResult> OnAsync()
+        {
+            return Task.Run(() => On());
+        }
+
+        public OperateResult On()
+        {
+            return Break(Depart("On"), true);
+        }
+
+        public Task<OperateResult> OffAsync()
+        {
+            return Task.Run(() => Off());
+        }
+
+        public OperateResult Off()
+        {
+            return Break(Depart("Off"), true);
+        }
     }
 }

+ 3 - 3
src/YSAI.DAQ/YSAI.Manage.Core/base/ManageBaseOperate.cs

@@ -640,8 +640,8 @@ namespace YSAI.Manage.Core.@base
                         {
                             case DaqType.OpcUa:
                                 //OPCUA 自带订阅功能
-                                ConcurrentDictionary<string, List<AddressDetails>> sub = new ConcurrentDictionary<string, List<AddressDetails>>();
-                                sub.TryAdd(addressManage.SN, addressManage.AddressArray);
+                                ConcurrentDictionary<string, Address> sub = new ConcurrentDictionary<string, Address>();
+                                sub.TryAdd(addressManage.SN, addressManage);
                                 operateResult = OpcUaDaqObjArray[SN].AddSubscribe(sub);
                                 if (!operateResult.State)
                                 {
@@ -766,7 +766,7 @@ namespace YSAI.Manage.Core.@base
                             switch (DType)
                             {
                                 case DaqType.OpcUa:
-                                    operateResult = OpcUaDaqObjArray[SN].RemoveSubscribe(basics.AManages.FirstOrDefault(c => c.DType.Equals(DType) && c.SN.Equals(SN)).AddressArray);
+                                    operateResult = OpcUaDaqObjArray[SN].RemoveSubscribe(basics.AManages.FirstOrDefault(c => c.DType.Equals(DType) && c.SN.Equals(SN)));
                                     if (!operateResult.State)
                                     {
                                         return Break("Off", false, operateResult.Message);

+ 11 - 4
src/YSAI.DAQ/YSAI.Modbus/client/ModbusClientData.cs

@@ -1,7 +1,10 @@
 using Newtonsoft.Json;
 using Newtonsoft.Json.Converters;
+using System.Collections.Concurrent;
 using System.ComponentModel;
 using System.IO.Ports;
+using YSAI.Core.data;
+using YSAI.Core.subscription;
 
 namespace YSAI.Modbus.client
 {
@@ -230,7 +233,7 @@ namespace YSAI.Modbus.client
         /// <summary>
         /// 基础数据
         /// </summary>
-        public class Basics
+        public class Basics: SubscribeData.SCData
         {
             /// <summary>
             /// 唯一标识符
@@ -308,9 +311,12 @@ namespace YSAI.Modbus.client
                 }
                 else
                 {
-                    if (ProtocolType == Obj.ProtocolType &&
-                    ReadTimeOut == Obj.ReadTimeOut &&
-                    WriteTimeOut == Obj.WriteTimeOut)
+                    if (SleepTime == Obj.SleepTime &&
+                        SameDataOut == Obj.SameDataOut &&
+                        DataChangeOut == Obj.DataChangeOut &&
+                        ProtocolType == Obj.ProtocolType &&
+                        ReadTimeOut == Obj.ReadTimeOut &&
+                        WriteTimeOut == Obj.WriteTimeOut)
                     {
                         bool State = false;
                         switch (ProtocolType)
@@ -399,5 +405,6 @@ namespace YSAI.Modbus.client
             [Description("空")]
             NULL
         }
+
     }
 }

+ 80 - 1
src/YSAI.DAQ/YSAI.Modbus/client/ModbusClientOperate.cs

@@ -19,6 +19,7 @@ using System.Threading.Tasks;
 using YSAI.Core.data;
 using YSAI.Core.@interface.only;
 using YSAI.Core.@interface.unify;
+using YSAI.Core.subscription;
 using YSAI.Core.virtualAddress;
 using YSAI.Unility;
 using static YSAI.Modbus.client.ModbusClientData;
@@ -28,7 +29,7 @@ namespace YSAI.Modbus.client
     /// <summary>
     /// modbus 操作
     /// </summary>
-    public sealed class ModbusClientOperate : IBaseAbstract, IModbusClient
+    public sealed class ModbusClientOperate : IBaseAbstract<SubscribeData.EventParam>, IModbusClient
     {
         private static readonly object Lock = new object();  //锁
         private static List<ModbusClientOperate> ThisObjList = new List<ModbusClientOperate>(); //自身对象集合
@@ -865,5 +866,83 @@ namespace YSAI.Modbus.client
                 return Break("Write", false, "写入类型错误");
             }
         }
+
+
+        /// <summary>
+        /// 实现订阅功能
+        /// </summary>
+        private SubscribeOperate subscribeOperate;
+
+        public OperateResult Subscribe(Address address)
+        {
+            Depart("Subscribe");
+            try
+            {
+                if (subscribeOperate == null)
+                {
+                    subscribeOperate = SubscribeOperate.Instance(new SubscribeData.Basics()
+                    {
+                        Address = address,
+                        DataChangeOut = basics.DataChangeOut,
+                        Function = Read,
+                        SameDataOut = basics.SameDataOut,
+                        SleepTime = basics.SleepTime
+                    });
+                    subscribeOperate.OnEvent += SubscribeOperate_OnEvent;
+                    OperateResult operateResult = subscribeOperate.On();
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    OperateResult operateResult = subscribeOperate.Subscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("Subscribe", false, ex.Message);
+            }
+        }
+        public Task<OperateResult> SubscribeAsync(Address address)
+        {
+            return Task.Run(()=> Subscribe(address));
+        }
+
+        public OperateResult UnSubscribe(Address address)
+        {
+            Depart("UnSubscribe");
+            try
+            {
+                if (subscribeOperate != null)
+                {
+                    OperateResult operateResult = subscribeOperate.UnSubscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    return Break("UnSubscribe", false, "当前尚未订阅");
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("UnSubscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> UnSubscribeAsync(Address address)
+        {
+            return Task.Run(() => UnSubscribe(address));
+        }
+
+        /// <summary>
+        /// 事件抛出
+        /// </summary>
+        /// <param name="sender">自定义订阅对象</param>
+        /// <param name="e">返回的参数</param>
+        private void SubscribeOperate_OnEvent(object? sender, SubscribeData.EventParam e)
+        {
+            OnEventHandler?.Invoke(this, e);
+        }
     }
+
 }

+ 18 - 11
src/YSAI.DAQ/YSAI.Opc/da/client/OpcDaClientData.cs

@@ -37,7 +37,11 @@ namespace YSAI.Opc.da.client
             /// </summary>
             [Description("接口版本")]
             public Specification SpecificationVer { get; set; }
-
+            /// <summary>
+            /// 更新频率
+            /// </summary>
+            [Description("更新频率")]
+            public int UpdateRate { get; set; } = 100;
             /// <summary>
             /// 重写Equals
             /// </summary>
@@ -59,6 +63,18 @@ namespace YSAI.Opc.da.client
                 }
             }
         }
+
+
+        /// <summary>
+        /// 事件响应数据带参
+        /// </summary>
+        public class EventParam : Event
+        {
+            /// <summary>
+            /// 键值集合
+            /// </summary>
+            public ConcurrentDictionary<string, AddressValue> Params { get; set; }
+        }
         /// <summary>
         /// 事件响应数据
         /// </summary>
@@ -77,15 +93,6 @@ namespace YSAI.Opc.da.client
             /// </summary>
             public DateTime? Time { get; set; } = DateTime.Now.ToLocalTime();
         }
-        /// <summary>
-        /// 事件响应数据带参
-        /// </summary>
-        public class EventParam : Event
-        {
-            /// <summary>
-            /// 键值集合
-            /// </summary>
-            public ConcurrentDictionary<string, AddressValue> Params { get; set; }
-        }
+
     }
 }

+ 58 - 8
src/YSAI.DAQ/YSAI.Opc/da/client/OpcDaClientOperate.cs

@@ -224,7 +224,7 @@ namespace YSAI.Opc.da.client
         protected override string LogHead => "[ OpcDaClientOperate 操作 ]";
         protected override string ClassName => "OpcDaClientOperate";
 
-        public OperateResult AddGroup(string GroupName, bool IsSubscribed = false, int UpdateRate = 200)
+        public OperateResult AddGroup(string GroupName, bool IsSubscribed = false)
         {
             //开始记录运行时间
             Depart("AddGroup");
@@ -243,7 +243,7 @@ namespace YSAI.Opc.da.client
                         subscriptionState.KeepAlive = 1000;
                         subscriptionState.Deadband = 0;
                         subscriptionState.Active = true;
-                        subscriptionState.UpdateRate = UpdateRate;
+                        subscriptionState.UpdateRate =basics.UpdateRate;
                         subscriptionState.ClientHandle = Guid.NewGuid().ToString();
                         subscriptionState.Name = GroupName;
                         ISubscription subscription = opcDaClient.CreateSubscription(subscriptionState);
@@ -270,12 +270,12 @@ namespace YSAI.Opc.da.client
             }
         }
 
-        public Task<OperateResult> AddGroupAsync(string GroupName, bool IsSubscribed = false, int UpdateRate = 200)
+        public Task<OperateResult> AddGroupAsync(string GroupName, bool IsSubscribed = false)
         {
-            return Task.Run(() => AddGroup(GroupName, IsSubscribed, UpdateRate));
+            return Task.Run(() => AddGroup(GroupName, IsSubscribed));
         }
 
-        public OperateResult AddNode(Address address, string GroupName, bool IsSubscribed = false, int UpdateRate = 200)
+        public OperateResult AddNode(Address address, string GroupName, bool IsSubscribed = false)
         {
             //开始记录运行时间
             Depart("AddNode");
@@ -291,7 +291,7 @@ namespace YSAI.Opc.da.client
                         if (!SubscriptionArray.ContainsKey(DefaultGroupName))
                         {
                             //创建默认组
-                            OperateResult operateResult = AddGroup(DefaultGroupName, IsSubscribed, UpdateRate);
+                            OperateResult operateResult = AddGroup(DefaultGroupName, IsSubscribed);
                             if (!operateResult.State)
                             {
                                 return operateResult;
@@ -372,9 +372,9 @@ namespace YSAI.Opc.da.client
             }
         }
 
-        public Task<OperateResult> AddNodeAsync(Address address, string GroupName, bool IsSubscribed = false, int UpdateRate = 200)
+        public Task<OperateResult> AddNodeAsync(Address address, string GroupName, bool IsSubscribed = false)
         {
-            return Task.Run(() => AddNode(address, GroupName, IsSubscribed, UpdateRate));
+            return Task.Run(() => AddNode(address, GroupName, IsSubscribed));
         }
 
         public OperateResult UpdateGroupSubscribedState(string GroupName, bool IsSubscribed)
@@ -761,5 +761,55 @@ namespace YSAI.Opc.da.client
                 return Break("Write", false, ex.Message);
             }
         }
+
+        public OperateResult Subscribe(Address address)
+        {
+            Depart("Subscribe");
+            try
+            {
+                OperateResult operateResult = AddNode(address, string.Empty, true);
+                return Break("Subscribe", operateResult.State, operateResult.Message);
+            }
+            catch (Exception ex)
+            {
+                return Break("Subscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> SubscribeAsync(Address address)
+        {
+            return Task.Run(()=> SubscribeAsync(address));
+        }
+
+        public OperateResult UnSubscribe(Address address)
+        {
+            Depart("UnSubscribe");
+            try
+            {
+                List<string> FailMessage = new List<string>();
+                foreach (var item in address.AddressArray)
+                {
+                    OperateResult operateResult = RemoveNode(item.AddressName);
+                    if (!operateResult.State)
+                    {
+                        FailMessage.Add(operateResult.Message);
+                    }
+                }
+                if (FailMessage.Count > 0)
+                {
+                    return Break("UnSubscribe", false, FailMessage.ToJson(),RData:FailMessage);
+                }
+                return Break("UnSubscribe", true);
+            }
+            catch (Exception ex)
+            {
+                return Break("UnSubscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> UnSubscribeAsync(Address address)
+        {
+            return Task.Run(() => UnSubscribeAsync(address));
+        }
     }
 }

+ 6 - 2
src/YSAI.DAQ/YSAI.Opc/da/http/OpcDaHttpData.cs

@@ -7,6 +7,7 @@ using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using YSAI.Core.data;
+using YSAI.Core.subscription;
 
 namespace YSAI.Opc.da.http
 {
@@ -18,7 +19,7 @@ namespace YSAI.Opc.da.http
         /// <summary>
         /// 基础数据
         /// </summary>
-        public class Basics
+        public class Basics : SubscribeData.SCData
         {
             /// <summary>
             /// 唯一标识
@@ -58,7 +59,10 @@ namespace YSAI.Opc.da.http
             {
                 Basics? obj = Obj as Basics;
                 if (obj == null) return false;
-                if (obj.SN == this.SN &&
+                if (SleepTime == obj.SleepTime &&
+                    SameDataOut == obj.SameDataOut &&
+                    DataChangeOut == obj.DataChangeOut &&
+                    obj.SN == this.SN &&
                     obj.ServerIP == this.ServerIP && 
                     obj.Port == this.Port && 
                     obj.Key == this.Key && 

+ 78 - 1
src/YSAI.DAQ/YSAI.Opc/da/http/OpcDaHttpOperate.cs

@@ -17,10 +17,11 @@ using YSAI.Core.virtualAddress;
 using YSAI.Core.@interface.only;
 using YSAI.Opc.da.client;
 using YSAI.Core.@interface.unify;
+using YSAI.Core.subscription;
 
 namespace YSAI.Opc.da.http
 {
-    public sealed class OpcDaHttpOperate : IBaseAbstract<OpcDaHttpData.EventParam>, IOpcDaHttp
+    public sealed class OpcDaHttpOperate : IBaseAbstract<OpcDaHttpData.EventParam, SubscribeData.EventParam>, IOpcDaHttp
     {
         /// <summary>
         /// 构造函数
@@ -358,5 +359,81 @@ namespace YSAI.Opc.da.http
         {
             return Break(Depart("Write"), false, "不支持写入功能");
         }
+
+        /// <summary>
+        /// 实现订阅功能
+        /// </summary>
+        private SubscribeOperate subscribeOperate;
+
+        public OperateResult Subscribe(Address address)
+        {
+            Depart("Subscribe");
+            try
+            {
+                if (subscribeOperate == null)
+                {
+                    subscribeOperate = SubscribeOperate.Instance(new SubscribeData.Basics()
+                    {
+                        Address = address,
+                        DataChangeOut = basics.DataChangeOut,
+                        Function = Read,
+                        SameDataOut = basics.SameDataOut,
+                        SleepTime = basics.SleepTime
+                    });
+                    subscribeOperate.OnEvent += SubscribeOperate_OnEvent;
+                    OperateResult operateResult = subscribeOperate.On();
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    OperateResult operateResult = subscribeOperate.Subscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("Subscribe", false, ex.Message);
+            }
+        }
+        public Task<OperateResult> SubscribeAsync(Address address)
+        {
+            return Task.Run(() => Subscribe(address));
+        }
+
+        public OperateResult UnSubscribe(Address address)
+        {
+            Depart("UnSubscribe");
+            try
+            {
+                if (subscribeOperate != null)
+                {
+                    OperateResult operateResult = subscribeOperate.UnSubscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    return Break("UnSubscribe", false, "当前尚未订阅");
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("UnSubscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> UnSubscribeAsync(Address address)
+        {
+            return Task.Run(() => UnSubscribe(address));
+        }
+
+        /// <summary>
+        /// 事件抛出
+        /// </summary>
+        /// <param name="sender">自定义订阅对象</param>
+        /// <param name="e">返回的参数</param>
+        private void SubscribeOperate_OnEvent(object? sender, SubscribeData.EventParam e)
+        {
+            OnEventParamHandler?.Invoke(this, e);
+        }
     }
 }

+ 1 - 1
src/YSAI.DAQ/YSAI.Opc/ua/client/OpcUaClientData.cs

@@ -74,7 +74,7 @@ namespace YSAI.Opc.ua.client
             /// 发布时间间隔
             /// </summary>
             [Description("发布时间间隔")]
-            public byte PublishingInterval { get; set; } = 1000;
+            public byte PublishingInterval { get; set; } = 20;
 
             /// <summary>
             /// 重写基类中的Equals方法

+ 48 - 4
src/YSAI.DAQ/YSAI.Opc/ua/client/OpcUaClientOperate.cs

@@ -74,6 +74,10 @@ namespace YSAI.Opc.ua.client
 
         #region 私有属性
         /// <summary>
+        /// 默认标签
+        /// </summary>
+        private string DefaultTag = "YSAI";
+        /// <summary>
         /// 基础数据
         /// </summary>
         private Basics basics { get; set; }
@@ -231,7 +235,7 @@ namespace YSAI.Opc.ua.client
             };
 
             // 读取当前值
-            ReadResponse readResponse =  clientSession.ReadAsync(
+            ReadResponse readResponse = clientSession.ReadAsync(
                 null,
                 0,
                 TimestampsToReturn.Neither,
@@ -697,7 +701,7 @@ namespace YSAI.Opc.ua.client
 
         public Task<OperateResult> AddSubscribeAsync(ConcurrentDictionary<string, Address> param)
         {
-            return Task.Run(()=> AddSubscribe(param));
+            return Task.Run(() => AddSubscribe(param));
         }
 
         public OperateResult AddSubscribe(ConcurrentDictionary<string, Address> param)
@@ -921,7 +925,7 @@ namespace YSAI.Opc.ua.client
 
         public Task<OperateResult> DeleteNodeAsync(string Key)
         {
-            return Task.Run(()=> DeleteNode(Key));
+            return Task.Run(() => DeleteNode(Key));
         }
 
         public OperateResult DeleteNode(string Key)
@@ -1065,7 +1069,7 @@ namespace YSAI.Opc.ua.client
 
         public Task<OperateResult> OffAsync()
         {
-            return Task.Run(()=> Off());
+            return Task.Run(() => Off());
         }
 
         public OperateResult Off()
@@ -1249,5 +1253,45 @@ namespace YSAI.Opc.ua.client
                 return Break("Writes", false, ex.Message);
             }
         }
+
+        public OperateResult Subscribe(Address address)
+        {
+            Depart("Subscribe");
+            try
+            {
+                ConcurrentDictionary<string, Address> PARAM = new ConcurrentDictionary<string, Address>();
+                PARAM.TryAdd(DefaultTag, address);
+                OperateResult operateResult = AddSubscribe(PARAM);
+                return Break("Subscribe", operateResult.State, operateResult.Message);
+            }
+            catch (Exception ex)
+            {
+                return Break("Subscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> SubscribeAsync(Address address)
+        {
+            return Task.Run(() => SubscribeAsync(address));
+        }
+
+        public OperateResult UnSubscribe(Address address)
+        {
+            Depart("UnSubscribe");
+            try
+            {
+                OperateResult operateResult = RemoveSubscribe(address);
+                return Break("UnSubscribe", operateResult.State, operateResult.Message);
+            }
+            catch (Exception ex)
+            {
+                return Break("UnSubscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> UnSubscribeAsync(Address address)
+        {
+            return Task.Run(() => UnSubscribeAsync(address));
+        }
     }
 }

+ 6 - 2
src/YSAI.DAQ/YSAI.S7/client/S7ClientData.cs

@@ -8,6 +8,7 @@ using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using YSAI.Core.@enum;
+using YSAI.Core.subscription;
 
 namespace YSAI.S7.client
 {
@@ -17,7 +18,7 @@ namespace YSAI.S7.client
         /// <summary>
         /// 基础数据
         /// </summary>
-        public class Basics
+        public class Basics : SubscribeData.SCData
         {
             /// <summary>
             /// 唯一标识符
@@ -63,7 +64,10 @@ namespace YSAI.S7.client
             {
                 Basics? obj = Obj as Basics;
                 if (obj == null) return false;
-                if (obj.SN == SN && 
+                if (SleepTime == obj.SleepTime &&
+                    SameDataOut == obj.SameDataOut &&
+                    DataChangeOut == obj.DataChangeOut &&
+                    obj.SN == SN &&
                     obj.Ip == Ip &&
                     obj.Port == Port &&
                     obj.S7CpuType == S7CpuType &&

+ 80 - 1
src/YSAI.DAQ/YSAI.S7/client/S7ClientOperate.cs

@@ -2,6 +2,7 @@
 using S7.Net;
 using S7.Net.Types;
 using StackExchange.Redis;
+using System;
 using System.Collections.Concurrent;
 using System.Net.Sockets;
 using System.Runtime.CompilerServices;
@@ -9,6 +10,7 @@ using System.Text;
 using YSAI.Core.data;
 using YSAI.Core.@interface.only;
 using YSAI.Core.@interface.unify;
+using YSAI.Core.subscription;
 using YSAI.Core.virtualAddress;
 using YSAI.Log;
 using YSAI.Unility;
@@ -18,7 +20,7 @@ namespace YSAI.S7.client
     /// <summary>
     /// s7操作
     /// </summary>
-    public sealed class S7ClientOperate : IBaseAbstract, IS7Client
+    public sealed class S7ClientOperate : IBaseAbstract<SubscribeData.EventParam>, IS7Client
     {
         private static readonly object Lock = new object();  //锁
         private static List<S7ClientOperate> ThisObjList = new List<S7ClientOperate>(); //自身对象集合
@@ -287,5 +289,82 @@ namespace YSAI.S7.client
                 return Break("Write", false, ex.Message);
             }
         }
+
+
+        /// <summary>
+        /// 实现订阅功能
+        /// </summary>
+        private SubscribeOperate subscribeOperate;
+
+        public OperateResult Subscribe(Address address)
+        {
+            Depart("Subscribe");
+            try
+            {
+                if (subscribeOperate == null)
+                {
+                    subscribeOperate = SubscribeOperate.Instance(new SubscribeData.Basics()
+                    {
+                        Address = address,
+                        DataChangeOut = basics.DataChangeOut,
+                        Function = Read,
+                        SameDataOut = basics.SameDataOut,
+                        SleepTime = basics.SleepTime
+                    });
+                    subscribeOperate.OnEvent += SubscribeOperate_OnEvent;
+                    OperateResult operateResult = subscribeOperate.On();
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    OperateResult operateResult = subscribeOperate.Subscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("Subscribe", false, ex.Message);
+            }
+        }
+        public Task<OperateResult> SubscribeAsync(Address address)
+        {
+            return Task.Run(() => Subscribe(address));
+        }
+
+        public OperateResult UnSubscribe(Address address)
+        {
+            Depart("UnSubscribe");
+            try
+            {
+                if (subscribeOperate != null)
+                {
+                    OperateResult operateResult = subscribeOperate.UnSubscribe(address);
+                    return Break("Subscribe", operateResult.State, operateResult.Message);
+                }
+                else
+                {
+                    return Break("UnSubscribe", false, "当前尚未订阅");
+                }
+            }
+            catch (Exception ex)
+            {
+                return Break("UnSubscribe", false, ex.Message);
+            }
+        }
+
+        public Task<OperateResult> UnSubscribeAsync(Address address)
+        {
+            return Task.Run(() => UnSubscribe(address));
+        }
+
+        /// <summary>
+        /// 事件抛出
+        /// </summary>
+        /// <param name="sender">自定义订阅对象</param>
+        /// <param name="e">返回的参数</param>
+        private void SubscribeOperate_OnEvent(object? sender, SubscribeData.EventParam e)
+        {
+            OnEventHandler?.Invoke(this, e);
+        }
     }
 }

+ 3 - 3
src/YSAI.DAQ/YSAI.Test/TestAll.cs

@@ -396,8 +396,8 @@ namespace YSAI.Test
                     //Console.WriteLine("回车启动订阅模式");
                     //Console.Read();
 
-                    ConcurrentDictionary<string, List<AddressDetails>> pairs = new ConcurrentDictionary<string, List<AddressDetails>>();
-                    pairs.TryAdd("test", address.AddressArray);
+                    ConcurrentDictionary<string, Address> pairs = new ConcurrentDictionary<string, Address>();
+                    pairs.TryAdd("test", address);
 
                     operateResult = opcUaClientOperate.AddSubscribe(pairs);
                     Console.WriteLine(operateResult.Message);
@@ -405,7 +405,7 @@ namespace YSAI.Test
                     Console.WriteLine("5秒后将移除订阅");
                     Thread.Sleep(5000);
 
-                    operateResult = opcUaClientOperate.RemoveSubscribe(address.AddressArray);
+                    operateResult = opcUaClientOperate.RemoveSubscribe(address);
                     Console.WriteLine(operateResult.Message);
 
                     operateResult = opcUaClientOperate.Off();