Skip to content

Commit

Permalink
Add caughtUp handler for async methods
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Jan 8, 2024
1 parent 434626b commit f2ded3e
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 102 deletions.
19 changes: 11 additions & 8 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public async IAsyncEnumerator<ResolvedEvent> GetAsyncEnumerator(

/// <summary>
/// Asynchronously reads all the events from a stream.
///
///
/// The result could also be inspected as a means to avoid handling exceptions as the <see cref="ReadState"/> would indicate whether or not the stream is readable./>
/// </summary>
/// <param name="direction">The <see cref="Direction"/> in which to read.</param>
Expand Down Expand Up @@ -227,12 +227,12 @@ public class ReadStreamResult : IAsyncEnumerable<ResolvedEvent> {
public string StreamName { get; }

/// <summary>
/// The <see cref="StreamPosition"/> of the first message in this stream. Will only be filled once <see cref="Messages"/> has been enumerated.
/// The <see cref="StreamPosition"/> of the first message in this stream. Will only be filled once <see cref="Messages"/> has been enumerated.
/// </summary>
public StreamPosition? FirstStreamPosition { get; private set; }

/// <summary>
/// The <see cref="StreamPosition"/> of the last message in this stream. Will only be filled once <see cref="Messages"/> has been enumerated.
/// The <see cref="StreamPosition"/> of the last message in this stream. Will only be filled once <see cref="Messages"/> has been enumerated.
/// </summary>
public StreamPosition? LastStreamPosition { get; private set; }

Expand Down Expand Up @@ -357,7 +357,7 @@ public async IAsyncEnumerator<ResolvedEvent> GetAsyncEnumerator(
}
}

private async IAsyncEnumerable<(SubscriptionConfirmation, Position?, ResolvedEvent)> ReadInternal(
private async IAsyncEnumerable<(SubscriptionConfirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)> ReadInternal(
ReadReq request,
UserCredentials? userCredentials,
[EnumeratorCancellation] CancellationToken cancellationToken) {
Expand Down Expand Up @@ -392,16 +392,18 @@ public async IAsyncEnumerator<ResolvedEvent> GetAsyncEnumerator(
}
}

private static (SubscriptionConfirmation, Position?, ResolvedEvent)? ConvertToItem(ReadResp response) =>
private static (SubscriptionConfirmation, Position?, ResolvedEvent, StreamMessage.SubscriptionMessage?)? ConvertToItem(ReadResp response) =>
response.ContentCase switch {
Confirmation => (
new SubscriptionConfirmation(response.Confirmation.SubscriptionId), null, default),
new SubscriptionConfirmation(response.Confirmation.SubscriptionId), null, default, null),
Event => (SubscriptionConfirmation.None,
null,
ConvertToResolvedEvent(response.Event)),
ConvertToResolvedEvent(response.Event), null),
Checkpoint => (SubscriptionConfirmation.None,
new Position(response.Checkpoint.CommitPosition, response.Checkpoint.PreparePosition),
default),
default, null),
CaughtUp => (SubscriptionConfirmation.None, null, default, StreamMessage.SubscriptionMessage.CaughtUp.Instance),
FellBehind => (SubscriptionConfirmation.None, null, default, StreamMessage.SubscriptionMessage.FellBehind.Instance),
_ => null
};

Expand All @@ -421,6 +423,7 @@ private static StreamMessage ConvertResponseToMessage(ReadResp response) =>
new StreamPosition(response.LastStreamPosition)),
StreamNotFound => StreamMessage.NotFound.Instance,
CaughtUp => StreamMessage.SubscriptionMessage.CaughtUp.Instance,
FellBehind => StreamMessage.SubscriptionMessage.FellBehind.Instance,
_ => StreamMessage.Unknown.Instance
};

Expand Down
40 changes: 23 additions & 17 deletions src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public partial class EventStoreClient {
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="caughtUp">An action invoked if the subscription reached the head of the stream.</param>
/// <param name="fellBehind">An action invoked if the subscription has fallen behind</param>
/// <param name="filterOptions">The optional <see cref="SubscriptionFilterOptions"/> to apply.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
Expand All @@ -26,6 +28,8 @@ public Task<StreamSubscription> SubscribeToAllAsync(
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Func<StreamSubscription, CancellationToken, Task>? caughtUp = default,
Func<StreamSubscription, CancellationToken, Task>? fellBehind = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) => StreamSubscription.Confirm(ReadInternal(new ReadReq {
Expand All @@ -36,9 +40,9 @@ public Task<StreamSubscription> SubscribeToAllAsync(
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, caughtUp, fellBehind, _log,
filterOptions?.CheckpointReached, cancellationToken);

/// <summary>
/// Subscribes to all events.
/// </summary>
Expand Down Expand Up @@ -72,6 +76,8 @@ public SubscriptionResult SubscribeToAll(
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="caughtUp">An action invoked if the subscription reached the head of the stream.</param>
/// <param name="fellBehind">An action is invoked if the subscription has fallen behind</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
Expand All @@ -80,6 +86,8 @@ public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Func<StreamSubscription, CancellationToken, Task>? caughtUp = default,
Func<StreamSubscription, CancellationToken, Task>? fellBehind = default,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) => StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
Expand All @@ -88,7 +96,7 @@ public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions()
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, caughtUp, fellBehind, _log,
cancellationToken: cancellationToken);

/// <summary>
Expand All @@ -115,9 +123,7 @@ public SubscriptionResult SubscribeToStream(string streamName,
}
}, Settings, userCredentials, cancellationToken, _log);
}




/// <summary>
/// A class which represents current subscription state and an enumerator to consume messages
/// </summary>
Expand All @@ -130,9 +136,9 @@ public class SubscriptionResult {
/// The name of the stream.
/// </summary>
public string StreamName { get; }

/// <summary>
///
///
/// </summary>
public Position StreamPosition { get; private set; }

Expand All @@ -144,7 +150,7 @@ public class SubscriptionResult {
/// <summary>
/// Current subscription state
/// </summary>

public SubscriptionState SubscriptionState {
get {
if (_exceptionInternal is not null) {
Expand All @@ -156,7 +162,7 @@ public SubscriptionState SubscriptionState {
}

private volatile SubscriptionState _subscriptionStateInternal;

private volatile Exception? _exceptionInternal;

/// <summary>
Expand All @@ -170,7 +176,7 @@ async IAsyncEnumerable<StreamMessage> GetMessages() {
if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) {
throw new InvalidOperationException("Messages may only be enumerated once.");
}

try {
await foreach (var message in _internalChannel.Reader.ReadAllAsync()
.ConfigureAwait(false)) {
Expand Down Expand Up @@ -232,10 +238,10 @@ internal SubscriptionResult(Func<CancellationToken, Task<CallInvoker>> selectCal
: request.Options.Stream.StreamIdentifier!;

_subscriptionStateInternal = Initializing;

_ = PumpMessages(selectCallInvoker, request, callOptions);
}

async Task PumpMessages(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker, ReadReq request, CallOptions callOptions) {
var firstMessageRead = false;
var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
Expand Down Expand Up @@ -269,14 +275,14 @@ async Task PumpMessages(Func<CancellationToken, Task<CallInvoker>> selectCallInv
await _internalChannel.Writer.WriteAsync(messageToWrite, _cts.Token).ConfigureAwait(false);

if (messageToWrite is StreamMessage.NotFound) {
_exceptionInternal = new StreamNotFoundException(StreamName);
_exceptionInternal = new StreamNotFoundException(StreamName);
break;
}
}
} catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled &&
ex.Status.Detail.Contains("Call canceled by the client.")) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
SubscriptionId);
} catch (Exception ex) {
if (ex is ObjectDisposedException or OperationCanceledException) {
Expand Down Expand Up @@ -309,11 +315,11 @@ private static void Validate(ReadReq request) {

var streamOptions = request.Options.Stream;
var allOptions = request.Options.All;

if (allOptions == null && streamOptions == null) {
throw new ArgumentException("No stream provided to subscribe");
}

if (allOptions != null && streamOptions != null) {
throw new ArgumentException($"Cannot subscribe both ${SystemStreams.AllStream}, and ${streamOptions.StreamIdentifier}");
}
Expand Down
7 changes: 7 additions & 0 deletions src/EventStore.Client.Streams/StreamMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public record Checkpoint(Position Position) : SubscriptionMessage;
public record CaughtUp : SubscriptionMessage {
internal static readonly CaughtUp Instance = new();
}

/// <summary>
/// A <see cref="EventStore.Client.StreamMessage.SubscriptionMessage"/> representing client has fallen behind, meaning it's no longer keeping up with the stream's space
/// </summary>
public record FellBehind : SubscriptionMessage {
internal static readonly FellBehind Instance = new();
}
}

/// <summary>
Expand Down
Loading

0 comments on commit f2ded3e

Please sign in to comment.