|
|
@@ -44,12 +44,6 @@ namespace Ys.Scada.Kafka
|
|
|
}
|
|
|
catch (ConsumeException e) when (_kafkaOptions.RetriableErrorCodes.Contains(e.Error.Code))
|
|
|
{
|
|
|
- var logArgs = new LogMessageEventArgs
|
|
|
- {
|
|
|
- LogType = MqLogType.ConsumeRetries,
|
|
|
- Reason = e.Error.ToString()
|
|
|
- };
|
|
|
- OnLogCallback!(logArgs);
|
|
|
continue;
|
|
|
}
|
|
|
if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue;
|
|
|
@@ -67,9 +61,7 @@ namespace Ys.Scada.Kafka
|
|
|
public void Connect()
|
|
|
{
|
|
|
if (_consumerClient != null) return;
|
|
|
-
|
|
|
ConnectionLock.Wait();
|
|
|
-
|
|
|
try
|
|
|
{
|
|
|
if (_consumerClient == null)
|
|
|
@@ -98,12 +90,6 @@ namespace Ys.Scada.Kafka
|
|
|
}
|
|
|
private void ConsumerClient_OnConsumeError(IConsumer<string, byte[]> consumer, Error e)
|
|
|
{
|
|
|
- var logArgs = new LogMessageEventArgs
|
|
|
- {
|
|
|
- LogType = MqLogType.ServerConnError,
|
|
|
- Reason = $"An error occurred during connect kafka --> {e.Reason}"
|
|
|
- };
|
|
|
- OnLogCallback!(logArgs);
|
|
|
}
|
|
|
public void Dispose()
|
|
|
{
|