Skip to content

Commit

Permalink
JetStream consume error notifications (#220)
Browse files Browse the repository at this point in the history
* JetStream consume error notifications

Notify users of events like timeouts while consuming messages.

* Test fixes for ordered consume error handler

* Kick GitHub CI

* Exception handling for notifications

* For normal consumer we are using notification channel since recovery
  effort is happening with in the consumer subscription.

* For Ordered consumer we are using the handler directly in the loop
  since the consume subscription does not do error recovery and all
  exceptions are handled with in the main consumer loop.

* Cancellation support for notification handlers

* Attempt to fix test flapper

* Fixing test flapper

Using fetch instead of next. It is a little pointless using notifications with next.
  • Loading branch information
mtmk authored Nov 17, 2023
1 parent aa99468 commit f7057a2
Show file tree
Hide file tree
Showing 8 changed files with 515 additions and 9 deletions.
11 changes: 11 additions & 0 deletions src/NATS.Client.JetStream/INatsJSNotification.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace NATS.Client.JetStream;

public interface INatsJSNotification
{
string Name { get; }
}

public class NatsJSTimeoutNotification : INatsJSNotification
{
public string Name => "Timeout";
}
12 changes: 12 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal class NatsJSConsume<TMsg> : NatsSubBase
private readonly INatsDeserialize<TMsg> _serializer;
private readonly Timer _timer;
private readonly Task _pullTask;
private readonly NatsJSNotificationChannel? _notificationChannel;

private readonly long _maxMsgs;
private readonly long _expires;
Expand All @@ -54,6 +55,7 @@ public NatsJSConsume(
string consumer,
string subject,
string? queueGroup,
Func<INatsJSNotification, CancellationToken, Task>? notificationHandler,
INatsDeserialize<TMsg> serializer,
NatsSubOpts? opts,
CancellationToken cancellationToken)
Expand All @@ -67,6 +69,11 @@ public NatsJSConsume(
_consumer = consumer;
_serializer = serializer;

if (notificationHandler is { } handler)
{
_notificationChannel = new NatsJSNotificationChannel(handler, e => _userMsgs?.Writer.TryComplete(e), cancellationToken);
}

_maxMsgs = maxMsgs;
_thresholdMsgs = thresholdMsgs;
_maxBytes = maxBytes;
Expand Down Expand Up @@ -96,6 +103,7 @@ public NatsJSConsume(
static state =>
{
var self = (NatsJSConsume<TMsg>)state!;
self._notificationChannel?.Notify(new NatsJSTimeoutNotification());

if (self._cancellationToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -167,6 +175,10 @@ public override async ValueTask DisposeAsync()
await base.DisposeAsync().ConfigureAwait(false);
await _pullTask.ConfigureAwait(false);
await _timer.DisposeAsync().ConfigureAwait(false);
if (_notificationChannel != null)
{
await _notificationChannel.DisposeAsync();
}
}

internal override IEnumerable<ICommand> GetReconnectCommands(int sid)
Expand Down
16 changes: 15 additions & 1 deletion src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal class NatsJSFetch<TMsg> : NatsSubBase
private readonly INatsDeserialize<TMsg> _serializer;
private readonly Timer _hbTimer;
private readonly Timer _expiresTimer;
private readonly NatsJSNotificationChannel? _notificationChannel;

private readonly long _maxMsgs;
private readonly long _maxBytes;
Expand All @@ -40,8 +41,10 @@ public NatsJSFetch(
string consumer,
string subject,
string? queueGroup,
Func<INatsJSNotification, CancellationToken, Task>? notificationHandler,
INatsDeserialize<TMsg> serializer,
NatsSubOpts? opts)
NatsSubOpts? opts,
CancellationToken cancellationToken)
: base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts)
{
_logger = Connection.Opts.LoggerFactory.CreateLogger<NatsJSFetch<TMsg>>();
Expand All @@ -51,6 +54,11 @@ public NatsJSFetch(
_consumer = consumer;
_serializer = serializer;

if (notificationHandler is { } handler)
{
_notificationChannel = new NatsJSNotificationChannel(handler, e => _userMsgs?.Writer.TryComplete(e), cancellationToken);
}

_maxMsgs = maxMsgs;
_maxBytes = maxBytes;
_expires = expires.ToNanos();
Expand Down Expand Up @@ -84,6 +92,8 @@ public NatsJSFetch(
static state =>
{
var self = (NatsJSFetch<TMsg>)state!;
self._notificationChannel?.Notify(new NatsJSTimeoutNotification());

self.EndSubscription(NatsSubEndReason.IdleHeartbeatTimeout);
if (self._debug)
{
Expand Down Expand Up @@ -134,6 +144,10 @@ public override async ValueTask DisposeAsync()
await base.DisposeAsync().ConfigureAwait(false);
await _hbTimer.DisposeAsync().ConfigureAwait(false);
await _expiresTimer.DisposeAsync().ConfigureAwait(false);
if (_notificationChannel != null)
{
await _notificationChannel.DisposeAsync();
}
}

internal override IEnumerable<ICommand> GetReconnectCommands(int sid)
Expand Down
68 changes: 68 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Threading.Channels;

namespace NATS.Client.JetStream.Internal;

internal class NatsJSNotificationChannel : IAsyncDisposable
{
private readonly Func<INatsJSNotification, CancellationToken, Task> _notificationHandler;
private readonly Action<Exception> _exceptionHandler;
private readonly CancellationToken _cancellationToken;
private readonly Channel<INatsJSNotification> _channel;
private readonly Task _loop;

public NatsJSNotificationChannel(
Func<INatsJSNotification, CancellationToken, Task> notificationHandler,
Action<Exception> exceptionHandler,
CancellationToken cancellationToken)
{
_notificationHandler = notificationHandler;
_exceptionHandler = exceptionHandler;
_cancellationToken = cancellationToken;
_channel = Channel.CreateBounded<INatsJSNotification>(new BoundedChannelOptions(128)
{
AllowSynchronousContinuations = false,
SingleReader = false,
SingleWriter = false,
FullMode = BoundedChannelFullMode.DropOldest,
});
_loop = Task.Run(NotificationLoop, _cancellationToken);
}

public void Notify(INatsJSNotification notification) => _channel.Writer.TryWrite(notification);

public async ValueTask DisposeAsync()
{
_channel.Writer.TryComplete();
try
{
await _loop;
}
catch (OperationCanceledException)
{
}
}

private async Task NotificationLoop()
{
try
{
while (await _channel.Reader.WaitToReadAsync(_cancellationToken))
{
while (_channel.Reader.TryRead(out var notification))
{
try
{
await _notificationHandler(notification, _cancellationToken);
}
catch (Exception e)
{
_exceptionHandler(e);
}
}
}
}
catch (OperationCanceledException)
{
}
}
}
90 changes: 82 additions & 8 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,44 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
opts ??= _context.Opts.DefaultConsumeOpts;
await using var cc = await ConsumeInternalAsync<T>(serializer, opts, cancellationToken).ConfigureAwait(false);

// Keep subscription alive (since it's a wek ref in subscription manager) until we're done.
// Keep subscription alive (since it's a weak ref in subscription manager) until we're done.
using var anchor = _context.Connection.RegisterSubAnchor(cc);

await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
while (!cancellationToken.IsCancellationRequested)
{
yield return jsMsg;
// We have to check calls individually since we can't use yield return in try-catch blocks.
bool ready;
try
{
ready = await cc.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
ready = false;
}

if (!ready)
yield break;

while (!cancellationToken.IsCancellationRequested)
{
bool read;
NatsJSMsg<T> jsMsg;
try
{
read = cc.Msgs.TryRead(out jsMsg);
}
catch (OperationCanceledException)
{
read = false;
jsMsg = default;
}

if (!read)
break;

yield return jsMsg;
}
}
}

Expand Down Expand Up @@ -115,9 +147,13 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
MaxMsgs = 1,
IdleHeartbeat = opts.IdleHeartbeat,
Expires = opts.Expires,
NotificationHandler = opts.NotificationHandler,
},
cancellationToken: cancellationToken).ConfigureAwait(false);

// Keep subscription alive (since it's a weak ref in subscription manager) until we're done.
using var anchor = _context.Connection.RegisterSubAnchor(f);

await foreach (var natsJSMsg in f.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
return natsJSMsg;
Expand Down Expand Up @@ -146,9 +182,45 @@ public async IAsyncEnumerable<NatsJSMsg<T>> FetchAsync<T>(
serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();

await using var fc = await FetchInternalAsync<T>(serializer, opts, cancellationToken).ConfigureAwait(false);
await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))

// Keep subscription alive (since it's a weak ref in subscription manager) until we're done.
using var anchor = _context.Connection.RegisterSubAnchor(fc);

while (!cancellationToken.IsCancellationRequested)
{
yield return jsMsg;
// We have to check calls individually since we can't use yield return in try-catch blocks.
bool ready;
try
{
ready = await fc.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
ready = false;
}

if (!ready)
yield break;

while (!cancellationToken.IsCancellationRequested)
{
bool read;
NatsJSMsg<T> jsMsg;
try
{
read = fc.Msgs.TryRead(out jsMsg);
}
catch (OperationCanceledException)
{
read = false;
jsMsg = default;
}

if (!read)
break;

yield return jsMsg;
}
}
}

Expand Down Expand Up @@ -252,6 +324,7 @@ internal async ValueTask<NatsJSConsume<T>> ConsumeInternalAsync<T>(INatsDeserial
thresholdBytes: max.ThresholdBytes,
expires: timeouts.Expires,
idle: timeouts.IdleHeartbeat,
notificationHandler: opts.NotificationHandler,
cancellationToken: cancellationToken);

await _context.Connection.SubAsync(sub: sub, cancellationToken).ConfigureAwait(false);
Expand All @@ -273,11 +346,10 @@ await sub.CallMsgNextAsync(
return sub;
}

internal async ValueTask<NatsJSOrderedConsume<T>> OrderedConsumeInternalAsync<T>(INatsDeserialize<T>? serializer = default, NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default)
internal async ValueTask<NatsJSOrderedConsume<T>> OrderedConsumeInternalAsync<T>(INatsDeserialize<T>? serializer, NatsJSConsumeOpts opts, CancellationToken cancellationToken)
{
ThrowIfDeleted();

opts ??= new NatsJSConsumeOpts();
serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var inbox = _context.NewInbox();

Expand Down Expand Up @@ -348,7 +420,9 @@ internal async ValueTask<NatsJSFetch<T>> FetchInternalAsync<T>(
maxMsgs: max.MaxMsgs,
maxBytes: max.MaxBytes,
expires: timeouts.Expires,
idle: timeouts.IdleHeartbeat);
notificationHandler: opts.NotificationHandler,
idle: timeouts.IdleHeartbeat,
cancellationToken: cancellationToken);

await _context.Connection.SubAsync(sub: sub, cancellationToken).ConfigureAwait(false);

Expand Down
6 changes: 6 additions & 0 deletions src/NATS.Client.JetStream/NatsJSOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public record NatsJSConsumeOpts
/// Hint for the number of bytes left in buffer that should trigger a low watermark on the client, and influence it to request more data.
/// </summary>
public int? ThresholdBytes { get; init; }

public Func<INatsJSNotification, CancellationToken, Task>? NotificationHandler { get; init; }
}

/// <summary>
Expand All @@ -134,6 +136,8 @@ public record NatsJSNextOpts
/// Amount idle time the server should wait before sending a heartbeat. For requests with expires > 30s, heartbeats should be enabled by default
/// </summary>
public TimeSpan? IdleHeartbeat { get; init; }

public Func<INatsJSNotification, CancellationToken, Task>? NotificationHandler { get; init; }
}

/// <summary>
Expand Down Expand Up @@ -161,6 +165,8 @@ public record NatsJSFetchOpts
/// </summary>
public TimeSpan? IdleHeartbeat { get; init; }

public Func<INatsJSNotification, CancellationToken, Task>? NotificationHandler { get; init; }

/// <summary>
/// Does not wait for messages to be available
/// </summary>
Expand Down
Loading

0 comments on commit f7057a2

Please sign in to comment.