Skip to content

Commit

Permalink
Dispose on token cancellation request
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Jul 1, 2024
1 parent 12b6e39 commit 19f8c31
Show file tree
Hide file tree
Showing 6 changed files with 459 additions and 161 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using EventStore.Client.PersistentSubscriptions;
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace EventStore.Client {
/// <summary>
/// Represents a persistent subscription connection.
/// </summary>
public class PersistentSubscription : IDisposable {
private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult _persistentSubscriptionResult;
private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult
_persistentSubscriptionResult;

private readonly IAsyncEnumerator<PersistentSubscriptionMessage> _enumerator;
private readonly Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> _eventAppeared;
private readonly Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> _subscriptionDropped;
Expand All @@ -25,7 +25,8 @@ internal static async Task<PersistentSubscription> Confirm(
EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default) {
ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default
) {
var enumerator = persistentSubscriptionResult
.Messages
.GetAsyncEnumerator(cancellationToken);
Expand All @@ -34,11 +35,20 @@ internal static async Task<PersistentSubscription> Confirm(

return (result, enumerator.Current) switch {
(true, PersistentSubscriptionMessage.SubscriptionConfirmation (var subscriptionId)) =>
new PersistentSubscription(persistentSubscriptionResult, enumerator, subscriptionId, eventAppeared,
subscriptionDropped, log, cancellationToken),
new PersistentSubscription(
persistentSubscriptionResult,
enumerator,
subscriptionId,
eventAppeared,
subscriptionDropped,
log,
cancellationToken
),
(true, PersistentSubscriptionMessage.NotFound) =>
throw new PersistentSubscriptionNotFoundException(persistentSubscriptionResult.StreamName,
persistentSubscriptionResult.GroupName),
throw new PersistentSubscriptionNotFoundException(
persistentSubscriptionResult.StreamName,
persistentSubscriptionResult.GroupName
),
_ => throw new InvalidOperationException("Subscription could not be confirmed.")
};
}
Expand All @@ -49,14 +59,15 @@ private PersistentSubscription(
IAsyncEnumerator<PersistentSubscriptionMessage> enumerator, string subscriptionId,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped, ILogger log,
CancellationToken cancellationToken) {
CancellationToken cancellationToken
) {
_persistentSubscriptionResult = persistentSubscriptionResult;
_enumerator = enumerator;
SubscriptionId = subscriptionId;
_eventAppeared = eventAppeared;
_subscriptionDropped = subscriptionDropped;
_log = log;
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_enumerator = enumerator;
SubscriptionId = subscriptionId;
_eventAppeared = eventAppeared;
_subscriptionDropped = subscriptionDropped;
_log = log;
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

Task.Run(Subscribe, _cts.Token);
}
Expand Down Expand Up @@ -91,15 +102,15 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) =>
public Task Ack(IEnumerable<ResolvedEvent> resolvedEvents) =>
Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId));


/// <summary>
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
/// </summary>
/// <param name="action">The <see cref="PersistentSubscriptionNakEventAction"/> to take.</param>
/// <param name="reason">A reason given.</param>
/// <param name="eventIds">The <see cref="Uuid"/> of the <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
/// <exception cref="ArgumentException">The number of eventIds exceeded the limit of 2000.</exception>
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => NackInternal(eventIds, action, reason);
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) =>
NackInternal(eventIds, action, reason);

/// <summary>
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
Expand All @@ -108,10 +119,15 @@ public Task Ack(IEnumerable<ResolvedEvent> resolvedEvents) =>
/// <param name="reason">A reason given.</param>
/// <param name="resolvedEvents">The <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
/// <exception cref="ArgumentException">The number of resolvedEvents exceeded the limit of 2000.</exception>
public Task Nack(PersistentSubscriptionNakEventAction action, string reason,
params ResolvedEvent[] resolvedEvents) =>
Nack(action, reason,
Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId));
public Task Nack(
PersistentSubscriptionNakEventAction action, string reason,
params ResolvedEvent[] resolvedEvents
) =>
Nack(
action,
reason,
Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)
);

/// <inheritdoc />
public void Dispose() => SubscriptionDropped(SubscriptionDroppedReason.Disposed);
Expand All @@ -121,64 +137,94 @@ private async Task Subscribe() {

try {
while (await _enumerator.MoveNextAsync(_cts.Token).ConfigureAwait(false)) {
if (_enumerator.Current is not PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
if (_enumerator.Current is not
PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
continue;
}

if (_enumerator.Current is PersistentSubscriptionMessage.NotFound) {
if (_subscriptionDroppedInvoked != 0) {
return;
}
SubscriptionDropped(SubscriptionDroppedReason.ServerError,

SubscriptionDropped(
SubscriptionDroppedReason.ServerError,
new PersistentSubscriptionNotFoundException(
_persistentSubscriptionResult.StreamName, _persistentSubscriptionResult.GroupName));
_persistentSubscriptionResult.StreamName,
_persistentSubscriptionResult.GroupName
)
);

return;
}

_log.LogTrace(
"Persistent Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}",
SubscriptionId, resolvedEvent.OriginalEvent.EventStreamId,
resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position);
SubscriptionId,
resolvedEvent.OriginalEvent.EventStreamId,
resolvedEvent.OriginalEvent.EventNumber,
resolvedEvent.OriginalEvent.Position
);

try {
await _eventAppeared(
this,
resolvedEvent,
retryCount,
_cts.Token).ConfigureAwait(false);
_cts.Token
).ConfigureAwait(false);
} catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) {
if (_subscriptionDroppedInvoked != 0) {
return;
}

_log.LogWarning(ex,
_log.LogWarning(
ex,
"Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
SubscriptionId);
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.Disposed);

return;
} catch (Exception ex) {
_log.LogError(ex,
_log.LogError(
ex,
"Persistent Subscription {subscriptionId} was dropped because the subscriber made an error.",
SubscriptionId);
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex);

return;
}
}
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(ex,
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId);
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
if (_cts.Token.IsCancellationRequested) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} else {
_log.LogError(
ex,
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
}
} finally {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(
"Persistent Subscription {subscriptionId} was unexpectedly terminated.",
SubscriptionId);
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.ServerError);
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/EventStore.Client.Streams/StreamSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,22 @@ await _checkpointReached(this, position, _cts.Token)
SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
_log.LogError(
ex,
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId
);
if (_cts.IsCancellationRequested) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} else {
_log.LogError(
ex,
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
SubscriptionId
);

SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// ReSharper disable InconsistentNaming

using EventStore.Client.Streams.Tests.Subscriptions;

namespace EventStore.Client.PersistentSubscriptions.Tests.SubscriptionToAll.Obsolete;

[Obsolete("Will be removed in future release when older subscriptions APIs are removed from the client")]
public class
PersistentSubscriptionDropsDueToCancellationToken(PersistentSubscriptionDropsDueToCancellationToken.Fixture fixture)
: IClassFixture<
PersistentSubscriptionDropsDueToCancellationToken.Fixture> {
static readonly string Group = Guid.NewGuid().ToString();
static readonly string Stream = Guid.NewGuid().ToString();

[SupportsPSToAll.Fact]
public async Task persistent_subscription_to_all_drops_due_to_cancellation_token() {
var subscriptionDropped = new TaskCompletionSource<SubscriptionDroppedResult>();

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

await fixture.Client.CreateToAllAsync(
Group,
cancellationToken: cts.Token,
settings: new PersistentSubscriptionSettings()
);

using var subscription = await fixture.Client.SubscribeToAllAsync(
Group,
async (s, e, r, ct) => await s.Ack(e),
(sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)),
userCredentials: TestCredentials.Root,
cancellationToken: cts.Token
)
.WithTimeout();

// wait until the cancellation token cancels
await Task.Delay(TimeSpan.FromSeconds(3));

var result = await subscriptionDropped.Task.WithTimeout();
result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed);
}

[SupportsPSToAll.Fact]
public async Task persistent_subscription_to_stream_drops_due_to_cancellation_token() {
var subscriptionDropped = new TaskCompletionSource<SubscriptionDroppedResult>();

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

await fixture.Client.CreateToStreamAsync(
Group,
Stream,
cancellationToken: cts.Token,
settings: new PersistentSubscriptionSettings()
);

using var subscription = await fixture.Client.SubscribeToStreamAsync(
Group,
Stream,
async (s, e, r, ct) => await s.Ack(e),
(sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)),
userCredentials: TestCredentials.Root,
cancellationToken: cts.Token
)
.WithTimeout();

// wait until the cancellation token cancels
await Task.Delay(TimeSpan.FromSeconds(3));

var result = await subscriptionDropped.Task.WithTimeout();
result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed);
}

public class Fixture : EventStoreClientFixture {
protected override Task Given() {
return Task.CompletedTask;
}

protected override Task When() => Task.CompletedTask;
}
}
Loading

0 comments on commit 19f8c31

Please sign in to comment.