|
|
@@ -9,6 +9,7 @@ using YSAI.Core.@interface.only;
|
|
|
using YSAI.Core.@interface.unify;
|
|
|
using YSAI.Log;
|
|
|
using YSAI.Unility;
|
|
|
+using static Confluent.Kafka.ConfigPropertyNames;
|
|
|
|
|
|
namespace YSAI.Kafka
|
|
|
{
|
|
|
@@ -54,66 +55,19 @@ namespace YSAI.Kafka
|
|
|
public KafkaOperate(KafkaData.Basics basics)
|
|
|
{
|
|
|
this.basics = basics;
|
|
|
-
|
|
|
- if (producerConfig == null)
|
|
|
- {
|
|
|
- //创建配置
|
|
|
- producerConfig = new ProducerConfig()
|
|
|
- {
|
|
|
- ClientId = Guid.NewGuid().ToString(),
|
|
|
- BootstrapServers = basics.BootstrapServers
|
|
|
- //,
|
|
|
- //SecurityProtocol = basics.SecurityProtocol,
|
|
|
- //SaslMechanism = basics.SaslMechanism,
|
|
|
- //SaslKerberosServiceName = basics.SaslKerberosServiceName,
|
|
|
- //SaslKerberosKeytab = basics.SaslKerberosKeytab,
|
|
|
- //SaslKerberosPrincipal = basics.SaslKerberosPrincipal
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- if (consumerConfig == null)
|
|
|
- {
|
|
|
- //创建配置
|
|
|
- consumerConfig = new ConsumerConfig()
|
|
|
- {
|
|
|
- BootstrapServers = basics.BootstrapServers,
|
|
|
- GroupId = Guid.NewGuid().ToString(),
|
|
|
- AutoOffsetReset = basics.AutoOffsetReset
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- if (adminClientConfig == null)
|
|
|
- {
|
|
|
- //创建配置
|
|
|
- adminClientConfig = new AdminClientConfig()
|
|
|
- {
|
|
|
- BootstrapServers = basics.BootstrapServers,
|
|
|
- SecurityProtocol = basics.SecurityProtocol
|
|
|
- };
|
|
|
- }
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
/// <summary>
|
|
|
/// 生产者配置
|
|
|
/// </summary>
|
|
|
- private ProducerConfig producerConfig { get; set; }
|
|
|
+ private ProducerConfig? producerConfig { get; set; }
|
|
|
/// <summary>
|
|
|
/// 消费者配置
|
|
|
/// </summary>
|
|
|
- private ConsumerConfig consumerConfig { get; set; }
|
|
|
+ private ConsumerConfig? consumerConfig { get; set; }
|
|
|
/// <summary>
|
|
|
/// 客户端管理员配置
|
|
|
/// </summary>
|
|
|
- private AdminClientConfig adminClientConfig { get; set; }
|
|
|
-
|
|
|
- public void Dispose()
|
|
|
- {
|
|
|
- GC.Collect();
|
|
|
- GC.SuppressFinalize(this);
|
|
|
- ThisObjList.Remove(this);
|
|
|
- }
|
|
|
-
|
|
|
+ private AdminClientConfig? adminClientConfig { get; set; }
|
|
|
/// <summary>
|
|
|
/// 主题集合
|
|
|
/// </summary>
|
|
|
@@ -123,10 +77,23 @@ namespace YSAI.Kafka
|
|
|
/// </summary>
|
|
|
private IConsumer<object, object>? Consumer = null;
|
|
|
/// <summary>
|
|
|
+ /// 是否打开
|
|
|
+ /// </summary>
|
|
|
+ private bool IsOpen { get; set; }
|
|
|
+ /// <summary>
|
|
|
/// 轮询状态
|
|
|
/// </summary>
|
|
|
private bool PollingState = false;
|
|
|
-
|
|
|
+ /// <summary>
|
|
|
+ /// 释放
|
|
|
+ /// </summary>
|
|
|
+ public void Dispose()
|
|
|
+ {
|
|
|
+ Off();
|
|
|
+ GC.Collect();
|
|
|
+ GC.SuppressFinalize(this);
|
|
|
+ ThisObjList.Remove(this);
|
|
|
+ }
|
|
|
/// <summary>
|
|
|
/// 轮询消费
|
|
|
/// </summary>
|
|
|
@@ -168,30 +135,38 @@ namespace YSAI.Kafka
|
|
|
Depart("CreateTopics");
|
|
|
try
|
|
|
{
|
|
|
- using (IAdminClient adminClient = new AdminClientBuilder(adminClientConfig).Build())
|
|
|
+ if (IsOpen)
|
|
|
{
|
|
|
- //先获取kafka已存在的主题
|
|
|
- Metadata metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(5)); //超时时间5秒
|
|
|
- //已存在的主题集合
|
|
|
- List<string> adnimTopics = new List<string>();
|
|
|
- foreach (var item in metadata.Topics) { adnimTopics.Add(item.Topic); }
|
|
|
- //得到主题差异集合
|
|
|
- List<string> topicDifferenceArray = Topics.Except(adnimTopics).ToList();
|
|
|
- //差异主题集合
|
|
|
- List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
|
|
|
- //循环添加主题
|
|
|
- foreach (var topic in topicDifferenceArray)
|
|
|
+ using (IAdminClient adminClient = new AdminClientBuilder(adminClientConfig).Build())
|
|
|
{
|
|
|
- topicSpecifications.Add(new TopicSpecification()
|
|
|
+ //先获取kafka已存在的主题
|
|
|
+ Metadata metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(5)); //超时时间5秒
|
|
|
+ //已存在的主题集合
|
|
|
+ List<string> adnimTopics = new List<string>();
|
|
|
+ foreach (var item in metadata.Topics) { adnimTopics.Add(item.Topic); }
|
|
|
+ //得到主题差异集合
|
|
|
+ List<string> topicDifferenceArray = Topics.Except(adnimTopics).ToList();
|
|
|
+ //差异主题集合
|
|
|
+ List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
|
|
|
+ //循环添加主题
|
|
|
+ foreach (var topic in topicDifferenceArray)
|
|
|
{
|
|
|
- Name = topic
|
|
|
- });
|
|
|
+ topicSpecifications.Add(new TopicSpecification()
|
|
|
+ {
|
|
|
+ Name = topic
|
|
|
+ });
|
|
|
+ }
|
|
|
+ //执行创建
|
|
|
+ adminClient.CreateTopicsAsync(topicSpecifications).Wait();
|
|
|
+ //返回
|
|
|
+ return Break("CreateTopics", true, "主题集合创建成功");
|
|
|
}
|
|
|
- //执行创建
|
|
|
- adminClient.CreateTopicsAsync(topicSpecifications).Wait();
|
|
|
- //返回
|
|
|
- return Break("CreateTopics", true, "主题集合创建成功");
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("CreateTopics", false, "未打开");
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
@@ -205,43 +180,50 @@ namespace YSAI.Kafka
|
|
|
Depart("Produce");
|
|
|
try
|
|
|
{
|
|
|
- //失败的消息
|
|
|
- List<string> MessageFail = new List<string>();
|
|
|
-
|
|
|
- using (IProducer<object, object> producer = new ProducerBuilder<object, object>(producerConfig).Build())
|
|
|
+ if (IsOpen)
|
|
|
{
|
|
|
- //返回结果
|
|
|
- OperateResult? result = null;
|
|
|
- //发布结果
|
|
|
- producer.Produce(Topic, new Message<object, object>() { Key = Key, Value = Content }, Result =>
|
|
|
+ //失败的消息
|
|
|
+ List<string> MessageFail = new List<string>();
|
|
|
+
|
|
|
+ using (IProducer<object, object> producer = new ProducerBuilder<object, object>(producerConfig).Build())
|
|
|
{
|
|
|
- if (Result.Error.Code != ErrorCode.NoError)
|
|
|
+ //返回结果
|
|
|
+ OperateResult? result = null;
|
|
|
+ //发布结果
|
|
|
+ producer.Produce(Topic, new Message<object, object>() { Key = Key, Value = Content }, Result =>
|
|
|
{
|
|
|
- result = Break("Produce", false, $"主题 [ {Topic} ] 发送失败:{Result.Error.Reason}");
|
|
|
+ if (Result.Error.Code != ErrorCode.NoError)
|
|
|
+ {
|
|
|
+ result = Break("Produce", false, $"主题 [ {Topic} ] 发送失败:{Result.Error.Reason}");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ result = Break("Produce", true, $"主题 [ {Topic} ] 发送成功,偏移量:{Result.TopicPartitionOffset}");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ //最多等待时间
|
|
|
+ producer.Flush(new TimeSpan(0, 0, 0, 0, basics.WaitTime));
|
|
|
+ if (result == null)
|
|
|
+ {
|
|
|
+ result = Break("Produce", false, $"主题 [ {Topic} ] 发送失败");
|
|
|
}
|
|
|
- else
|
|
|
+ if (!result.State)
|
|
|
{
|
|
|
- result = Break("Produce", true, $"主题 [ {Topic} ] 发送成功,偏移量:{Result.TopicPartitionOffset}");
|
|
|
+ MessageFail.Add(result.Message);
|
|
|
}
|
|
|
- });
|
|
|
- //最多等待时间
|
|
|
- producer.Flush(new TimeSpan(0, 0, 0, 0, basics.WaitTime));
|
|
|
- if (result == null)
|
|
|
+ }
|
|
|
+ if (MessageFail.Count > 0)
|
|
|
{
|
|
|
- result = Break("Produce", false, $"主题 [ {Topic} ] 发送失败");
|
|
|
+ return Break("Produce", false, $"批量发送失败,存在发送失败数据:{MessageFail.ToJson()}");
|
|
|
}
|
|
|
- if (!result.State)
|
|
|
+ else
|
|
|
{
|
|
|
- MessageFail.Add(result.Message);
|
|
|
+ return Break("Produce", true, $"批量发送成功");
|
|
|
}
|
|
|
}
|
|
|
- if (MessageFail.Count > 0)
|
|
|
- {
|
|
|
- return Break("Produce", false, $"批量发送失败,存在发送失败数据:{MessageFail.ToJson()}");
|
|
|
- }
|
|
|
else
|
|
|
{
|
|
|
- return Break("Produce", true, $"批量发送成功");
|
|
|
+ return Break("Off", false, "未打开");
|
|
|
}
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
@@ -274,7 +256,53 @@ namespace YSAI.Kafka
|
|
|
|
|
|
public OperateResult On()
|
|
|
{
|
|
|
- return Break(Depart("On"), true);
|
|
|
+ Depart("On");
|
|
|
+
|
|
|
+ if (!IsOpen)
|
|
|
+ {
|
|
|
+ if (producerConfig == null)
|
|
|
+ {
|
|
|
+ //创建配置
|
|
|
+ producerConfig = new ProducerConfig()
|
|
|
+ {
|
|
|
+ ClientId = Guid.NewGuid().ToString(),
|
|
|
+ BootstrapServers = basics.BootstrapServers
|
|
|
+ //,
|
|
|
+ //SecurityProtocol = basics.SecurityProtocol,
|
|
|
+ //SaslMechanism = basics.SaslMechanism,
|
|
|
+ //SaslKerberosServiceName = basics.SaslKerberosServiceName,
|
|
|
+ //SaslKerberosKeytab = basics.SaslKerberosKeytab,
|
|
|
+ //SaslKerberosPrincipal = basics.SaslKerberosPrincipal
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ if (consumerConfig == null)
|
|
|
+ {
|
|
|
+ //创建配置
|
|
|
+ consumerConfig = new ConsumerConfig()
|
|
|
+ {
|
|
|
+ BootstrapServers = basics.BootstrapServers,
|
|
|
+ GroupId = Guid.NewGuid().ToString(),
|
|
|
+ AutoOffsetReset = basics.AutoOffsetReset
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ if (adminClientConfig == null)
|
|
|
+ {
|
|
|
+ //创建配置
|
|
|
+ adminClientConfig = new AdminClientConfig()
|
|
|
+ {
|
|
|
+ BootstrapServers = basics.BootstrapServers,
|
|
|
+ SecurityProtocol = basics.SecurityProtocol
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("On", false, "已打开");
|
|
|
+ }
|
|
|
+
|
|
|
+ return Break("On", true);
|
|
|
}
|
|
|
|
|
|
public Task<OperateResult> OffAsync()
|
|
|
@@ -284,7 +312,25 @@ namespace YSAI.Kafka
|
|
|
|
|
|
public OperateResult Off()
|
|
|
{
|
|
|
- return Break(Depart("Off"), true);
|
|
|
+ Depart("Off");
|
|
|
+
|
|
|
+ if (IsOpen)
|
|
|
+ {
|
|
|
+
|
|
|
+ producerConfig = null;
|
|
|
+ consumerConfig = null;
|
|
|
+ adminClientConfig = null;
|
|
|
+ Consumer?.Close();
|
|
|
+ Consumer?.Dispose();
|
|
|
+ Consumer = null;
|
|
|
+ TopicArray.Clear();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("Off", false, "未打开");
|
|
|
+ }
|
|
|
+
|
|
|
+ return Break("Off", true);
|
|
|
}
|
|
|
|
|
|
public Task<OperateResult> ConsumeAsync(string Topic)
|
|
|
@@ -298,35 +344,42 @@ namespace YSAI.Kafka
|
|
|
Depart("Consume");
|
|
|
try
|
|
|
{
|
|
|
- //创建一个实例
|
|
|
- if (Consumer == null)
|
|
|
+ if (IsOpen)
|
|
|
{
|
|
|
- Consumer = new ConsumerBuilder<object, object>(consumerConfig).Build();
|
|
|
- }
|
|
|
+ //创建一个实例
|
|
|
+ if (Consumer == null)
|
|
|
+ {
|
|
|
+ Consumer = new ConsumerBuilder<object, object>(consumerConfig).Build();
|
|
|
+ }
|
|
|
|
|
|
- if (TopicArray.Contains(Topic))
|
|
|
- {
|
|
|
- return Break("Consume", false, $"{Topic} 此主题已订阅");
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- //添加主题
|
|
|
- TopicArray.Add(Topic);
|
|
|
+ if (TopicArray.Contains(Topic))
|
|
|
+ {
|
|
|
+ return Break("Consume", false, $"{Topic} 此主题已订阅");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //添加主题
|
|
|
+ TopicArray.Add(Topic);
|
|
|
|
|
|
- //订阅多个主题,会自动取消订阅之前的主题
|
|
|
- Consumer.Subscribe(TopicArray);
|
|
|
+ //订阅多个主题,会自动取消订阅之前的主题
|
|
|
+ Consumer.Subscribe(TopicArray);
|
|
|
|
|
|
- //当订阅状态为false 调用轮询函数
|
|
|
- if (!PollingState)
|
|
|
- {
|
|
|
- //设置轮询状态
|
|
|
- PollingState = true;
|
|
|
+ //当订阅状态为false 调用轮询函数
|
|
|
+ if (!PollingState)
|
|
|
+ {
|
|
|
+ //设置轮询状态
|
|
|
+ PollingState = true;
|
|
|
|
|
|
- //异步轮询
|
|
|
- Polling();
|
|
|
- }
|
|
|
+ //异步轮询
|
|
|
+ Polling();
|
|
|
+ }
|
|
|
|
|
|
- return Break("Consume", true);
|
|
|
+ return Break("Consume", true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("Off", false, "未打开");
|
|
|
}
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
@@ -352,38 +405,45 @@ namespace YSAI.Kafka
|
|
|
Depart("UnSubscribe");
|
|
|
try
|
|
|
{
|
|
|
- if (!TopicArray.Contains(Topic))
|
|
|
- {
|
|
|
- return Break("UnSubscribe", false, $"{Topic} 此主题不存在");
|
|
|
- }
|
|
|
- else
|
|
|
+ if (IsOpen)
|
|
|
{
|
|
|
- //移除这个主题
|
|
|
- TopicArray.Remove(Topic);
|
|
|
-
|
|
|
- //主题数量判断
|
|
|
- if (TopicArray.Count > 0)
|
|
|
+ if (!TopicArray.Contains(Topic))
|
|
|
{
|
|
|
- //大于零直接修改订阅
|
|
|
- //订阅多个主题,会自动取消订阅之前的主题
|
|
|
- Consumer.Subscribe(TopicArray);
|
|
|
+ return Break("UnSubscribe", false, $"{Topic} 此主题不存在");
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- //小于或等于零,直接关闭订阅
|
|
|
- //设置轮询状态
|
|
|
- PollingState = false;
|
|
|
- //先关订阅
|
|
|
- Consumer.Unsubscribe();
|
|
|
- //关闭
|
|
|
- Consumer.Close();
|
|
|
- //释放掉这个实例
|
|
|
- Consumer.Dispose();
|
|
|
- //置空
|
|
|
- Consumer = null;
|
|
|
- }
|
|
|
+ //移除这个主题
|
|
|
+ TopicArray.Remove(Topic);
|
|
|
|
|
|
- return Break("UnSubscribe", true);
|
|
|
+ //主题数量判断
|
|
|
+ if (TopicArray.Count > 0)
|
|
|
+ {
|
|
|
+ //大于零直接修改订阅
|
|
|
+ //订阅多个主题,会自动取消订阅之前的主题
|
|
|
+ Consumer.Subscribe(TopicArray);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //小于或等于零,直接关闭订阅
|
|
|
+ //设置轮询状态
|
|
|
+ PollingState = false;
|
|
|
+ //先关订阅
|
|
|
+ Consumer.Unsubscribe();
|
|
|
+ //关闭
|
|
|
+ Consumer.Close();
|
|
|
+ //释放掉这个实例
|
|
|
+ Consumer.Dispose();
|
|
|
+ //置空
|
|
|
+ Consumer = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ return Break("UnSubscribe", true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Break("Off", false, "未打开");
|
|
|
}
|
|
|
}
|
|
|
catch (Exception ex)
|