diff --git a/src/NATS.Client.JetStream/INatsJSNotification.cs b/src/NATS.Client.JetStream/INatsJSNotification.cs new file mode 100644 index 000000000..844d4baa2 --- /dev/null +++ b/src/NATS.Client.JetStream/INatsJSNotification.cs @@ -0,0 +1,11 @@ +namespace NATS.Client.JetStream; + +public interface INatsJSNotification +{ + string Name { get; } +} + +public class NatsJSTimeoutNotification : INatsJSNotification +{ + public string Name => "Timeout"; +} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index ec872f50d..20c6d87ea 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -28,6 +28,7 @@ internal class NatsJSConsume : NatsSubBase private readonly INatsDeserialize _serializer; private readonly Timer _timer; private readonly Task _pullTask; + private readonly NatsJSNotificationChannel? _notificationChannel; private readonly long _maxMsgs; private readonly long _expires; @@ -54,6 +55,7 @@ public NatsJSConsume( string consumer, string subject, string? queueGroup, + Func? notificationHandler, INatsDeserialize serializer, NatsSubOpts? opts, CancellationToken cancellationToken) @@ -67,6 +69,11 @@ public NatsJSConsume( _consumer = consumer; _serializer = serializer; + if (notificationHandler is { } handler) + { + _notificationChannel = new NatsJSNotificationChannel(handler, e => _userMsgs?.Writer.TryComplete(e), cancellationToken); + } + _maxMsgs = maxMsgs; _thresholdMsgs = thresholdMsgs; _maxBytes = maxBytes; @@ -96,6 +103,7 @@ public NatsJSConsume( static state => { var self = (NatsJSConsume)state!; + self._notificationChannel?.Notify(new NatsJSTimeoutNotification()); if (self._cancellationToken.IsCancellationRequested) { @@ -167,6 +175,10 @@ public override async ValueTask DisposeAsync() await base.DisposeAsync().ConfigureAwait(false); await _pullTask.ConfigureAwait(false); await _timer.DisposeAsync().ConfigureAwait(false); + if (_notificationChannel != null) + { + await _notificationChannel.DisposeAsync(); + } } internal override IEnumerable GetReconnectCommands(int sid) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index e528f268d..967860fe6 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -19,6 +19,7 @@ internal class NatsJSFetch : NatsSubBase private readonly INatsDeserialize _serializer; private readonly Timer _hbTimer; private readonly Timer _expiresTimer; + private readonly NatsJSNotificationChannel? _notificationChannel; private readonly long _maxMsgs; private readonly long _maxBytes; @@ -40,8 +41,10 @@ public NatsJSFetch( string consumer, string subject, string? queueGroup, + Func? notificationHandler, INatsDeserialize serializer, - NatsSubOpts? opts) + NatsSubOpts? opts, + CancellationToken cancellationToken) : base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts) { _logger = Connection.Opts.LoggerFactory.CreateLogger>(); @@ -51,6 +54,11 @@ public NatsJSFetch( _consumer = consumer; _serializer = serializer; + if (notificationHandler is { } handler) + { + _notificationChannel = new NatsJSNotificationChannel(handler, e => _userMsgs?.Writer.TryComplete(e), cancellationToken); + } + _maxMsgs = maxMsgs; _maxBytes = maxBytes; _expires = expires.ToNanos(); @@ -84,6 +92,8 @@ public NatsJSFetch( static state => { var self = (NatsJSFetch)state!; + self._notificationChannel?.Notify(new NatsJSTimeoutNotification()); + self.EndSubscription(NatsSubEndReason.IdleHeartbeatTimeout); if (self._debug) { @@ -134,6 +144,10 @@ public override async ValueTask DisposeAsync() await base.DisposeAsync().ConfigureAwait(false); await _hbTimer.DisposeAsync().ConfigureAwait(false); await _expiresTimer.DisposeAsync().ConfigureAwait(false); + if (_notificationChannel != null) + { + await _notificationChannel.DisposeAsync(); + } } internal override IEnumerable GetReconnectCommands(int sid) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs new file mode 100644 index 000000000..d3eb12a5c --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs @@ -0,0 +1,68 @@ +using System.Threading.Channels; + +namespace NATS.Client.JetStream.Internal; + +internal class NatsJSNotificationChannel : IAsyncDisposable +{ + private readonly Func _notificationHandler; + private readonly Action _exceptionHandler; + private readonly CancellationToken _cancellationToken; + private readonly Channel _channel; + private readonly Task _loop; + + public NatsJSNotificationChannel( + Func notificationHandler, + Action exceptionHandler, + CancellationToken cancellationToken) + { + _notificationHandler = notificationHandler; + _exceptionHandler = exceptionHandler; + _cancellationToken = cancellationToken; + _channel = Channel.CreateBounded(new BoundedChannelOptions(128) + { + AllowSynchronousContinuations = false, + SingleReader = false, + SingleWriter = false, + FullMode = BoundedChannelFullMode.DropOldest, + }); + _loop = Task.Run(NotificationLoop, _cancellationToken); + } + + public void Notify(INatsJSNotification notification) => _channel.Writer.TryWrite(notification); + + public async ValueTask DisposeAsync() + { + _channel.Writer.TryComplete(); + try + { + await _loop; + } + catch (OperationCanceledException) + { + } + } + + private async Task NotificationLoop() + { + try + { + while (await _channel.Reader.WaitToReadAsync(_cancellationToken)) + { + while (_channel.Reader.TryRead(out var notification)) + { + try + { + await _notificationHandler(notification, _cancellationToken); + } + catch (Exception e) + { + _exceptionHandler(e); + } + } + } + } + catch (OperationCanceledException) + { + } + } +} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 847575800..465b01059 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -62,12 +62,44 @@ public async IAsyncEnumerable> ConsumeAsync( opts ??= _context.Opts.DefaultConsumeOpts; await using var cc = await ConsumeInternalAsync(serializer, opts, cancellationToken).ConfigureAwait(false); - // Keep subscription alive (since it's a wek ref in subscription manager) until we're done. + // Keep subscription alive (since it's a weak ref in subscription manager) until we're done. using var anchor = _context.Connection.RegisterSubAnchor(cc); - await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + while (!cancellationToken.IsCancellationRequested) { - yield return jsMsg; + // We have to check calls individually since we can't use yield return in try-catch blocks. + bool ready; + try + { + ready = await cc.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + ready = false; + } + + if (!ready) + yield break; + + while (!cancellationToken.IsCancellationRequested) + { + bool read; + NatsJSMsg jsMsg; + try + { + read = cc.Msgs.TryRead(out jsMsg); + } + catch (OperationCanceledException) + { + read = false; + jsMsg = default; + } + + if (!read) + break; + + yield return jsMsg; + } } } @@ -115,9 +147,13 @@ public async IAsyncEnumerable> ConsumeAsync( MaxMsgs = 1, IdleHeartbeat = opts.IdleHeartbeat, Expires = opts.Expires, + NotificationHandler = opts.NotificationHandler, }, cancellationToken: cancellationToken).ConfigureAwait(false); + // Keep subscription alive (since it's a weak ref in subscription manager) until we're done. + using var anchor = _context.Connection.RegisterSubAnchor(f); + await foreach (var natsJSMsg in f.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { return natsJSMsg; @@ -146,9 +182,45 @@ public async IAsyncEnumerable> FetchAsync( serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); await using var fc = await FetchInternalAsync(serializer, opts, cancellationToken).ConfigureAwait(false); - await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + + // Keep subscription alive (since it's a weak ref in subscription manager) until we're done. + using var anchor = _context.Connection.RegisterSubAnchor(fc); + + while (!cancellationToken.IsCancellationRequested) { - yield return jsMsg; + // We have to check calls individually since we can't use yield return in try-catch blocks. + bool ready; + try + { + ready = await fc.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + ready = false; + } + + if (!ready) + yield break; + + while (!cancellationToken.IsCancellationRequested) + { + bool read; + NatsJSMsg jsMsg; + try + { + read = fc.Msgs.TryRead(out jsMsg); + } + catch (OperationCanceledException) + { + read = false; + jsMsg = default; + } + + if (!read) + break; + + yield return jsMsg; + } } } @@ -252,6 +324,7 @@ internal async ValueTask> ConsumeInternalAsync(INatsDeserial thresholdBytes: max.ThresholdBytes, expires: timeouts.Expires, idle: timeouts.IdleHeartbeat, + notificationHandler: opts.NotificationHandler, cancellationToken: cancellationToken); await _context.Connection.SubAsync(sub: sub, cancellationToken).ConfigureAwait(false); @@ -273,11 +346,10 @@ await sub.CallMsgNextAsync( return sub; } - internal async ValueTask> OrderedConsumeInternalAsync(INatsDeserialize? serializer = default, NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) + internal async ValueTask> OrderedConsumeInternalAsync(INatsDeserialize? serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken) { ThrowIfDeleted(); - opts ??= new NatsJSConsumeOpts(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); var inbox = _context.NewInbox(); @@ -348,7 +420,9 @@ internal async ValueTask> FetchInternalAsync( maxMsgs: max.MaxMsgs, maxBytes: max.MaxBytes, expires: timeouts.Expires, - idle: timeouts.IdleHeartbeat); + notificationHandler: opts.NotificationHandler, + idle: timeouts.IdleHeartbeat, + cancellationToken: cancellationToken); await _context.Connection.SubAsync(sub: sub, cancellationToken).ConfigureAwait(false); diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index ca49f62d3..7ebcad074 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -118,6 +118,8 @@ public record NatsJSConsumeOpts /// Hint for the number of bytes left in buffer that should trigger a low watermark on the client, and influence it to request more data. /// public int? ThresholdBytes { get; init; } + + public Func? NotificationHandler { get; init; } } /// @@ -134,6 +136,8 @@ public record NatsJSNextOpts /// Amount idle time the server should wait before sending a heartbeat. For requests with expires > 30s, heartbeats should be enabled by default /// public TimeSpan? IdleHeartbeat { get; init; } + + public Func? NotificationHandler { get; init; } } /// @@ -161,6 +165,8 @@ public record NatsJSFetchOpts /// public TimeSpan? IdleHeartbeat { get; init; } + public Func? NotificationHandler { get; init; } + /// /// Does not wait for messages to be available /// diff --git a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs index 72d59bd13..7251d891e 100644 --- a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs @@ -1,6 +1,7 @@ using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; using NATS.Client.Core; +using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -57,7 +58,9 @@ public async IAsyncEnumerable> ConsumeAsync( NatsJSConsumeOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { + opts ??= _context.Opts.DefaultConsumeOpts; var consumerName = string.Empty; + var notificationHandler = opts.NotificationHandler; try { @@ -88,6 +91,10 @@ public async IAsyncEnumerable> ConsumeAsync( if (!read) break; } + catch (OperationCanceledException) + { + break; + } catch (NatsJSProtocolException pe) { protocolException = pe; @@ -100,6 +107,7 @@ public async IAsyncEnumerable> ConsumeAsync( } catch (NatsJSTimeoutException e) { + notificationHandler?.Invoke(new NatsJSTimeoutNotification(), cancellationToken); _logger.LogWarning($"{e.Message}. Retrying..."); goto CONSUME_LOOP; } @@ -114,6 +122,10 @@ public async IAsyncEnumerable> ConsumeAsync( if (!canRead) break; } + catch (OperationCanceledException) + { + break; + } catch (NatsJSProtocolException pe) { protocolException = pe; @@ -126,6 +138,7 @@ public async IAsyncEnumerable> ConsumeAsync( } catch (NatsJSTimeoutException e) { + notificationHandler?.Invoke(new NatsJSTimeoutNotification(), cancellationToken); _logger.LogWarning($"{e.Message}. Retrying..."); goto CONSUME_LOOP; } @@ -223,6 +236,7 @@ public async IAsyncEnumerable> FetchAsync( MaxMsgs = 1, IdleHeartbeat = opts.IdleHeartbeat, Expires = opts.Expires, + NotificationHandler = opts.NotificationHandler, }; await foreach (var msg in FetchAsync(serializer, fetchOpts, cancellationToken)) diff --git a/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs b/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs new file mode 100644 index 000000000..1c7fd94d7 --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs @@ -0,0 +1,307 @@ +using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Tests; + +public class ErrorHandlerTest +{ + private readonly ITestOutputHelper _output; + + public ErrorHandlerTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Consumer_fetch_error_handling() + { + await using var server = NatsServer.StartJS(); + var (nats1, proxy) = server.CreateProxiedClientConnection(); + await using var nats = nats1; + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("c1"), cts.Token); + + (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + + var timeoutNotifications = 0; + var opts = new NatsJSNextOpts + { + NotificationHandler = (e, _) => + { + if (e is NatsJSTimeoutNotification) + { + Interlocked.Increment(ref timeoutNotifications); + } + + return Task.CompletedTask; + }, + Expires = TimeSpan.FromSeconds(6), + IdleHeartbeat = TimeSpan.FromSeconds(3), + }; + + // Next is fetch under the hood. + var next = await consumer.NextAsync(opts: opts, cancellationToken: cts.Token); + if (next is { } msg) + { + msg.Subject.Should().Be("s1.1"); + msg.Data.Should().Be(1); + await msg.AckAsync(cancellationToken: cts.Token); + } + else + { + Assert.Fail("No message received."); + } + + // Swallow heartbeats + proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); + + // Create an empty stream to potentially reduce the chance of having a message. + var stream2 = await js.CreateStreamAsync(new StreamConfig("s2", new[] { "s2.*" }), cts.Token); + var consumer2 = await stream2.CreateConsumerAsync(new ConsumerConfig("c2"), cts.Token); + + var next2 = await consumer2.NextAsync(opts: opts, cancellationToken: cts.Token); + Assert.Null(next2); + Assert.Equal(1, Volatile.Read(ref timeoutNotifications)); + } + + [Fact] + public async Task Consumer_consume_handling() + { + await using var server = NatsServer.StartJS(); + var (nats1, proxy) = server.CreateProxiedClientConnection(); + await using var nats = nats1; + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("c1"), cts.Token); + + (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + + var timeoutNotifications = 0; + var opts = new NatsJSConsumeOpts + { + MaxMsgs = 10, + NotificationHandler = (e, _) => + { + if (e is NatsJSTimeoutNotification) + { + Interlocked.Increment(ref timeoutNotifications); + } + + return Task.CompletedTask; + }, + Expires = TimeSpan.FromSeconds(6), + IdleHeartbeat = TimeSpan.FromSeconds(3), + }; + + await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) + { + msg.Data.Should().Be(1); + msg.Subject.Should().Be("s1.1"); + await msg.AckAsync(cancellationToken: cts.Token); + break; + } + + Assert.Equal(0, Volatile.Read(ref timeoutNotifications)); + + // Swallow heartbeats + proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); + + var count = 0; + var consumeCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); + var consume = Task.Run( + async () => + { + await foreach (var unused in consumer.ConsumeAsync(opts: opts, cancellationToken: consumeCts.Token)) + { + Interlocked.Increment(ref count); + } + }, + cts.Token); + + await Retry.Until("timed out", () => Volatile.Read(ref timeoutNotifications) > 0, timeout: TimeSpan.FromSeconds(20)); + consumeCts.Cancel(); + await consume; + + Assert.Equal(0, Volatile.Read(ref count)); + Assert.True(Volatile.Read(ref timeoutNotifications) > 0); + } + + [Fact] + public async Task Ordered_consumer_fetch_error_handling() + { + await using var server = NatsServer.StartJS(); + var (nats1, proxy) = server.CreateProxiedClientConnection(); + await using var nats = nats1; + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + var consumer = (NatsJSOrderedConsumer)await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); + + (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + + var timeoutNotifications = 0; + var opts = new NatsJSFetchOpts + { + MaxMsgs = 10, + NotificationHandler = (e, _) => + { + if (e is NatsJSTimeoutNotification) + { + Interlocked.Increment(ref timeoutNotifications); + } + + return Task.CompletedTask; + }, + Expires = TimeSpan.FromSeconds(6), + IdleHeartbeat = TimeSpan.FromSeconds(3), + }; + + var count1 = 0; + await foreach (var msg in consumer.FetchAsync(opts: opts, cancellationToken: cts.Token)) + { + msg.Subject.Should().Be("s1.1"); + msg.Data.Should().Be(1); + await msg.AckAsync(cancellationToken: cts.Token); + count1++; + } + + Assert.Equal(1, count1); + + // Swallow heartbeats + proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); + + // Create an empty stream since ordered consumer will pick up messages from beginning everytime. + var stream2 = await js.CreateStreamAsync(new StreamConfig("s2", new[] { "s2.*" }), cts.Token); + var consumer2 = (NatsJSOrderedConsumer)await stream2.CreateOrderedConsumerAsync(cancellationToken: cts.Token); + + var count = 0; + await foreach (var unused in consumer2.FetchAsync(opts: opts, cancellationToken: cts.Token)) + { + count++; + } + + Assert.Equal(0, count); + Assert.Equal(1, Volatile.Read(ref timeoutNotifications)); + } + + [Fact] + public async Task Ordered_consumer_consume_handling() + { + await using var server = NatsServer.StartJS(); + var (nats1, proxy) = server.CreateProxiedClientConnection(); + await using var nats = nats1; + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + var consumer = (NatsJSOrderedConsumer)await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); + + (await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess(); + + var timeoutNotifications = 0; + var opts = new NatsJSConsumeOpts + { + MaxMsgs = 10, + NotificationHandler = (e, _) => + { + if (e is NatsJSTimeoutNotification) + { + Interlocked.Increment(ref timeoutNotifications); + } + + return Task.CompletedTask; + }, + Expires = TimeSpan.FromSeconds(6), + IdleHeartbeat = TimeSpan.FromSeconds(3), + }; + + await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) + { + msg.Data.Should().Be(1); + msg.Subject.Should().Be("s1.1"); + break; + } + + Assert.Equal(0, Volatile.Read(ref timeoutNotifications)); + + // Swallow heartbeats + proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); + + var consumeCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); + var consume = Task.Run( + async () => + { + await foreach (var unused in consumer.ConsumeAsync(opts: opts, cancellationToken: consumeCts.Token)) + { + } + }, + cts.Token); + + await Retry.Until("timed out", () => Volatile.Read(ref timeoutNotifications) > 0, timeout: TimeSpan.FromSeconds(20)); + consumeCts.Cancel(); + await consume; + + Assert.True(Volatile.Read(ref timeoutNotifications) > 0); + } + + [Fact] + public async Task Exception_propagation_handling() + { + await using var server = NatsServer.StartJS(); + var (nats1, proxy) = server.CreateProxiedClientConnection(); + await using var nats = nats1; + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + + var opts = new NatsJSConsumeOpts + { + MaxMsgs = 10, + NotificationHandler = (_, _) => throw new TestConsumerNotificationException(), + Expires = TimeSpan.FromSeconds(6), + IdleHeartbeat = TimeSpan.FromSeconds(3), + }; + + // Swallow heartbeats + proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); + + try + { + var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("c1"), cts.Token); + await foreach (var unused in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) + { + } + + throw new Exception("Should have thrown"); + } + catch (TestConsumerNotificationException) + { + } + + try + { + var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); + await foreach (var unused in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) + { + } + + throw new Exception("Should have thrown"); + } + catch (TestConsumerNotificationException) + { + } + } +} + +public class TestConsumerNotificationException : Exception +{ +}