Skip to content

Commit

Permalink
JetStream ordered consume stability fixes (#202)
Browse files Browse the repository at this point in the history
* JetStream ordered consume stability fixes

Because ordered consume operations use ephemeral consumers it's important
to be able to create new consumers when things go wrong. There are two
major events that might strongly indicate we won't be able to find our
consumer on the server: disconnects and idle heartbeat timeouts.

With this approach introduced with this fix, we recreate the consumer on
server disconnects and idle heartbeat timeouts, making sure the consumer
can carry on from where it's left off (sequence state is maintained when
this happens).

Justification for large code duplication: NatsJSOrderedConsume.cs is a
copy of NatsJSConsume.cs subscription class. Reason for this is to
maintain stability of the normal consumer since the main behaviour is
fairly different. There is also a chance the behaviours might diverge
even greater as we discover other issues. We may consider to merge
these classes in a tidy-up effort later on.

We also introduced a general timeout (same as connection request timeout)
for all JetStream API calls. We needed this because the consumer deletion
process was sometimes hanging due to the server receiving the request
being killed and never sending a response back. Before this we were
relying on the CommandTimeout (which is 1 minute by default) to kick in.
Now we use RequestTimeout (5 seconds by default) on the subscription
waiting for the reply.

* Consume memory test fixes

* Ordered consumer create retries
  • Loading branch information
mtmk authored Nov 13, 2023
1 parent 2b9c1c9 commit 240a7e5
Show file tree
Hide file tree
Showing 8 changed files with 697 additions and 62 deletions.
55 changes: 36 additions & 19 deletions src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,22 @@ public partial class NatsConnection
/// <inheritdoc />
public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var anchor = Interlocked.Increment(ref _subAnchorId);
try
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();

await using var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();

_subAnchor[anchor] = sub;
await using var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
using var anchor = RegisterSubAnchor(sub);

await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);
await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);

// We don't cancel the channel reader here because we want to keep reading until the subscription
// channel writer completes so that messages left in the channel can be consumed before exit the loop.
while (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
// We don't cancel the channel reader here because we want to keep reading until the subscription
// channel writer completes so that messages left in the channel can be consumed before exit the loop.
while (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
{
while (sub.Msgs.TryRead(out var msg))
{
while (sub.Msgs.TryRead(out var msg))
{
yield return msg;
}
yield return msg;
}
}
finally
{
_subAnchor.TryRemove(anchor, out _);
}
}

internal async ValueTask<NatsSub<T>> SubscribeInternalAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
Expand All @@ -48,4 +39,30 @@ internal async ValueTask<NatsSub<T>> SubscribeInternalAsync<T>(string subject, s
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
return sub;
}

/// <summary>
/// Make sure subscription is not collected until the end of the scope.
/// </summary>
/// <param name="sub">Subscription object</param>
/// <returns>Disposable</returns>
/// <remarks>
/// We must keep subscription alive until the end of its scope especially in async iterators.
/// Otherwise subscription is collected because subscription manager only holds a weak reference to it.
/// </remarks>
internal IDisposable RegisterSubAnchor(NatsSubBase sub) => new SubAnchor(this, sub);

internal class SubAnchor : IDisposable
{
private readonly NatsConnection _nats;
private readonly long _anchor;

public SubAnchor(NatsConnection nats, NatsSubBase sub)
{
_nats = nats;
_anchor = Interlocked.Increment(ref _nats._subAnchorId);
_nats._subAnchor[_anchor] = sub;
}

public void Dispose() => _nats._subAnchor.TryRemove(_anchor, out _);
}
}
Loading

0 comments on commit 240a7e5

Please sign in to comment.