|
|
@@ -0,0 +1,545 @@
|
|
|
+using NetMQ;
|
|
|
+using NetMQ.Sockets;
|
|
|
+using System.Collections.Concurrent;
|
|
|
+using System.Reflection;
|
|
|
+using YSAI.Core.attribute;
|
|
|
+using YSAI.Core.data;
|
|
|
+using YSAI.Core.@interface;
|
|
|
+using YSAI.Unility;
|
|
|
+
|
|
|
+namespace YSAI.NetMQ
|
|
|
+{
|
|
|
+ /// <summary>
|
|
|
+ /// NetMQ 是轻量级消息传递库 ZeroMQ 的 100% 原生 C# 端
|
|
|
+ /// </summary>
|
|
|
+ public class NetMQOperate : IBaseAbstract, IRelay
|
|
|
+ {
|
|
|
+ protected override string TAG => "NetMQClientOperate";
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 锁
|
|
|
+ /// </summary>
|
|
|
+ private static readonly object Lock = new object();
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 自身对象集合
|
|
|
+ /// </summary>
|
|
|
+ private static List<NetMQOperate> ThisObjList = new List<NetMQOperate>();
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 单例模式
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public static NetMQOperate Instance(NetMQData.Basics basics)
|
|
|
+ {
|
|
|
+ if (ThisObjList.Count >= MaxInstanceCount)
|
|
|
+ {
|
|
|
+ throw new Exception(ExceedMaxInstanceCountTips);
|
|
|
+ }
|
|
|
+ NetMQOperate? exp = ThisObjList.FirstOrDefault(c => c.basics.Comparer(basics).result);
|
|
|
+ if (exp == null)
|
|
|
+ {
|
|
|
+ lock (Lock)
|
|
|
+ {
|
|
|
+ if (ThisObjList.Count(c => c.basics.Comparer(basics).result) > 0)
|
|
|
+ {
|
|
|
+ return ThisObjList.First(c => c.basics.Comparer(basics).result);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ NetMQOperate exp2 = new NetMQOperate(basics);
|
|
|
+ ThisObjList.Add(exp2);
|
|
|
+ return exp2;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return exp;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 基础数据
|
|
|
+ /// </summary>
|
|
|
+ private NetMQData.Basics basics;
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 构造函数
|
|
|
+ /// </summary>
|
|
|
+ public NetMQOperate(NetMQData.Basics basics)
|
|
|
+ {
|
|
|
+ this.basics = basics;
|
|
|
+ }
|
|
|
+
|
|
|
+ public NetMQOperate()
|
|
|
+ { }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 发布者;
|
|
|
+ /// 服务端
|
|
|
+ /// </summary>
|
|
|
+ private PublisherSocket pSocket;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 订阅对象IOC
|
|
|
+ /// </summary>
|
|
|
+ private ConcurrentDictionary<string, SubObj> SubIoc;
|
|
|
+
|
|
|
+
|
|
|
+ class SubObj
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 订阅对象
|
|
|
+ /// </summary>
|
|
|
+ public SubscriberSocket socket { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 任务对象
|
|
|
+ /// </summary>
|
|
|
+ public Task task { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 令牌
|
|
|
+ /// </summary>
|
|
|
+ public CancellationTokenSource token { get; set; }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 获取启动类型
|
|
|
+ /// 2:发布模式
|
|
|
+ /// 3:又订阅又发布
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ private int GType()
|
|
|
+ {
|
|
|
+ if (!string.IsNullOrWhiteSpace(basics.SAddress) && !string.IsNullOrWhiteSpace(basics.PAddress))
|
|
|
+ {
|
|
|
+ return 3;
|
|
|
+ }
|
|
|
+ if (string.IsNullOrWhiteSpace(basics.SAddress) && !string.IsNullOrWhiteSpace(basics.PAddress))
|
|
|
+ {
|
|
|
+ return 2;
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 释放
|
|
|
+ /// </summary>
|
|
|
+ /// <exception cref="NotImplementedException"></exception>
|
|
|
+ public void Dispose()
|
|
|
+ {
|
|
|
+ Off();
|
|
|
+ GC.Collect();
|
|
|
+ GC.SuppressFinalize(this);
|
|
|
+ ThisObjList.Remove(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult On()
|
|
|
+ {
|
|
|
+ string SN = Depart("On");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ int type = GType();
|
|
|
+ switch (type)
|
|
|
+ {
|
|
|
+ case 2: //发布模式(服务端)
|
|
|
+ case 3: //订阅发布模式(客户端服务端)
|
|
|
+ if (pSocket != null)
|
|
|
+ {
|
|
|
+ return Break(SN, false, type == 3 ? "发布订阅模式已打开" : "发布模式已打开");
|
|
|
+ }
|
|
|
+ //先启动服务端
|
|
|
+ pSocket = new PublisherSocket();
|
|
|
+ pSocket.Options.ReceiveHighWatermark = basics.ReceiveHighWatermark;
|
|
|
+ pSocket.Bind(basics.PAddress);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ return Break(SN, true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> OnAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => On());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Off()
|
|
|
+ {
|
|
|
+ string SN = Depart("Off");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (SubIoc != null)
|
|
|
+ {
|
|
|
+ foreach (var item in SubIoc)
|
|
|
+ {
|
|
|
+ item.Value.token.Cancel();
|
|
|
+ item.Value.task.Wait();
|
|
|
+ item.Value.task.Dispose();
|
|
|
+ item.Value.socket.Close();
|
|
|
+ item.Value.socket.Dispose();
|
|
|
+ }
|
|
|
+ SubIoc.Clear();
|
|
|
+ SubIoc = null;
|
|
|
+ }
|
|
|
+ int type = GType();
|
|
|
+ switch (type)
|
|
|
+ {
|
|
|
+ case 2: //发布模式(服务端)
|
|
|
+ case 3: //订阅发布模式(客户端服务端)
|
|
|
+ if (pSocket == null)
|
|
|
+ {
|
|
|
+ return Break(SN, false, type == 3 ? "发布订阅模式未打开" : "发布模式未打开");
|
|
|
+ }
|
|
|
+ pSocket.Close();
|
|
|
+ pSocket.Dispose();
|
|
|
+ pSocket = null;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ return Break(SN, true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> OffAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => Off());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Produce(string Topic, string Content)
|
|
|
+ {
|
|
|
+ string SN = Depart("Produce");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (pSocket == null)
|
|
|
+ {
|
|
|
+ return Break(SN, false, "发布模式未打开");
|
|
|
+ }
|
|
|
+ pSocket.SendMoreFrame(Topic).SendFrame(Content);
|
|
|
+ return Break(SN, true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> ProduceAsync(string Topic, string Content)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Produce(Topic, Content));
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 订阅消息处理
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="source"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ Task SubMessageHandle(CancellationTokenSource source, SubscriberSocket sSocket, int timeout)
|
|
|
+ {
|
|
|
+ return Task.Factory.StartNew(() =>
|
|
|
+ {
|
|
|
+ while (!source.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ string Content = string.Empty;
|
|
|
+ if (sSocket.TryReceiveFrameString(TimeSpan.FromSeconds(timeout), out string Topic, out bool moreFrames))
|
|
|
+ {
|
|
|
+ if (moreFrames)
|
|
|
+ {
|
|
|
+ Content = sSocket.ReceiveFrameString();
|
|
|
+ if (Content.IsJson())
|
|
|
+ {
|
|
|
+ OnEventHandler(this, new EventResult(true, $"{TAG} 接收到主题 ( {Topic} ) 内容 ( {Content} )", "{" + $"\"Topic\":\"{Topic}\",\"Content\":{Content}" + "}", Core.@enum.ResultType.Dynamic));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler(this, new EventResult(true, $"{TAG} 接收到主题 ( {Topic} ) 内容 ( {Content} )", "{" + $"\"Topic\":\"{Topic}\",\"Content\":\"{Content}\"" + "}", Core.@enum.ResultType.Dynamic));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, source.Token);
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Subscribe(string Topic)
|
|
|
+ {
|
|
|
+ string SN = Depart("Subscribe");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (SubIoc == null)
|
|
|
+ {
|
|
|
+ SubIoc = new ConcurrentDictionary<string, SubObj>();
|
|
|
+ }
|
|
|
+ //判断是否有此主题的订阅
|
|
|
+ if (!SubIoc.ContainsKey(Topic))
|
|
|
+ {
|
|
|
+ //初始对象
|
|
|
+ SubscriberSocket socket = new SubscriberSocket();
|
|
|
+ socket.Options.ReceiveHighWatermark = basics.ReceiveHighWatermark;
|
|
|
+ socket.Connect(basics.SAddress);
|
|
|
+ //订阅主题
|
|
|
+ socket.Subscribe(Topic);
|
|
|
+ //初始令牌
|
|
|
+ CancellationTokenSource token = new CancellationTokenSource();
|
|
|
+ //启动任务
|
|
|
+ Task task = SubMessageHandle(token, socket, basics.TimeOut);
|
|
|
+ //设置新的对象
|
|
|
+ SubObj subObj = new SubObj()
|
|
|
+ {
|
|
|
+ socket = socket,
|
|
|
+ task = task,
|
|
|
+ token = token
|
|
|
+ };
|
|
|
+ //更新至容器
|
|
|
+ SubIoc.AddOrUpdate(Topic, subObj, (k, v) => subObj);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break(SN, false, "此主题已订阅");
|
|
|
+ }
|
|
|
+ return Break(SN, true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> SubscribeAsync(string Topic)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Subscribe(Topic));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult UnSubscribe(string Topic)
|
|
|
+ {
|
|
|
+ string SN = Depart("UnSubscribe");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (SubIoc == null)
|
|
|
+ {
|
|
|
+ return Break(SN, false, "尚未存在订阅数据");
|
|
|
+ }
|
|
|
+ //判断是否有此主题的订阅
|
|
|
+ if (SubIoc.ContainsKey(Topic))
|
|
|
+ {
|
|
|
+ if (SubIoc.Remove(Topic, out SubObj subObj))
|
|
|
+ {
|
|
|
+ subObj.token.Cancel();
|
|
|
+ subObj.task.Wait();
|
|
|
+ subObj.task.Dispose();
|
|
|
+ subObj.socket.Close();
|
|
|
+ subObj.socket.Dispose();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break(SN, false, "取消订阅失败");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break(SN, false, "此主题尚未订阅");
|
|
|
+ }
|
|
|
+ return Break(SN, true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> UnSubscribeAsync(string Topic)
|
|
|
+ {
|
|
|
+ return Task.Run(() => UnSubscribe(Topic));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult GetStatus()
|
|
|
+ {
|
|
|
+ string SN = Depart("GetStatus");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ switch (GType())
|
|
|
+ {
|
|
|
+ case 2: //发布模式(服务端)
|
|
|
+ if (pSocket == null)
|
|
|
+ {
|
|
|
+ return Break(SN, false, "发布模式未打开");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case 3: //订阅发布模式(客户端服务端)
|
|
|
+ if (pSocket == null)
|
|
|
+ {
|
|
|
+ return Break(SN, false, "发布订阅模式未打开");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return Break(SN, true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> GetStatusAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => GetStatus());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult GetParam()
|
|
|
+ {
|
|
|
+ string SN = Depart("GetParam");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //通过反射得到参数信息
|
|
|
+ List<ReflexTool.LibInstanceParam>? libInstanceParams = ReflexTool.GetClassAllPropertyData<NetMQData.Basics>();
|
|
|
+ //名称
|
|
|
+ string name = TAG.Replace("Operate", string.Empty).Replace("Client", string.Empty);
|
|
|
+ //命名空间
|
|
|
+ string nameSpace = "YSAI.Netty.NettyClientOperate";
|
|
|
+ //对象实例
|
|
|
+ NetMQData.Basics basics = new NetMQData.Basics();
|
|
|
+ //参数结构体
|
|
|
+ ParamStructure? paramStructure = new ParamStructure()
|
|
|
+ {
|
|
|
+ Name = name,
|
|
|
+ Description = name,
|
|
|
+ Subset = new List<ParamStructure.subset>
|
|
|
+ {
|
|
|
+ new ParamStructure.subset
|
|
|
+ {
|
|
|
+ Description = name,
|
|
|
+ Name = name,
|
|
|
+ Propertie = new List<ParamStructure.subset.propertie>()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ paramStructure.Subset[0].Propertie.Add(new ParamStructure.subset.propertie
|
|
|
+ {
|
|
|
+ PropertyName = "ServiceName",
|
|
|
+ Description = "命名空间",
|
|
|
+ Show = false,
|
|
|
+ Use = false,
|
|
|
+ Default = nameSpace,
|
|
|
+ DataCate = null
|
|
|
+ });
|
|
|
+ paramStructure.Subset[0].Propertie.Add(new ParamStructure.subset.propertie
|
|
|
+ {
|
|
|
+ PropertyName = "Topic",
|
|
|
+ Description = "主题",
|
|
|
+ Show = true,
|
|
|
+ Use = true,
|
|
|
+ Default = "test",
|
|
|
+ DataCate = ParamStructure.dataCate.text
|
|
|
+ });
|
|
|
+ foreach (var lib in libInstanceParams)
|
|
|
+ {
|
|
|
+ //默认值
|
|
|
+ string Default = ReflexTool.GetModelValue(lib.Name, basics);
|
|
|
+ //前端展示特性
|
|
|
+ DisplayAttribute? displayAttribute = typeof(NetMQData.Basics).GetProperty(lib.Name).GetCustomAttribute<DisplayAttribute>();
|
|
|
+ //验证特性
|
|
|
+ VerifyAttribute? verifyAttribute = typeof(NetMQData.Basics).GetProperty(lib.Name).GetCustomAttribute<VerifyAttribute>();
|
|
|
+ //单位特性
|
|
|
+ UnitAttribute? unitAttribute = typeof(NetMQData.Basics).GetProperty(lib.Name).GetCustomAttribute<UnitAttribute>();
|
|
|
+ //描述上加单位
|
|
|
+ string Describe = lib.Describe;
|
|
|
+ if (unitAttribute != null && !string.IsNullOrWhiteSpace(unitAttribute.Unit))
|
|
|
+ {
|
|
|
+ Describe += $"({unitAttribute.Unit})";
|
|
|
+ }
|
|
|
+
|
|
|
+ ParamStructure.subset.propertie propertie = new ParamStructure.subset.propertie
|
|
|
+ {
|
|
|
+ PropertyName = lib.Name,
|
|
|
+ Description = Describe,
|
|
|
+
|
|
|
+ Show = displayAttribute?.Show ?? false,
|
|
|
+ Use = displayAttribute?.Use ?? false,
|
|
|
+ MustFillIn = displayAttribute?.MustFillIn ?? false,
|
|
|
+ DataCate = displayAttribute?.DataCate ?? null,
|
|
|
+
|
|
|
+ Regex = verifyAttribute?.Regex ?? null,
|
|
|
+ FailTips = verifyAttribute?.FailTips ?? null,
|
|
|
+
|
|
|
+ Default = Default
|
|
|
+ };
|
|
|
+ switch (displayAttribute?.DataCate)
|
|
|
+ {
|
|
|
+ case ParamStructure.dataCate.select:
|
|
|
+ propertie.Options = new List<ParamStructure.subset.propertie.options>();
|
|
|
+ foreach (var val in lib.EnumArray as List<dynamic>)
|
|
|
+ {
|
|
|
+ string des = val.Describe;
|
|
|
+ if (!string.IsNullOrEmpty(des))
|
|
|
+ {
|
|
|
+ des = $"({val.Describe})";
|
|
|
+ }
|
|
|
+ propertie.Options.Add(new ParamStructure.subset.propertie.options
|
|
|
+ {
|
|
|
+ Key = val.Name + des,
|
|
|
+ Value = val.Value,
|
|
|
+ });
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case ParamStructure.dataCate.radio:
|
|
|
+ propertie.Options = new List<ParamStructure.subset.propertie.options>();
|
|
|
+ propertie.Options.Add(new ParamStructure.subset.propertie.options
|
|
|
+ {
|
|
|
+ Key = "是",
|
|
|
+ Value = true,
|
|
|
+ });
|
|
|
+ propertie.Options.Add(new ParamStructure.subset.propertie.options
|
|
|
+ {
|
|
|
+ Key = "否",
|
|
|
+ Value = false,
|
|
|
+ });
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ paramStructure.Subset[0].Propertie.Add(propertie);
|
|
|
+ }
|
|
|
+ return Break(SN, true, paramStructure.ToJson().JsonFormatting(), paramStructure, Core.@enum.ResultType.Object);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> GetParamAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => GetParam());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult CreateInstance<T>(T Basics)
|
|
|
+ {
|
|
|
+ string SN = Depart("CreateInstance");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //先判断对象类型是否一致
|
|
|
+ if (typeof(T).FullName.Equals(typeof(NetMQData.Basics).FullName))
|
|
|
+ {
|
|
|
+ return Break(SN, true, RData: Instance(Basics as NetMQData.Basics));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break(SN, false, "对象类型错误,无法创建实例");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break(SN, false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> CreateInstanceAsync<T>(T Basics)
|
|
|
+ {
|
|
|
+ return Task.Run(() => CreateInstance(Basics));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|