diff --git a/src/EventStore.Client.Streams/EventStoreClient.Read.cs b/src/EventStore.Client.Streams/EventStoreClient.Read.cs index 8e2e743bf..626b3674b 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Read.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Read.cs @@ -165,7 +165,7 @@ public async IAsyncEnumerator GetAsyncEnumerator( /// /// Asynchronously reads all the events from a stream. - /// + /// /// The result could also be inspected as a means to avoid handling exceptions as the would indicate whether or not the stream is readable./> /// /// The in which to read. @@ -227,12 +227,12 @@ public class ReadStreamResult : IAsyncEnumerable { public string StreamName { get; } /// - /// The of the first message in this stream. Will only be filled once has been enumerated. + /// The of the first message in this stream. Will only be filled once has been enumerated. /// public StreamPosition? FirstStreamPosition { get; private set; } /// - /// The of the last message in this stream. Will only be filled once has been enumerated. + /// The of the last message in this stream. Will only be filled once has been enumerated. /// public StreamPosition? LastStreamPosition { get; private set; } @@ -357,7 +357,7 @@ public async IAsyncEnumerator GetAsyncEnumerator( } } - private async IAsyncEnumerable<(SubscriptionConfirmation, Position?, ResolvedEvent)> ReadInternal( + private async IAsyncEnumerable<(SubscriptionConfirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)> ReadInternal( ReadReq request, UserCredentials? userCredentials, [EnumeratorCancellation] CancellationToken cancellationToken) { @@ -392,16 +392,18 @@ public async IAsyncEnumerator GetAsyncEnumerator( } } - private static (SubscriptionConfirmation, Position?, ResolvedEvent)? ConvertToItem(ReadResp response) => + private static (SubscriptionConfirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)? ConvertToItem(ReadResp response) => response.ContentCase switch { Confirmation => ( - new SubscriptionConfirmation(response.Confirmation.SubscriptionId), null, default), + new SubscriptionConfirmation(response.Confirmation.SubscriptionId), null, default, null), Event => (SubscriptionConfirmation.None, null, - ConvertToResolvedEvent(response.Event)), + ConvertToResolvedEvent(response.Event), null), Checkpoint => (SubscriptionConfirmation.None, new Position(response.Checkpoint.CommitPosition, response.Checkpoint.PreparePosition), - default), + default, null), + CaughtUp => (SubscriptionConfirmation.None, null, default, StreamMessage.SubscriptionMessage.CaughtUp.Instance), + FellBehind => (SubscriptionConfirmation.None, null, default, StreamMessage.SubscriptionMessage.FellBehind.Instance), _ => null }; @@ -421,6 +423,7 @@ private static StreamMessage ConvertResponseToMessage(ReadResp response) => new StreamPosition(response.LastStreamPosition)), StreamNotFound => StreamMessage.NotFound.Instance, CaughtUp => StreamMessage.SubscriptionMessage.CaughtUp.Instance, + FellBehind => StreamMessage.SubscriptionMessage.FellBehind.Instance, _ => StreamMessage.Unknown.Instance }; diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs index 01d26393b..7b3f0372d 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs @@ -17,6 +17,8 @@ public partial class EventStoreClient { /// A Task invoked and awaited when a new event is received over the subscription. /// Whether to resolve LinkTo events automatically. /// An action invoked if the subscription is dropped. + /// An action invoked if the subscription reached the head of the stream. + /// An action invoked if the subscription has fallen behind /// The optional to apply. /// The optional user credentials to perform operation with. /// The optional . @@ -26,6 +28,8 @@ public Task SubscribeToAllAsync( Func eventAppeared, bool resolveLinkTos = false, Action? subscriptionDropped = default, + Func? caughtUp = default, + Func? fellBehind = default, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) => StreamSubscription.Confirm(ReadInternal(new ReadReq { @@ -36,9 +40,9 @@ public Task SubscribeToAllAsync( Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), Filter = GetFilterOptions(filterOptions)! } - }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, + }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, caughtUp, fellBehind, _log, filterOptions?.CheckpointReached, cancellationToken); - + /// /// Subscribes to all events. /// @@ -72,6 +76,8 @@ public SubscriptionResult SubscribeToAll( /// A Task invoked and awaited when a new event is received over the subscription. /// Whether to resolve LinkTo events automatically. /// An action invoked if the subscription is dropped. + /// An action invoked if the subscription reached the head of the stream. + /// An action is invoked if the subscription has fallen behind /// The optional user credentials to perform operation with. /// The optional . /// @@ -80,6 +86,8 @@ public Task SubscribeToStreamAsync(string streamName, Func eventAppeared, bool resolveLinkTos = false, Action? subscriptionDropped = default, + Func? caughtUp = default, + Func? fellBehind = default, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) => StreamSubscription.Confirm(ReadInternal(new ReadReq { Options = new ReadReq.Types.Options { @@ -88,7 +96,7 @@ public Task SubscribeToStreamAsync(string streamName, Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions() } - }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, + }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, caughtUp, fellBehind, _log, cancellationToken: cancellationToken); /// @@ -115,9 +123,7 @@ public SubscriptionResult SubscribeToStream(string streamName, } }, Settings, userCredentials, cancellationToken, _log); } - - - + /// /// A class which represents current subscription state and an enumerator to consume messages /// @@ -130,9 +136,9 @@ public class SubscriptionResult { /// The name of the stream. /// public string StreamName { get; } - + /// - /// + /// /// public Position StreamPosition { get; private set; } @@ -144,7 +150,7 @@ public class SubscriptionResult { /// /// Current subscription state /// - + public SubscriptionState SubscriptionState { get { if (_exceptionInternal is not null) { @@ -156,7 +162,7 @@ public SubscriptionState SubscriptionState { } private volatile SubscriptionState _subscriptionStateInternal; - + private volatile Exception? _exceptionInternal; /// @@ -170,7 +176,7 @@ async IAsyncEnumerable GetMessages() { if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) { throw new InvalidOperationException("Messages may only be enumerated once."); } - + try { await foreach (var message in _internalChannel.Reader.ReadAllAsync() .ConfigureAwait(false)) { @@ -232,10 +238,10 @@ internal SubscriptionResult(Func> selectCal : request.Options.Stream.StreamIdentifier!; _subscriptionStateInternal = Initializing; - + _ = PumpMessages(selectCallInvoker, request, callOptions); } - + async Task PumpMessages(Func> selectCallInvoker, ReadReq request, CallOptions callOptions) { var firstMessageRead = false; var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false); @@ -269,14 +275,14 @@ async Task PumpMessages(Func> selectCallInv await _internalChannel.Writer.WriteAsync(messageToWrite, _cts.Token).ConfigureAwait(false); if (messageToWrite is StreamMessage.NotFound) { - _exceptionInternal = new StreamNotFoundException(StreamName); + _exceptionInternal = new StreamNotFoundException(StreamName); break; } } } catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled && ex.Status.Detail.Contains("Call canceled by the client.")) { _log.LogInformation( - "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", + "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", SubscriptionId); } catch (Exception ex) { if (ex is ObjectDisposedException or OperationCanceledException) { @@ -309,11 +315,11 @@ private static void Validate(ReadReq request) { var streamOptions = request.Options.Stream; var allOptions = request.Options.All; - + if (allOptions == null && streamOptions == null) { throw new ArgumentException("No stream provided to subscribe"); } - + if (allOptions != null && streamOptions != null) { throw new ArgumentException($"Cannot subscribe both ${SystemStreams.AllStream}, and ${streamOptions.StreamIdentifier}"); } diff --git a/src/EventStore.Client.Streams/StreamMessage.cs b/src/EventStore.Client.Streams/StreamMessage.cs index 0636b348a..3d371da4d 100644 --- a/src/EventStore.Client.Streams/StreamMessage.cs +++ b/src/EventStore.Client.Streams/StreamMessage.cs @@ -62,6 +62,13 @@ public record Checkpoint(Position Position) : SubscriptionMessage; public record CaughtUp : SubscriptionMessage { internal static readonly CaughtUp Instance = new(); } + + /// + /// A representing client has fallen behind, meaning it's no longer keeping up with the stream's space + /// + public record FellBehind : SubscriptionMessage { + internal static readonly FellBehind Instance = new(); + } } /// diff --git a/src/EventStore.Client.Streams/StreamSubscription.cs b/src/EventStore.Client.Streams/StreamSubscription.cs index db21c551a..748d67c12 100644 --- a/src/EventStore.Client.Streams/StreamSubscription.cs +++ b/src/EventStore.Client.Streams/StreamSubscription.cs @@ -1,7 +1,3 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; using Grpc.Core; using Microsoft.Extensions.Logging; @@ -10,13 +6,15 @@ namespace EventStore.Client { /// A class representing a . /// public class StreamSubscription : IDisposable { - private readonly IAsyncEnumerable _events; - private readonly Func _eventAppeared; - private readonly Func _checkpointReached; + private readonly IAsyncEnumerable _events; + private readonly Func _eventAppeared; + private readonly Func _caughtUp; + private readonly Func _fellBehind; + private readonly Func _checkpointReached; private readonly Action? _subscriptionDropped; - private readonly ILogger _log; - private readonly CancellationTokenSource _disposed; - private int _subscriptionDroppedInvoked; + private readonly ILogger _log; + private readonly CancellationTokenSource _disposed; + private int _subscriptionDroppedInvoked; /// /// The id of the set by the server. @@ -24,9 +22,11 @@ public class StreamSubscription : IDisposable { public string SubscriptionId { get; } internal static async Task Confirm( - IAsyncEnumerable<(SubscriptionConfirmation confirmation, Position?, ResolvedEvent)> read, + IAsyncEnumerable<(SubscriptionConfirmation confirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)> read, Func eventAppeared, Action? subscriptionDropped, + Func? caughtUp, + Func? fellBehind, ILogger log, Func? checkpointReached = null, CancellationToken cancellationToken = default) { @@ -34,29 +34,36 @@ internal static async Task Confirm( var enumerator = read.GetAsyncEnumerator(cancellationToken); if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false) && enumerator.Current.confirmation != SubscriptionConfirmation.None) - return new StreamSubscription(enumerator, eventAppeared, subscriptionDropped, log, + return new StreamSubscription(enumerator, eventAppeared, subscriptionDropped, caughtUp, fellBehind, log, checkpointReached, cancellationToken); throw new InvalidOperationException($"Subscription to {enumerator} could not be confirmed."); } private StreamSubscription( - IAsyncEnumerator<(SubscriptionConfirmation confirmation, Position?, ResolvedEvent)> events, + IAsyncEnumerator<(SubscriptionConfirmation confirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)> events, Func eventAppeared, Action? subscriptionDropped, + Func? caughtUp, + Func? fellBehind, ILogger log, Func? checkpointReached, CancellationToken cancellationToken = default) { - _disposed = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _events = new Enumerable(events, CheckpointReached, _disposed.Token); - _eventAppeared = eventAppeared; - _checkpointReached = checkpointReached ?? ((_, __, ct) => Task.CompletedTask); - _subscriptionDropped = subscriptionDropped; - _log = log; + _disposed = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _events = new Enumerable(events, CheckpointReached, CaughtUp, FellBehind, _disposed.Token); + _eventAppeared = eventAppeared; + _caughtUp = caughtUp ?? ((_, ct) => Task.CompletedTask); + _fellBehind = fellBehind ?? ((_, ct) => Task.CompletedTask); + _checkpointReached = checkpointReached ?? ((_, __, ct) => Task.CompletedTask); + _subscriptionDropped = subscriptionDropped; + _log = log; _subscriptionDroppedInvoked = 0; - SubscriptionId = events.Current.confirmation.SubscriptionId; + SubscriptionId = events.Current.confirmation.SubscriptionId; Task.Run(Subscribe); } + private Task CaughtUp(CancellationToken cancellationToken) => _caughtUp(this, cancellationToken); + + private Task FellBehind(CancellationToken cancellationToken) => _fellBehind(this, cancellationToken); private Task CheckpointReached(Position position) => _checkpointReached(this, position, _disposed.Token); @@ -67,13 +74,6 @@ private async Task Subscribe() { try { await foreach (var resolvedEvent in _events.ConfigureAwait(false)) { try { - _log.LogTrace( - "Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}", - SubscriptionId, - resolvedEvent.OriginalEvent.EventStreamId, - resolvedEvent.OriginalEvent.EventNumber, - resolvedEvent.OriginalEvent.Position - ); await _eventAppeared(this, resolvedEvent, _disposed.Token).ConfigureAwait(false); } catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) { @@ -104,9 +104,9 @@ private async Task Subscribe() { } catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled && ex.Status.Detail.Contains("Call canceled by the client.")) { _log.LogInformation( - "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", + "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", SubscriptionId); - SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex); + SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex); } catch (Exception ex) { if (_subscriptionDroppedInvoked == 0) { @@ -134,12 +134,19 @@ private void SubscriptionDropped(SubscriptionDroppedReason reason, Exception? ex } private class Enumerable : IAsyncEnumerable { - private readonly IAsyncEnumerator<(SubscriptionConfirmation, Position?, ResolvedEvent)> _inner; + private readonly IAsyncEnumerator<(SubscriptionConfirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)> _inner; private readonly Func _checkpointReached; private readonly CancellationToken _cancellationToken; - - public Enumerable(IAsyncEnumerator<(SubscriptionConfirmation, Position?, ResolvedEvent)> inner, - Func checkpointReached, CancellationToken cancellationToken) { + private readonly Func _caughtUp; + private readonly Func _fellBehind; + + public Enumerable( + IAsyncEnumerator<(SubscriptionConfirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)> inner, + Func checkpointReached, + Func caughtUp, + Func fellBehind, + CancellationToken cancellationToken + ) { if (inner == null) { throw new ArgumentNullException(nameof(inner)); } @@ -148,23 +155,27 @@ public Enumerable(IAsyncEnumerator<(SubscriptionConfirmation, Position?, Resolve throw new ArgumentNullException(nameof(checkpointReached)); } - _inner = inner; + _inner = inner; _checkpointReached = checkpointReached; _cancellationToken = cancellationToken; + _caughtUp = caughtUp; + _fellBehind = fellBehind; } public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) - => new Enumerator(_inner, _checkpointReached, _cancellationToken); + => new Enumerator(_inner, _checkpointReached, _caughtUp, _fellBehind, _cancellationToken); private class Enumerator : IAsyncEnumerator { private readonly IAsyncEnumerator<(SubscriptionConfirmation, Position? position, ResolvedEvent - resolvedEvent)> _inner; + resolvedEvent, StreamMessage.SubscriptionMessage? messageType)> _inner; - private readonly Func _checkpointReached; - private readonly CancellationToken _cancellationToken; + private readonly Func _checkpointReached; + private readonly CancellationToken _cancellationToken; + private readonly Func _caughtUp; + private readonly Func _fellBehind; - public Enumerator(IAsyncEnumerator<(SubscriptionConfirmation, Position?, ResolvedEvent)> inner, - Func checkpointReached, CancellationToken cancellationToken) { + public Enumerator(IAsyncEnumerator<(SubscriptionConfirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)> inner, + Func checkpointReached, Func caughtUp, Func fellBehind, CancellationToken cancellationToken) { if (inner == null) { throw new ArgumentNullException(nameof(inner)); } @@ -173,9 +184,11 @@ public Enumerator(IAsyncEnumerator<(SubscriptionConfirmation, Position?, Resolve throw new ArgumentNullException(nameof(checkpointReached)); } - _inner = inner; + _inner = inner; _checkpointReached = checkpointReached; _cancellationToken = cancellationToken; + _caughtUp = caughtUp; + _fellBehind = fellBehind; } public ValueTask DisposeAsync() => _inner.DisposeAsync(); @@ -194,6 +207,16 @@ public async ValueTask MoveNextAsync() { return false; } + if (_inner.Current.messageType == StreamMessage.SubscriptionMessage.CaughtUp.Instance) { + await _caughtUp(_cancellationToken).ConfigureAwait(false); + goto ReadLoop; + } + + if (_inner.Current.messageType == StreamMessage.SubscriptionMessage.FellBehind.Instance) { + await _caughtUp(_cancellationToken).ConfigureAwait(false); + goto ReadLoop; + } + if (!_inner.Current.position.HasValue) { return true; } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs index 741acd876..72c3041a5 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs @@ -121,4 +121,4 @@ public override Task DisposeAsync() { return base.DisposeAsync(); } } -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs index e611c2492..aaaf5b290 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs @@ -20,6 +20,8 @@ public async Task Callback_subscription_does_not_send_checkpoint_reached_after_d }, false, (_, _, _) => subscriptionDisposed.TrySetResult(true), + caughtUp: null, + fellBehind: null, new( StreamFilter.Prefix(streamName), 1, diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs index f4891abb4..b4022469d 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs @@ -137,6 +137,8 @@ await Fixture.Streams (_, e, _) => EventAppeared(e, streamName, out startFrom), false, (s, r, e) => SubscriptionDropped(s, r, e, Subscribe), + caughtUp: null, + fellBehind: null, new(EventTypeFilter.ExcludeSystemEvents()) ); diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs index c7462306a..c2d9237e4 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs @@ -1,12 +1,15 @@ +using Exception = System.Exception; + namespace EventStore.Client.Streams.Tests.Subscriptions; [Trait("Category", "Subscriptions")] [Trait("Category", "Target:All")] public class subscribe_to_all(ITestOutputHelper output, SubscriptionsFixture fixture) : EventStoreTests(output, fixture) { [Fact] - public async Task receives_all_events_from_start() { + public async Task Callback_receives_all_events_from_start() { var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); + var caughtUpCalled = new TaskCompletionSource(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); var pageSize = seedEvents.Length / 2; @@ -17,13 +20,14 @@ public async Task receives_all_events_from_start() { await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(FromAll.Start, OnReceived, false, OnDropped) + .SubscribeToAllAsync(FromAll.Start, OnReceived, false, OnDropped, OnCaughtUp) .WithTimeout(); foreach (var evt in seedEvents.Skip(pageSize)) await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); await receivedAllEvents.Task.WithTimeout(); + await caughtUpCalled.Task.WithTimeout(); // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) @@ -38,30 +42,92 @@ public async Task receives_all_events_from_start() { Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { availableEvents.RemoveWhere(x => x == re.OriginalEvent.EventId); - + if (availableEvents.Count == 0) { receivedAllEvents.TrySetResult(true); Fixture.Log.Information("Received all {TotalEventsCount} expected events", seedEvents.Length); } - + + return Task.CompletedTask; + } + + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); return Task.CompletedTask; } void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } - + + [Fact] + [Trait("Type", "All")] + public async Task Iterator_receives_all_events_from_start() { + var receivedAllEvents = new TaskCompletionSource(); + var subscriptionDropped = new TaskCompletionSource(); + var caughtUpCalled = new TaskCompletionSource(); + + var seedEvents = Fixture.CreateTestEvents(10).ToArray(); + var pageSize = seedEvents.Length / 2; + + var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); + + foreach (var evt in seedEvents.Take(pageSize)) + await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + + var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start, false); + ReadMessages(subscription, EventAppeared, SubscriptionDropped, OnCaughtUp); + + foreach (var evt in seedEvents.Skip(pageSize)) + await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + + await receivedAllEvents.Task.WithTimeout(); + await caughtUpCalled.Task.WithTimeout(); + + // if the subscription dropped before time, raise the reason why + if (subscriptionDropped.Task.IsCompleted) + subscriptionDropped.Task.IsCompleted.ShouldBe(false, subscriptionDropped.Task.Result?.ToString()); + + // stop the subscription + subscription.Dispose(); + var result = await subscriptionDropped.Task.WithTimeout(); + result.ShouldBe(null); + + return; + + Task EventAppeared(ResolvedEvent re) { + availableEvents.RemoveWhere(x => x == re.OriginalEvent.EventId); + + if (availableEvents.Count == 0) { + receivedAllEvents.TrySetResult(true); + Fixture.Log.Information("Received all {TotalEventsCount} expected events", seedEvents.Length); + } + + return Task.CompletedTask; + } + + Task OnCaughtUp(EventStoreClient.SubscriptionResult sub) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => subscriptionDropped.SetResult(ex); + } + [Fact] public async Task receives_all_events_from_end() { var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); + var caughtUpCalled = new TaskCompletionSource(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(FromAll.End, OnReceived, false, OnDropped) + .SubscribeToAllAsync(FromAll.End, OnReceived, false, OnDropped, OnCaughtUp) .WithTimeout(); // add the events we want to receive after we start the subscription @@ -69,6 +135,7 @@ public async Task receives_all_events_from_end() { await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); await receivedAllEvents.Task.WithTimeout(); + await caughtUpCalled.Task.WithTimeout(); // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) @@ -92,6 +159,12 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) return Task.CompletedTask; } + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } @@ -100,6 +173,7 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti public async Task receives_all_events_from_position() { var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); + var caughtUpCalled = new TaskCompletionSource(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); var pageSize = seedEvents.Length / 2; @@ -114,7 +188,7 @@ public async Task receives_all_events_from_position() { var position = FromAll.After(writeResult.LogPosition); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(position, OnReceived, false, OnDropped) + .SubscribeToAllAsync(position, OnReceived, false, OnDropped, OnCaughtUp) .WithTimeout(); foreach (var evt in seedEvents.Skip(pageSize)) @@ -144,6 +218,12 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) return Task.CompletedTask; } + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } @@ -154,17 +234,19 @@ public async Task receives_all_events_with_resolved_links() { var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); + var caughtUpCalled = new TaskCompletionSource(); var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); - + await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(FromAll.Start, OnReceived, true, OnDropped) + .SubscribeToAllAsync(FromAll.Start, OnReceived, true, OnDropped, OnCaughtUp) .WithTimeout(); await receivedAllEvents.Task.WithTimeout(); + await caughtUpCalled.Task.WithTimeout(); // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) @@ -192,6 +274,12 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) return Task.CompletedTask; } + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } @@ -206,7 +294,8 @@ public async Task receives_all_filtered_events_from_start(SubscriptionFilter fil var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); var checkpointReached = new TaskCompletionSource(); - + var caughtUpCalled = new TaskCompletionSource(); + var seedEvents = Fixture.CreateTestEvents(64) .Select(evt => filter.PrepareEvent(streamPrefix, evt)) .ToArray(); @@ -232,7 +321,7 @@ public async Task receives_all_filtered_events_from_start(SubscriptionFilter fil var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1, CheckpointReached); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(FromAll.Start, OnReceived, false, OnDropped, filterOptions) + .SubscribeToAllAsync(FromAll.Start, OnReceived, false, OnDropped, OnCaughtUp, null, filterOptions) .WithTimeout(); // add some of the events we want to see after we start the subscription @@ -242,6 +331,7 @@ public async Task receives_all_filtered_events_from_start(SubscriptionFilter fil // wait until all events were received and at least one checkpoint was reached? await receivedAllEvents.Task.WithTimeout(); await checkpointReached.Task.WithTimeout(); + await caughtUpCalled.Task.WithTimeout(); // await Task.WhenAll(receivedAllEvents.Task, checkpointReached.Task).WithTimeout(); @@ -278,6 +368,12 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) return Task.CompletedTask; } + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) { subscriptionDropped.SetResult(new(reason, ex)); if (reason != SubscriptionDroppedReason.Disposed) { @@ -306,7 +402,8 @@ public async Task receives_all_filtered_events_from_end(SubscriptionFilter filte var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); var checkpointReached = new TaskCompletionSource(); - + var caughtUpCalled = new TaskCompletionSource(); + var seedEvents = Fixture.CreateTestEvents(64) .Select(evt => filter.PrepareEvent(streamPrefix, evt)) .ToArray(); @@ -329,7 +426,7 @@ public async Task receives_all_filtered_events_from_end(SubscriptionFilter filte var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1, CheckpointReached); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(FromAll.End, OnReceived, false, OnDropped, filterOptions) + .SubscribeToAllAsync(FromAll.End, OnReceived, false, OnDropped, OnCaughtUp, null, filterOptions) .WithTimeout(); // add the events we want to receive after we start the subscription @@ -339,6 +436,7 @@ public async Task receives_all_filtered_events_from_end(SubscriptionFilter filte // wait until all events were received and at least one checkpoint was reached? await receivedAllEvents.Task.WithTimeout(); await checkpointReached.Task.WithTimeout(); + await caughtUpCalled.Task.WithTimeout(); // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) @@ -375,6 +473,12 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) return Task.CompletedTask; } + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) { subscriptionDropped.SetResult(new(reason, ex)); if (reason != SubscriptionDroppedReason.Disposed) { @@ -403,7 +507,8 @@ public async Task receives_all_filtered_events_from_position(SubscriptionFilter var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); var checkpointReached = new TaskCompletionSource(); - + var caughtUpCalled = new TaskCompletionSource(); + var seedEvents = Fixture.CreateTestEvents(64) .Select(evt => filter.PrepareEvent(streamPrefix, evt)) .ToArray(); @@ -429,7 +534,7 @@ public async Task receives_all_filtered_events_from_position(SubscriptionFilter var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1, CheckpointReached); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(position, OnReceived, false, OnDropped, filterOptions) + .SubscribeToAllAsync(position, OnReceived, false, OnDropped, OnCaughtUp, null, filterOptions) .WithTimeout(); // add the events we want to receive after we start the subscription @@ -439,7 +544,8 @@ public async Task receives_all_filtered_events_from_position(SubscriptionFilter // wait until all events were received and at least one checkpoint was reached? await receivedAllEvents.Task.WithTimeout(); await checkpointReached.Task.WithTimeout(); - + await caughtUpCalled.Task.WithTimeout(); + // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) subscriptionDropped.Task.IsCompleted.ShouldBe(false, subscriptionDropped.Task.Result.ToString()); @@ -475,6 +581,12 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) return Task.CompletedTask; } + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) { subscriptionDropped.SetResult(new(reason, ex)); if (reason != SubscriptionDroppedReason.Disposed) { @@ -499,6 +611,7 @@ public async Task receives_all_filtered_events_with_resolved_links() { var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); + var caughtUpCalled = new TaskCompletionSource(); var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); @@ -510,7 +623,7 @@ public async Task receives_all_filtered_events_with_resolved_links() { ); using var subscription = await Fixture.Streams - .SubscribeToAllAsync(FromAll.Start, OnReceived, true, OnDropped, options) + .SubscribeToAllAsync(FromAll.Start, OnReceived, true, OnDropped, OnCaughtUp, null, options) .WithTimeout(); await receivedAllEvents.Task.WithTimeout(); @@ -541,6 +654,12 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) return Task.CompletedTask; } + Task OnCaughtUp(StreamSubscription sub, CancellationToken ct) { + Fixture.Log.Information("Subscription has caught up"); + caughtUpCalled.TrySetResult(true); + return Task.CompletedTask; + } + void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } @@ -573,7 +692,7 @@ public async Task Iterator_client_stops_reading_messages_when_subscription_dispo var dropped = new TaskCompletionSource(); var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); - var testEvent = Fixture.CreateTestEvents(1).First(); + var testEvent = Fixture.CreateTestEvents(1).First(); ReadMessages(subscription, EventAppeared, SubscriptionDropped); if (dropped.Task.IsCompleted) { @@ -618,10 +737,10 @@ public async Task Callback_drops_when_subscriber_error() { [Fact] public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { - var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; - var dropped = new TaskCompletionSource(); + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); var expectedException = new Exception("Error"); - int numTimesCalled = 0; + int numTimesCalled = 0; var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); ReadMessages(subscription, EventAppeared, SubscriptionDropped); @@ -641,12 +760,23 @@ Task EventAppeared(ResolvedEvent e) { void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); } - async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped) { + async void ReadMessages( + EventStoreClient.SubscriptionResult subscription, + Func eventAppeared, + Action subscriptionDropped, + Func? caughtUp = null + ) { Exception? exception = null; try { await foreach (var message in subscription.Messages) { - if (message is StreamMessage.Event eventMessage) { - await eventAppeared(eventMessage.ResolvedEvent); + switch (message) { + case StreamMessage.Event eventMessage: await eventAppeared(eventMessage.ResolvedEvent); + break; + + case StreamMessage.SubscriptionMessage.CaughtUp: { + if (caughtUp is not null) await caughtUp(subscription); + break; + } } } } catch (Exception ex) { diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs index 0be18a4d5..fcab59598 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs @@ -4,7 +4,7 @@ namespace EventStore.Client.Streams.Tests.Subscriptions; [Trait("Category", "Target:Stream")] public class subscribe_to_stream(ITestOutputHelper output, SubscriptionsFixture fixture) : EventStoreTests(output, fixture) { [Fact] - public async Task receives_all_events_from_start() { + public async Task Callback_receives_all_events_from_start() { var streamName = Fixture.GetStreamName(); var receivedAllEvents = new TaskCompletionSource(); @@ -149,9 +149,9 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti [Fact] public async Task Iterator_subscribe_to_non_existing_stream() { - var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); - var dropped = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); ReadMessages(subscription, EventAppeared, SubscriptionDropped); @@ -262,11 +262,11 @@ public async Task Callback_drops_when_disposed() { [Fact] public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { - var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource(); var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); - var testEvent = Fixture.CreateTestEvents(1).First(); + var testEvent = Fixture.CreateTestEvents(1).First(); ReadMessages(subscription, EventAppeared, SubscriptionDropped); if (dropped.Task.IsCompleted) { @@ -314,10 +314,10 @@ public async Task Callback_drops_when_subscriber_error() { [Fact] public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { - var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; - var dropped = new TaskCompletionSource(); + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); var expectedException = new Exception("Error"); - int numTimesCalled = 0; + int numTimesCalled = 0; var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); ReadMessages(subscription, EventAppeared, SubscriptionDropped); @@ -428,12 +428,23 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti subscriptionDropped.SetResult(new(reason, ex)); } - async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped) { + async void ReadMessages( + EventStoreClient.SubscriptionResult subscription, + Func eventAppeared, + Action? subscriptionDropped, + Func? caughtUp = null + ) { Exception? exception = null; try { await foreach (var message in subscription.Messages) { - if (message is StreamMessage.Event eventMessage) { - await eventAppeared(eventMessage.ResolvedEvent); + switch (message) { + case StreamMessage.Event eventMessage: await eventAppeared(eventMessage.ResolvedEvent); + break; + + case StreamMessage.SubscriptionMessage.CaughtUp: { + if (caughtUp is not null) await caughtUp(subscription); + break; + } } } } catch (Exception ex) {