|
|
@@ -0,0 +1,400 @@
|
|
|
+using Confluent.Kafka;
|
|
|
+using Confluent.Kafka.Admin;
|
|
|
+using Microsoft.IdentityModel.Tokens;
|
|
|
+using Newtonsoft.Json.Linq;
|
|
|
+using System.Collections.Concurrent;
|
|
|
+using System.Data;
|
|
|
+using YSAI.Core.data;
|
|
|
+using YSAI.Core.@interface.only;
|
|
|
+using YSAI.Core.@interface.unify;
|
|
|
+using YSAI.Log;
|
|
|
+using YSAI.Unility;
|
|
|
+
|
|
|
+namespace YSAI.Kafka
|
|
|
+{
|
|
|
+ public sealed class KafkaOperate : IBaseAbstract, IKafka, IRelay
|
|
|
+ {
|
|
|
+ protected override string LogHead => "[ KafkaOperate 操作 ]";
|
|
|
+ protected override string ClassName => "KafkaOperate";
|
|
|
+ private static readonly object Lock = new object(); //锁
|
|
|
+ private static List<KafkaOperate> ThisObjList = new List<KafkaOperate>(); //自身对象集合
|
|
|
+ /// <summary>
|
|
|
+ /// 单例模式
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ public static KafkaOperate Instance(KafkaData.Basics basics)
|
|
|
+ {
|
|
|
+ KafkaOperate? exp = ThisObjList.FirstOrDefault(c => c.basics.Equals(basics));
|
|
|
+ if (exp == null)
|
|
|
+ {
|
|
|
+ lock (Lock)
|
|
|
+ {
|
|
|
+ if (ThisObjList.Count(c => c.basics.Equals(basics)) > 0)
|
|
|
+ {
|
|
|
+ return ThisObjList.First(c => c.basics.Equals(basics));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ KafkaOperate exp2 = new KafkaOperate(basics);
|
|
|
+ ThisObjList.Add(exp2);
|
|
|
+ return exp2;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return exp;
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 基础数据
|
|
|
+ /// </summary>
|
|
|
+ private KafkaData.Basics basics { get; set; }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 构造函数
|
|
|
+ /// </summary>
|
|
|
+ public KafkaOperate(KafkaData.Basics basics)
|
|
|
+ {
|
|
|
+ this.basics = basics;
|
|
|
+
|
|
|
+ if (producerConfig == null)
|
|
|
+ {
|
|
|
+ //创建配置
|
|
|
+ producerConfig = new ProducerConfig()
|
|
|
+ {
|
|
|
+ ClientId = Guid.NewGuid().ToString(),
|
|
|
+ BootstrapServers = basics.BootstrapServers
|
|
|
+ //,
|
|
|
+ //SecurityProtocol = basics.SecurityProtocol,
|
|
|
+ //SaslMechanism = basics.SaslMechanism,
|
|
|
+ //SaslKerberosServiceName = basics.SaslKerberosServiceName,
|
|
|
+ //SaslKerberosKeytab = basics.SaslKerberosKeytab,
|
|
|
+ //SaslKerberosPrincipal = basics.SaslKerberosPrincipal
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ if (consumerConfig == null)
|
|
|
+ {
|
|
|
+ //创建配置
|
|
|
+ consumerConfig = new ConsumerConfig()
|
|
|
+ {
|
|
|
+ BootstrapServers = basics.BootstrapServers,
|
|
|
+ GroupId = Guid.NewGuid().ToString(),
|
|
|
+ AutoOffsetReset = basics.AutoOffsetReset
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ if (adminClientConfig == null)
|
|
|
+ {
|
|
|
+ //创建配置
|
|
|
+ adminClientConfig = new AdminClientConfig()
|
|
|
+ {
|
|
|
+ BootstrapServers = basics.BootstrapServers,
|
|
|
+ SecurityProtocol = basics.SecurityProtocol
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 生产者配置
|
|
|
+ /// </summary>
|
|
|
+ private ProducerConfig producerConfig { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 消费者配置
|
|
|
+ /// </summary>
|
|
|
+ private ConsumerConfig consumerConfig { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 客户端管理员配置
|
|
|
+ /// </summary>
|
|
|
+ private AdminClientConfig adminClientConfig { get; set; }
|
|
|
+
|
|
|
+ public void Dispose()
|
|
|
+ {
|
|
|
+ GC.Collect();
|
|
|
+ GC.SuppressFinalize(this);
|
|
|
+ ThisObjList.Remove(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 主题集合
|
|
|
+ /// </summary>
|
|
|
+ private List<string> TopicArray = new List<string>();
|
|
|
+ /// <summary>
|
|
|
+ /// 订阅的消费者
|
|
|
+ /// </summary>
|
|
|
+ private IConsumer<object, object>? Consumer = null;
|
|
|
+ /// <summary>
|
|
|
+ /// 轮询状态
|
|
|
+ /// </summary>
|
|
|
+ private bool PollingState = false;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 轮询消费
|
|
|
+ /// </summary>
|
|
|
+ /// <returns></returns>
|
|
|
+ private Task Polling()
|
|
|
+ {
|
|
|
+ return Task.Run(() =>
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ while (PollingState)
|
|
|
+ {
|
|
|
+ //超时时间1秒
|
|
|
+ ConsumeResult<object, object>? result = Consumer?.Consume(new TimeSpan(0, 0, 0, 0, basics.WaitTime));
|
|
|
+
|
|
|
+ if (result != null)
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(true, $"消费成功,收到主题:{result.Topic} - Key:{result.Message.Key} - Value:{result.Message.Value}", new KafkaData.ConsumeData<object, object>() { State = true, Key = result.Message.Key, Value = result.Message.Value, Topic = result.Topic, Message = $"收到主题:{result.Topic} - Key:{result.Message.Key} - Value:{result.Message.Value}" }, Core.@enum.ResultType.Dynamic));
|
|
|
+ //消息已处理提交偏移量
|
|
|
+ Consumer?.Commit();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ OnEventHandler?.Invoke(this, new EventResult(false, $"轮询消费异常:{ex.Message}"));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> CreateTopicsAsync(List<string> Topics)
|
|
|
+ {
|
|
|
+ return Task.Run(() => CreateTopics(Topics));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult CreateTopics(List<string> Topics)
|
|
|
+ {
|
|
|
+ //开始记录运行时间
|
|
|
+ Depart("CreateTopics");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ using (IAdminClient adminClient = new AdminClientBuilder(adminClientConfig).Build())
|
|
|
+ {
|
|
|
+ //先获取kafka已存在的主题
|
|
|
+ Metadata metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(5)); //超时时间5秒
|
|
|
+ //已存在的主题集合
|
|
|
+ List<string> adnimTopics = new List<string>();
|
|
|
+ foreach (var item in metadata.Topics) { adnimTopics.Add(item.Topic); }
|
|
|
+ //得到主题差异集合
|
|
|
+ List<string> topicDifferenceArray = Topics.Except(adnimTopics).ToList();
|
|
|
+ //差异主题集合
|
|
|
+ List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
|
|
|
+ //循环添加主题
|
|
|
+ foreach (var topic in topicDifferenceArray)
|
|
|
+ {
|
|
|
+ topicSpecifications.Add(new TopicSpecification()
|
|
|
+ {
|
|
|
+ Name = topic
|
|
|
+ });
|
|
|
+ }
|
|
|
+ //执行创建
|
|
|
+ adminClient.CreateTopicsAsync(topicSpecifications).Wait();
|
|
|
+ //返回
|
|
|
+ return Break("CreateTopics", true, "主题集合创建成功");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("CreateTopics", false, ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Produce(string Topic, object Key, object Content)
|
|
|
+ {
|
|
|
+ //开始记录运行时间
|
|
|
+ Depart("Produce");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //失败的消息
|
|
|
+ List<string> MessageFail = new List<string>();
|
|
|
+
|
|
|
+ using (IProducer<object, object> producer = new ProducerBuilder<object, object>(producerConfig).Build())
|
|
|
+ {
|
|
|
+ //返回结果
|
|
|
+ OperateResult? result = null;
|
|
|
+ //发布结果
|
|
|
+ producer.Produce(Topic, new Message<object, object>() { Key = Key, Value = Content }, Result =>
|
|
|
+ {
|
|
|
+ if (Result.Error.Code != ErrorCode.NoError)
|
|
|
+ {
|
|
|
+ result = Break("Produce", false, $"主题 [ {Topic} ] 发送失败:{Result.Error.Reason}");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ result = Break("Produce", true, $"主题 [ {Topic} ] 发送成功,偏移量:{Result.TopicPartitionOffset}");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ //最多等待时间
|
|
|
+ producer.Flush(new TimeSpan(0, 0, 0, 0, basics.WaitTime));
|
|
|
+ if (result == null)
|
|
|
+ {
|
|
|
+ result = Break("Produce", false, $"主题 [ {Topic} ] 发送失败");
|
|
|
+ }
|
|
|
+ if (!result.State)
|
|
|
+ {
|
|
|
+ MessageFail.Add(result.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (MessageFail.Count > 0)
|
|
|
+ {
|
|
|
+ return Break("Produce", false, $"批量发送失败,存在发送失败数据:{MessageFail.ToJson()}");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("Produce", true, $"批量发送成功");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Produce", false, ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> ProduceAsync(string Topic, object Key, object Content)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Produce(Topic, Key, Content));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Produce(string Topic, string Content)
|
|
|
+ {
|
|
|
+ Depart("Produce");
|
|
|
+ OperateResult operateResult = Produce(Topic, null, Content);
|
|
|
+ return Break("Produce", operateResult.State, operateResult.Message);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> ProduceAsync(string Topic, string Content)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Produce(Topic, Content));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> OnAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => On());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult On()
|
|
|
+ {
|
|
|
+ return Break(Depart("On"), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> OffAsync()
|
|
|
+ {
|
|
|
+ return Task.Run(() => Off());
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Off()
|
|
|
+ {
|
|
|
+ return Break(Depart("Off"), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> ConsumeAsync(string Topic)
|
|
|
+ {
|
|
|
+ return Task.Run(() => Consume(Topic));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Consume(string Topic)
|
|
|
+ {
|
|
|
+ //开始记录运行时间
|
|
|
+ Depart("Consume");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ //创建一个实例
|
|
|
+ if (Consumer == null)
|
|
|
+ {
|
|
|
+ Consumer = new ConsumerBuilder<object, object>(consumerConfig).Build();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (TopicArray.Contains(Topic))
|
|
|
+ {
|
|
|
+ return Break("Consume", false, $"{Topic} 此主题已订阅");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //添加主题
|
|
|
+ TopicArray.Add(Topic);
|
|
|
+
|
|
|
+ //订阅多个主题,会自动取消订阅之前的主题
|
|
|
+ Consumer.Subscribe(TopicArray);
|
|
|
+
|
|
|
+ //当订阅状态为false 调用轮询函数
|
|
|
+ if (!PollingState)
|
|
|
+ {
|
|
|
+ //设置轮询状态
|
|
|
+ PollingState = true;
|
|
|
+
|
|
|
+ //异步轮询
|
|
|
+ Polling();
|
|
|
+ }
|
|
|
+
|
|
|
+ return Break("Consume", true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("Consume", false, ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult Subscribe(string Topic)
|
|
|
+ {
|
|
|
+ Depart("Subscribe");
|
|
|
+ OperateResult operateResult = Consume(Topic);
|
|
|
+ return Break("Subscribe", operateResult.State, operateResult.Message);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> SubscribeAsync(string Topic)
|
|
|
+ {
|
|
|
+ return Task.Run(()=> Subscribe(Topic));
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperateResult UnSubscribe(string Topic)
|
|
|
+ {
|
|
|
+ Depart("UnSubscribe");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (!TopicArray.Contains(Topic))
|
|
|
+ {
|
|
|
+ return Break("UnSubscribe", false, $"{Topic} 此主题不存在");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //移除这个主题
|
|
|
+ TopicArray.Remove(Topic);
|
|
|
+
|
|
|
+ //主题数量判断
|
|
|
+ if (TopicArray.Count > 0)
|
|
|
+ {
|
|
|
+ //大于零直接修改订阅
|
|
|
+ //订阅多个主题,会自动取消订阅之前的主题
|
|
|
+ Consumer.Subscribe(TopicArray);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //小于或等于零,直接关闭订阅
|
|
|
+ //设置轮询状态
|
|
|
+ PollingState = false;
|
|
|
+ //先关订阅
|
|
|
+ Consumer.Unsubscribe();
|
|
|
+ //关闭
|
|
|
+ Consumer.Close();
|
|
|
+ //释放掉这个实例
|
|
|
+ Consumer.Dispose();
|
|
|
+ //置空
|
|
|
+ Consumer = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ return Break("UnSubscribe", true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ return Break("UnSubscribe", false, ex.Message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task<OperateResult> UnSubscribeAsync(string Topic)
|
|
|
+ {
|
|
|
+ return Task.Run(() => UnSubscribe(Topic));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|