Skip to content

Commit

Permalink
adding sensible persistent subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
thefringeninja committed Feb 14, 2024
1 parent 10f86d3 commit 874f978
Show file tree
Hide file tree
Showing 99 changed files with 4,798 additions and 1,903 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Encodings.Web;
using System.Threading.Channels;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand All @@ -11,6 +9,12 @@ namespace EventStore.Client {
/// The client used to manage persistent subscriptions in the EventStoreDB.
/// </summary>
public sealed partial class EventStorePersistentSubscriptionsClient : EventStoreClientBase {
private static BoundedChannelOptions ReadBoundedChannelOptions = new (1) {
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = true
};

private readonly ILogger _log;

/// <summary>
Expand Down
171 changes: 42 additions & 129 deletions src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.PersistentSubscriptions;
using Grpc.Core;
using Microsoft.Extensions.Logging;
Expand All @@ -12,12 +7,12 @@ namespace EventStore.Client {
/// Represents a persistent subscription connection.
/// </summary>
public class PersistentSubscription : IDisposable {
private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult _persistentSubscriptionResult;
private readonly IAsyncEnumerator<PersistentSubscriptionMessage> _enumerator;
private readonly Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> _eventAppeared;
private readonly Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> _subscriptionDropped;
private readonly IDisposable _disposable;
private readonly CancellationToken _cancellationToken;
private readonly AsyncDuplexStreamingCall<ReadReq, ReadResp> _call;
private readonly ILogger _log;
private readonly Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> _subscriptionDropped;
private readonly ILogger _log;
private readonly CancellationTokenSource _cts;

private int _subscriptionDroppedInvoked;

Expand All @@ -27,61 +22,49 @@ public class PersistentSubscription : IDisposable {
public string SubscriptionId { get; }

internal static async Task<PersistentSubscription> Confirm(
ChannelBase channel, CallInvoker callInvoker, EventStoreClientSettings settings,
UserCredentials? userCredentials, ReadReq.Types.Options options, ILogger log,
EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
ILogger log,
UserCredentials? userCredentials,
CancellationToken cancellationToken = default) {
var enumerator = persistentSubscriptionResult
.Messages
.GetAsyncEnumerator(cancellationToken);

var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.Read(EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
cancellationToken: cts.Token));

await call.RequestStream.WriteAsync(new ReadReq {
Options = options
}).ConfigureAwait(false);

if (await call.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false) &&
call.ResponseStream.Current.ContentCase == ReadResp.ContentOneofCase.SubscriptionConfirmation)
return new PersistentSubscription(call, log, eventAppeared, subscriptionDropped, cts.Token, cts);
if (!await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false) ||
enumerator.Current is not PersistentSubscriptionMessage.SubscriptionConfirmation (var subscriptionId)) {
throw new InvalidOperationException("Subscription could not be confirmed.");
}

call.Dispose();
cts.Dispose();
throw new InvalidOperationException("Subscription could not be confirmed.");
return new PersistentSubscription(persistentSubscriptionResult, enumerator, subscriptionId, eventAppeared,
subscriptionDropped, log, cancellationToken);
}

// PersistentSubscription takes responsibility for disposing the call and the disposable
private PersistentSubscription(
AsyncDuplexStreamingCall<ReadReq, ReadResp> call,
ILogger log,
EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
IAsyncEnumerator<PersistentSubscriptionMessage> enumerator, string subscriptionId,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
CancellationToken cancellationToken,
IDisposable disposable) {
_call = call;
_eventAppeared = eventAppeared;
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped, ILogger log,
CancellationToken cancellationToken) {
_persistentSubscriptionResult = persistentSubscriptionResult;
_enumerator = enumerator;
SubscriptionId = subscriptionId;
_eventAppeared = eventAppeared;
_subscriptionDropped = subscriptionDropped;
_cancellationToken = cancellationToken;
_disposable = disposable;
_log = log;
SubscriptionId = call.ResponseStream.Current.SubscriptionConfirmation.SubscriptionId;
Task.Run(Subscribe);
_log = log;
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

Task.Run(Subscribe, _cts.Token);
}

/// <summary>
/// Acknowledge that a message has completed processing (this will tell the server it has been processed).
/// </summary>
/// <remarks>There is no need to ack a message if you have Auto Ack enabled.</remarks>
/// <param name="eventIds">The <see cref="Uuid"/> of the <see cref="ResolvedEvent" />s to acknowledge. There should not be more than 2000 to ack at a time.</param>
public Task Ack(params Uuid[] eventIds) {
if (eventIds.Length > 2000) {
throw new ArgumentException();
}

return AckInternal(eventIds);
}
public Task Ack(params Uuid[] eventIds) => AckInternal(eventIds);

/// <summary>
/// Acknowledge that a message has completed processing (this will tell the server it has been processed).
Expand Down Expand Up @@ -114,13 +97,7 @@ public Task Ack(IEnumerable<ResolvedEvent> resolvedEvents) =>
/// <param name="reason">A reason given.</param>
/// <param name="eventIds">The <see cref="Uuid"/> of the <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
/// <exception cref="ArgumentException">The number of eventIds exceeded the limit of 2000.</exception>
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) {
if (eventIds.Length > 2000) {
throw new ArgumentException();
}

return NackInternal(eventIds, action, reason);
}
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => NackInternal(eventIds, action, reason);

/// <summary>
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
Expand All @@ -130,7 +107,7 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par
/// <param name="resolvedEvents">The <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
/// <exception cref="ArgumentException">The number of resolvedEvents exceeded the limit of 2000.</exception>
public Task Nack(PersistentSubscriptionNakEventAction action, string reason,
params ResolvedEvent[] resolvedEvents) =>
params ResolvedEvent[] resolvedEvents) =>
Nack(action, reason,
Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId));

Expand All @@ -141,12 +118,11 @@ private async Task Subscribe() {
_log.LogDebug("Persistent Subscription {subscriptionId} confirmed.", SubscriptionId);

try {
await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) {
if (response.ContentCase != ReadResp.ContentOneofCase.Event) {
while (await _enumerator.MoveNextAsync(_cts.Token).ConfigureAwait(false)) {
if (_enumerator.Current is not PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
continue;
}

var resolvedEvent = ConvertToResolvedEvent(response);

_log.LogTrace(
"Persistent Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}",
SubscriptionId, resolvedEvent.OriginalEvent.EventStreamId,
Expand All @@ -156,13 +132,8 @@ private async Task Subscribe() {
await _eventAppeared(
this,
resolvedEvent,
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event
.RetryCount,
_ => default
},
_cancellationToken).ConfigureAwait(false);

retryCount,
_cts.Token).ConfigureAwait(false);
} catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) {
if (_subscriptionDroppedInvoked != 0) {
return;
Expand All @@ -186,23 +157,6 @@ await _eventAppeared(
}
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
#if NET48
switch (ex) {
// The gRPC client for .NET 48 uses WinHttpHandler under the hood for sending HTTP requests.
// In certain scenarios, this can lead to exceptions of type WinHttpException being thrown.
// One such scenario is when the server abruptly closes the connection, which results in a WinHttpException with the error code 12030.
// Additionally, there are cases where the server response does not include the 'grpc-status' header.
// The absence of this header leads to an RpcException with the status code 'Cancelled' and the message "No grpc-status found on response".
// The switch statement below handles these specific exceptions and translates them into the appropriate
// PersistentSubscriptionDroppedByServerException exception. The downside of this approach is that it does not return the stream name
// and group name.
case RpcException { StatusCode: StatusCode.Unavailable } rex1 when rex1.Status.Detail.Contains("WinHttpException: Error 12030"):
case RpcException { StatusCode: StatusCode.Cancelled } rex2
when rex2.Status.Detail.Contains("No grpc-status found on response"):
ex = new PersistentSubscriptionDroppedByServerException("", "", ex);
break;
}
#endif
_log.LogError(ex,
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId);
Expand All @@ -216,26 +170,6 @@ when rex2.Status.Detail.Contains("No grpc-status found on response"):
SubscriptionDropped(SubscriptionDroppedReason.ServerError);
}
}

ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new(
ConvertToEventRecord(response.Event.Event)!,
ConvertToEventRecord(response.Event.Link),
response.Event.PositionCase switch {
ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
_ => null
});

EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) =>
e == null
? null
: new EventRecord(
e.StreamIdentifier!,
Uuid.FromDto(e.Id),
new StreamPosition(e.StreamRevision),
new Position(e.CommitPosition, e.PreparePosition),
e.Metadata,
e.Data.ToByteArray(),
e.CustomMetadata.ToByteArray());
}

private void SubscriptionDropped(SubscriptionDroppedReason reason, Exception? ex = null) {
Expand All @@ -246,35 +180,14 @@ private void SubscriptionDropped(SubscriptionDroppedReason reason, Exception? ex
try {
_subscriptionDropped.Invoke(this, reason, ex);
} finally {
_call.Dispose();
_disposable.Dispose();
_persistentSubscriptionResult.Dispose();
_cts.Dispose();
}
}

private Task AckInternal(params Uuid[] ids) =>
_call.RequestStream.WriteAsync(new ReadReq {
Ack = new ReadReq.Types.Ack {
Ids = {
Array.ConvertAll(ids, id => id.ToDto())
}
}
});
private Task AckInternal(params Uuid[] ids) => _persistentSubscriptionResult.Ack(ids);

private Task NackInternal(Uuid[] ids, PersistentSubscriptionNakEventAction action, string reason) =>
_call!.RequestStream.WriteAsync(new ReadReq {
Nack = new ReadReq.Types.Nack {
Ids = {
Array.ConvertAll(ids, id => id.ToDto())
},
Action = action switch {
PersistentSubscriptionNakEventAction.Park => ReadReq.Types.Nack.Types.Action.Park,
PersistentSubscriptionNakEventAction.Retry => ReadReq.Types.Nack.Types.Action.Retry,
PersistentSubscriptionNakEventAction.Skip => ReadReq.Types.Nack.Types.Action.Skip,
PersistentSubscriptionNakEventAction.Stop => ReadReq.Types.Nack.Types.Action.Stop,
_ => ReadReq.Types.Nack.Types.Action.Unknown
},
Reason = reason
}
});
_persistentSubscriptionResult.Nack(action, reason, ids);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
namespace EventStore.Client {
/// <summary>
/// The base record of all stream messages.
/// </summary>
public abstract record PersistentSubscriptionMessage {
/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> that represents a <see cref="EventStore.Client.ResolvedEvent"/>.
/// </summary>
/// <param name="ResolvedEvent">The <see cref="EventStore.Client.ResolvedEvent"/>.</param>
/// <param name="RetryCount">The number of times the <see cref="EventStore.Client.ResolvedEvent"/> has been retried.</param>
public record Event(ResolvedEvent ResolvedEvent, int? RetryCount) : PersistentSubscriptionMessage;

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> representing a stream that was not found.
/// </summary>
public record NotFound : PersistentSubscriptionMessage {
internal static readonly NotFound Instance = new();
}

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> representing a successful read operation.
/// </summary>
public record Ok : PersistentSubscriptionMessage {
internal static readonly Ok Instance = new();
};

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating the first position of a stream.
/// </summary>
/// <param name="StreamPosition">The <see cref="EventStore.Client.StreamPosition"/>.</param>
public record FirstStreamPosition(StreamPosition StreamPosition) : PersistentSubscriptionMessage;

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating the last position of a stream.
/// </summary>
/// <param name="StreamPosition">The <see cref="EventStore.Client.StreamPosition"/>.</param>
public record LastStreamPosition(StreamPosition StreamPosition) : PersistentSubscriptionMessage;

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating the last position of the $all stream.
/// </summary>
/// <param name="Position">The <see cref="EventStore.Client.Position"/>.</param>
public record LastAllStreamPosition(Position Position) : PersistentSubscriptionMessage;

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating that the subscription is ready to send additional messages.
/// </summary>
/// <param name="SubscriptionId">The unique identifier of the subscription.</param>
public record SubscriptionConfirmation(string SubscriptionId) : PersistentSubscriptionMessage;

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating that a checkpoint has been reached.
/// </summary>
/// <param name="Position">The <see cref="Position" />.</param>
public record AllStreamCheckpointReached(Position Position) : PersistentSubscriptionMessage;

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating that a checkpoint has been reached.
/// </summary>
/// <param name="StreamPosition">The <see cref="StreamPosition" />.</param>
public record StreamCheckpointReached(StreamPosition StreamPosition) : PersistentSubscriptionMessage;

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating that the subscription is live.
/// </summary>
public record CaughtUp : PersistentSubscriptionMessage {
internal static readonly CaughtUp Instance = new();
}

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> indicating that the subscription has switched to catch up mode.
/// </summary>
public record FellBehind : PersistentSubscriptionMessage {
internal static readonly FellBehind Instance = new();
}

/// <summary>
/// A <see cref="PersistentSubscriptionMessage"/> that could not be identified, usually indicating a lower client compatibility level than the server supports.
/// </summary>
public record Unknown : PersistentSubscriptionMessage {
internal static readonly Unknown Instance = new();
}
}
}
Loading

0 comments on commit 874f978

Please sign in to comment.