|
|
@@ -0,0 +1,421 @@
|
|
|
+using DotNetty.Codecs;
|
|
|
+using DotNetty.Handlers.Tls;
|
|
|
+using DotNetty.Transport.Bootstrapping;
|
|
|
+using DotNetty.Transport.Channels;
|
|
|
+using DotNetty.Transport.Channels.Sockets;
|
|
|
+using Newtonsoft.Json;
|
|
|
+using Newtonsoft.Json.Linq;
|
|
|
+using System.Collections.Concurrent;
|
|
|
+using System.Net;
|
|
|
+using System.Net.Security;
|
|
|
+using System.Security.Cryptography.X509Certificates;
|
|
|
+using System.Text;
|
|
|
+using System.Threading.Channels;
|
|
|
+using YSAI.Core.data;
|
|
|
+using YSAI.Core.@interface;
|
|
|
+using YSAI.Log;
|
|
|
+using YSAI.Unility;
|
|
|
+using static YSAI.Netty.client.NettyClientOperate.SecureChatClientHandler;
|
|
|
+
|
|
|
+namespace YSAI.Netty.client
|
|
|
+{
|
|
|
+ public class NettyClientOperate : IBaseAbstract, IRelay
|
|
|
+ {
|
|
|
+ protected override string TAG => "NettyClientOperate";
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 锁
|
|
|
+ /// </summary>
|
|
|
+ private static readonly object Lock = new object();
|
|
|
+ /// <summary>
|
|
|
+ /// 自身对象集合
|
|
|
+ /// </summary>
|
|
|
+ private static List<NettyClientOperate> ThisObjList = new List<NettyClientOperate>();
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 单例模式
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public static NettyClientOperate Instance(NettyClientData.Basics basics)
|
|
|
+ {
|
|
|
+ if (ThisObjList.Count >= MaxInstanceCount)
|
|
|
+ {
|
|
|
+ throw new Exception(ExceedMaxInstanceCountTips);
|
|
|
+ }
|
|
|
+ NettyClientOperate? exp = ThisObjList.FirstOrDefault(c => c.basics.Comparer(basics).result);
|
|
|
+ if (exp == null)
|
|
|
+ {
|
|
|
+ lock (Lock)
|
|
|
+ {
|
|
|
+ if (ThisObjList.Count(c => c.basics.Comparer(basics).result) > 0)
|
|
|
+ {
|
|
|
+ return ThisObjList.First(c => c.basics.Comparer(basics).result);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ NettyClientOperate exp2 = new NettyClientOperate(basics);
|
|
|
+ ThisObjList.Add(exp2);
|
|
|
+ return exp2;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return exp;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 基础数据
|
|
|
+ /// </summary>
|
|
|
+ private NettyClientData.Basics basics;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 通信库
|
|
|
+ /// </summary>
|
|
|
+ private Bootstrap Communication;
|
|
|
+ /// <summary>
|
|
|
+ /// 组
|
|
|
+ /// </summary>
|
|
|
+ private MultithreadEventLoopGroup Group;
|
|
|
+ /// <summary>
|
|
|
+ /// 通道
|
|
|
+ /// </summary>
|
|
|
+ private IChannel Channel;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 订阅的主题项容器
|
|
|
+ /// </summary>
|
|
|
+ private ConcurrentDictionary<string, DateTime> SubIoc = new ConcurrentDictionary<string, DateTime>();
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 构造函数
|
|
|
+ /// </summary>
|
|
|
+ public NettyClientOperate(NettyClientData.Basics basics)
|
|
|
+ {
|
|
|
+ this.basics = basics;
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 释放
|
|
|
+ /// </summary>
|
|
|
+ /// <exception cref="NotImplementedException"></exception>
|
|
|
+ public void Dispose()
|
|
|
+ {
|
|
|
+ Off();
|
|
|
+ GC.Collect();
|
|
|
+ GC.SuppressFinalize(this);
|
|
|
+ ThisObjList.Remove(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult GetStatus()
|
|
|
+ {
|
|
|
+ if (Channel == null)
|
|
|
+ {
|
|
|
+ return Break("GetStatus", false, "未连接");
|
|
|
+ }
|
|
|
+ return Break("GetStatus", true, "已连接");
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> GetStatusAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => GetStatus());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Off()
|
|
|
+ {
|
|
|
+ Depart("Off");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (Channel == null)
|
|
|
+ {
|
|
|
+ return Break("Off", false, "未连接");
|
|
|
+ }
|
|
|
+ Communication = null;
|
|
|
+ Group?.ShutdownGracefullyAsync().Wait();
|
|
|
+ Channel.CloseAsync().Wait();
|
|
|
+ Channel.DisconnectAsync().Wait();
|
|
|
+ Channel = null;
|
|
|
+ return Break("Off", true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Off", false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> OffAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => Off());
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 内部消息
|
|
|
+ /// </summary>
|
|
|
+ private class M
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 主题
|
|
|
+ /// </summary>
|
|
|
+ public string T { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 内容
|
|
|
+ /// </summary>
|
|
|
+ public string C { get; set; }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 内部消息接收
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="sender"></param>
|
|
|
+ /// <param name="e"></param>
|
|
|
+ protected void OnMe(object? sender, EventResult e)
|
|
|
+ {
|
|
|
+ if (e.State)
|
|
|
+ {
|
|
|
+ //收到数据
|
|
|
+ if (SubIoc.Count > 0)
|
|
|
+ {
|
|
|
+ Info? info = e.RData as Info;
|
|
|
+ if (info != null && !string.IsNullOrEmpty(info.Msg))
|
|
|
+ {
|
|
|
+ string Con = info.Msg;
|
|
|
+ string? Terminal = info.Context.Channel.RemoteAddress.ToString();
|
|
|
+ if (Con.IsJson())
|
|
|
+ {
|
|
|
+ M? msg = Con.JsonToObject<M>();
|
|
|
+ if (SubIoc.ContainsKey(msg.T))
|
|
|
+ {
|
|
|
+ string Topic = msg.T;
|
|
|
+ string Content = msg.C;
|
|
|
+
|
|
|
+ if (Content.IsJson())
|
|
|
+ {
|
|
|
+ OnEventHandler(this, new EventResult(true, $"{TAG} 接收到主题 ( {Topic} ) 内容 ( {Content} )", "{" + $"\"Topic\":\"{Topic}\",\"Content\":{Content},\"Terminal\":\"{Terminal}\"" + "}", Core.@enum.ResultType.Dynamic));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ OnEventHandler(this, new EventResult(true, $"{TAG} 接收到主题 ( {Topic} ) 内容 ( {Content} )", "{" + $"\"Topic\":\"{Topic}\",\"Content\":\"{Content}\",\"Terminal\":\"{Terminal}\"" + "}", Core.@enum.ResultType.Dynamic));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //出现异常
|
|
|
+ OnEventHandler(this, new EventResult(false, $"订阅异常:{e.Message}"));
|
|
|
+ Off();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 消息接收处理
|
|
|
+ /// </summary>
|
|
|
+ public class SecureChatClientHandler : SimpleChannelInboundHandler<string>
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 消息事件
|
|
|
+ /// </summary>
|
|
|
+ Action<object?, EventResult> MessageEvent;
|
|
|
+ /// <summary>
|
|
|
+ /// 构造函数
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="MessageEvent">息事件</param>
|
|
|
+ public SecureChatClientHandler(Action<object?, EventResult> MessageEvent)
|
|
|
+ {
|
|
|
+ this.MessageEvent = MessageEvent;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 信息
|
|
|
+ /// </summary>
|
|
|
+ public class Info
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 通道处理程序上下文
|
|
|
+ /// </summary>
|
|
|
+ public IChannelHandlerContext Context { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 信息
|
|
|
+ /// </summary>
|
|
|
+ public string Msg { get; set; }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 通道消息
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="contex">通道处理程序上下文</param>
|
|
|
+ /// <param name="msg">消息</param>
|
|
|
+ protected override void ChannelRead0(IChannelHandlerContext contex, string msg)
|
|
|
+ {
|
|
|
+ //消息抛出
|
|
|
+ MessageEvent?.Invoke(this, new EventResult(true, msg, new Info { Context = contex, Msg = msg }, Core.@enum.ResultType.Dynamic));
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 出现了异常
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="contex">通道处理程序上下文</param>
|
|
|
+ /// <param name="e">异常信息</param>
|
|
|
+ public override void ExceptionCaught(IChannelHandlerContext contex, Exception ex)
|
|
|
+ {
|
|
|
+ //消息抛出
|
|
|
+ MessageEvent?.Invoke(this, new EventResult(false, ex.Message));
|
|
|
+
|
|
|
+ //通道关闭
|
|
|
+ contex.CloseAsync();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult On()
|
|
|
+ {
|
|
|
+ Depart("On");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (Channel != null)
|
|
|
+ {
|
|
|
+ return Break("On", false, "已连接");
|
|
|
+ }
|
|
|
+ //证书
|
|
|
+ X509Certificate2? Cert = null;
|
|
|
+ //证书目标主机
|
|
|
+ string? CertTargetHost = string.Empty;
|
|
|
+ //如果当证书不为空
|
|
|
+ if (!string.IsNullOrWhiteSpace(basics.SslFilePath) && !string.IsNullOrWhiteSpace(basics.SslFilePassword))
|
|
|
+ {
|
|
|
+ Cert = new X509Certificate2(basics.SslFilePath, basics.SslFilePassword);
|
|
|
+ CertTargetHost = Cert.GetNameInfo(X509NameType.DnsName, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ //实例化通信库
|
|
|
+ Communication = new Bootstrap();
|
|
|
+ //实例化组
|
|
|
+ Group = new MultithreadEventLoopGroup();
|
|
|
+ //走TCP通道
|
|
|
+ Communication
|
|
|
+ .Group(Group) //创建一个组
|
|
|
+ .Channel<TcpSocketChannel>() //创建一个通道
|
|
|
+ .Option(ChannelOption.TcpNodelay, true) //TCP节点布局
|
|
|
+ .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { //处理
|
|
|
+
|
|
|
+ IChannelPipeline pipeline = channel.Pipeline;
|
|
|
+ if (Cert != null)
|
|
|
+ {
|
|
|
+ pipeline.AddLast(new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(CertTargetHost)));
|
|
|
+ }
|
|
|
+ pipeline.AddLast(new DelimiterBasedFrameDecoder(8192, Delimiters.LineDelimiter()));
|
|
|
+ //这里就是启动订阅
|
|
|
+ pipeline.AddLast(new StringEncoder(), new StringDecoder(), new SecureChatClientHandler(OnMe));
|
|
|
+
|
|
|
+ }));
|
|
|
+
|
|
|
+ //连接
|
|
|
+ Channel = Communication.ConnectAsync(new IPEndPoint(IPAddress.Parse(basics.Host), basics.Port)).Result;
|
|
|
+
|
|
|
+ return Break("On", true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("On", false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> OnAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => On());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Produce(string Topic, string Content)
|
|
|
+ {
|
|
|
+ Depart("Produce");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (Channel == null)
|
|
|
+ {
|
|
|
+ return Break("Produce", false, "未连接");
|
|
|
+ }
|
|
|
+
|
|
|
+ //内部消息组织
|
|
|
+ string msg = string.Empty;
|
|
|
+ if (Content.IsJson())
|
|
|
+ {
|
|
|
+ msg = "{" + string.Format("\"T\":\"{0}\",\"C\":{1}", Topic, Content) + "}\r\n";
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ msg = "{" + string.Format("\"T\":\"{0}\",\"C\":\"{1}\"", Topic, Content) + "}\r\n";
|
|
|
+ }
|
|
|
+ Channel.WriteAndFlushAsync(msg);
|
|
|
+ //延时
|
|
|
+ //TimeTool.DelayUs(0.2);
|
|
|
+ return Break("Produce", true);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Produce", false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> ProduceAsync(string Topic, string Content)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Produce(Topic, Content));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Subscribe(string Topic)
|
|
|
+ {
|
|
|
+ Depart("Subscribe");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (Channel == null)
|
|
|
+ {
|
|
|
+ return Break("Subscribe", false, "未连接");
|
|
|
+ }
|
|
|
+ if (SubIoc.ContainsKey(Topic))
|
|
|
+ {
|
|
|
+ return Break("Subscribe", false, "主题已存在");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ SubIoc.AddOrUpdate(Topic, DateTime.Now, (k, v) => DateTime.Now);
|
|
|
+ return Break("Subscribe", true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Subscribe", false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> SubscribeAsync(string Topic)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Subscribe(Topic));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult UnSubscribe(string Topic)
|
|
|
+ {
|
|
|
+ Depart("UnSubscribe");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (Channel == null)
|
|
|
+ {
|
|
|
+ return Break("UnSubscribe", false, "未连接");
|
|
|
+ }
|
|
|
+ if (SubIoc.ContainsKey(Topic))
|
|
|
+ {
|
|
|
+ SubIoc.Remove(Topic, out _);
|
|
|
+ return Break("UnSubscribe", true);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("UnSubscribe", false, "主题不存在");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("UnSubscribe", false, ex.Message, Exception: ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> UnSubscribeAsync(string Topic)
|
|
|
+ {
|
|
|
+ return Task.Run(() => UnSubscribe(Topic));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|