Skip to content

Commit

Permalink
Subscriptions Without Callbacks (DEV-112) (#282)
Browse files Browse the repository at this point in the history
* add sensible subscriptions

* add sensible persistent subscriptions

* obsolete old subscription / ps apis

* put missing props on sever exception in net48
  • Loading branch information
thefringeninja authored Feb 28, 2024
1 parent e65f330 commit 5bd189a
Show file tree
Hide file tree
Showing 138 changed files with 7,364 additions and 3,372 deletions.
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
<DebugType Condition=" '$(Configuration)' == 'Release' ">pdbonly</DebugType>

<UseLinkBase>true</UseLinkBase>
<GrpcPackageVersion>2.59.0</GrpcPackageVersion>
<GrpcToolsPackageVersion>2.59.0</GrpcToolsPackageVersion>
<GrpcPackageVersion>2.60.0</GrpcPackageVersion>
<GrpcToolsPackageVersion>2.60.0</GrpcToolsPackageVersion>
</PropertyGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net48'">
Expand Down
139 changes: 85 additions & 54 deletions samples/persistent-subscriptions/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,21 @@

await CreatePersistentSubscription(client);
await UpdatePersistentSubscription(client);
await ConnectToPersistentSubscriptionToStream(client);

try {
await ConnectToPersistentSubscriptionToStream(client, GetCT());
} catch (OperationCanceledException) { }

await CreatePersistentSubscriptionToAll(client);
await ConnectToPersistentSubscriptionToAll(client);
await ConnectToPersistentSubscriptionWithManualAcks(client);

try {
await ConnectToPersistentSubscriptionToAll(client, GetCT());
} catch (OperationCanceledException) { }

try {
await ConnectToPersistentSubscriptionWithManualAcks(client, GetCT());
} catch (OperationCanceledException) { }

await GetPersistentSubscriptionToStreamInfo(client);
await GetPersistentSubscriptionToAllInfo(client);
await ReplayParkedToStream(client);
Expand Down Expand Up @@ -39,33 +50,38 @@ await client.CreateToStreamAsync(
);

Console.WriteLine("Subscription to stream created");

#endregion create-persistent-subscription-to-stream
}

static async Task ConnectToPersistentSubscriptionToStream(EventStorePersistentSubscriptionsClient client) {
static async Task ConnectToPersistentSubscriptionToStream(EventStorePersistentSubscriptionsClient client,
CancellationToken ct) {
#region subscribe-to-persistent-subscription-to-stream

var subscription = await client.SubscribeToStreamAsync(
await using var subscription = client.SubscribeToStream(
"test-stream",
"subscription-group",
async (subscription, evnt, retryCount, cancellationToken) => {
await HandleEvent(evnt);
await subscription.Ack(evnt);
},
(subscription, dropReason, exception) => {
Console.WriteLine($"Subscription to stream was dropped due to {dropReason}. {exception?.Message}");
"subscription-group",
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
Console.WriteLine($"Subscription {subscriptionId} to stream started");
break;
case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
await HandleEvent(resolvedEvent);
await subscription.Ack(resolvedEvent);
break;
}
);
}

Console.WriteLine("Subscription to stream started");
#endregion subscribe-to-persistent-subscription-to-stream
}

static async Task CreatePersistentSubscriptionToAll(EventStorePersistentSubscriptionsClient client) {
#region create-persistent-subscription-to-all

var userCredentials = new UserCredentials("admin", "changeit");
var filter = StreamFilter.Prefix("test");
var filter = StreamFilter.Prefix("test");

var settings = new PersistentSubscriptionSettings();
await client.CreateToAllAsync(
Expand All @@ -76,45 +92,56 @@ await client.CreateToAllAsync(
);

Console.WriteLine("Subscription to all created");

#endregion create-persistent-subscription-to-all
}

static async Task ConnectToPersistentSubscriptionToAll(EventStorePersistentSubscriptionsClient client) {
static async Task ConnectToPersistentSubscriptionToAll(EventStorePersistentSubscriptionsClient client,
CancellationToken ct) {
#region subscribe-to-persistent-subscription-to-all

await client.SubscribeToAllAsync(
await using var subscription = client.SubscribeToAll(
"subscription-group",
async (subscription, evnt, retryCount, cancellationToken) => { await HandleEvent(evnt); },
(subscription, dropReason, exception) => {
Console.WriteLine($"Subscription to all was dropped due to {dropReason}. {exception?.Message}");
cancellationToken: ct);

await foreach (var message in subscription.Messages) {
switch (message) {
case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
Console.WriteLine($"Subscription {subscriptionId} to stream started");
break;
case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
await HandleEvent(resolvedEvent);
break;
}
);
}

Console.WriteLine("Subscription to all started");
#endregion subscribe-to-persistent-subscription-to-all
}

static async Task ConnectToPersistentSubscriptionWithManualAcks(EventStorePersistentSubscriptionsClient client) {
static async Task ConnectToPersistentSubscriptionWithManualAcks(EventStorePersistentSubscriptionsClient client,
CancellationToken ct) {
#region subscribe-to-persistent-subscription-with-manual-acks

var subscription = await client.SubscribeToStreamAsync(
await using var subscription = client.SubscribeToStream(
"test-stream",
"subscription-group",
async (subscription, evnt, retryCount, cancellationToken) => {
try {
await HandleEvent(evnt);
await subscription.Ack(evnt);
}
catch (UnrecoverableException ex) {
await subscription.Nack(PersistentSubscriptionNakEventAction.Park, ex.Message, evnt);
}
},
(subscription, dropReason, exception) => {
Console.WriteLine($"Subscription to stream with manual acks was dropped due to {dropReason}. {exception?.Message}");
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId):
Console.WriteLine($"Subscription {subscriptionId} to stream with manual acks started");
break;
case PersistentSubscriptionMessage.Event(var resolvedEvent, _):
try {
await HandleEvent(resolvedEvent);
await subscription.Ack(resolvedEvent);
} catch (UnrecoverableException ex) {
await subscription.Nack(PersistentSubscriptionNakEventAction.Park, ex.Message, resolvedEvent);
}
break;
}
);
}

Console.WriteLine("Subscription to stream with manual acks started");
#endregion subscribe-to-persistent-subscription-with-manual-acks
}

Expand All @@ -132,6 +159,7 @@ await client.UpdateToStreamAsync(
);

Console.WriteLine("Subscription updated");

#endregion update-persistent-subscription
}

Expand All @@ -147,14 +175,12 @@ await client.DeleteToStreamAsync(
);

Console.WriteLine("Subscription to stream deleted");
}
catch (PersistentSubscriptionNotFoundException) {
} catch (PersistentSubscriptionNotFoundException) {
// ignore
}
catch (Exception ex) {
} catch (Exception ex) {
Console.WriteLine($"Subscription to stream delete error: {ex.GetType()} {ex.Message}");
}

#endregion delete-persistent-subscription
}

Expand All @@ -169,11 +195,9 @@ await client.DeleteToAllAsync(
);

Console.WriteLine("Subscription to all deleted");
}
catch (PersistentSubscriptionNotFoundException) {
} catch (PersistentSubscriptionNotFoundException) {
// ignore
}
catch (Exception ex) {
} catch (Exception ex) {
Console.WriteLine($"Subscription to all delete error: {ex.GetType()} {ex.Message}");
}

Expand Down Expand Up @@ -221,6 +245,7 @@ await client.ReplayParkedMessagesToStreamAsync(
);

Console.WriteLine("Replay of parked messages to stream requested");

#endregion persistent-subscription-replay-parked-to-stream
}

Expand All @@ -235,6 +260,7 @@ await client.ReplayParkedMessagesToAllAsync(
);

Console.WriteLine("Replay of parked messages to all requested");

#endregion replay-parked-of-persistent-subscription-to-all
}

Expand All @@ -249,7 +275,7 @@ static async Task ListPersistentSubscriptionsToStream(EventStorePersistentSubscr

var entries = subscriptions
.Select(s => $"GroupName: {s.GroupName} EventSource: {s.EventSource} Status: {s.Status}");

Console.WriteLine($"Subscriptions to stream: [ {string.Join(", ", entries)} ]");

#endregion list-persistent-subscriptions-to-stream
Expand All @@ -259,27 +285,27 @@ static async Task ListPersistentSubscriptionsToAll(EventStorePersistentSubscript
#region list-persistent-subscriptions-to-all

var userCredentials = new UserCredentials("admin", "changeit");
var subscriptions = await client.ListToAllAsync(userCredentials: userCredentials);
var subscriptions = await client.ListToAllAsync(userCredentials: userCredentials);

var entries = subscriptions
.Select(s => $"GroupName: {s.GroupName} EventSource: {s.EventSource} Status: {s.Status}");

Console.WriteLine($"Subscriptions to all: [ {string.Join(", ", entries)} ]");

#endregion list-persistent-subscriptions-to-all
}

static async Task ListAllPersistentSubscriptions(EventStorePersistentSubscriptionsClient client) {
#region list-persistent-subscriptions

var userCredentials = new UserCredentials("admin", "changeit");
var subscriptions = await client.ListAllAsync(userCredentials: userCredentials);
var subscriptions = await client.ListAllAsync(userCredentials: userCredentials);

var entries = subscriptions
.Select(s => $"GroupName: {s.GroupName} EventSource: {s.EventSource} Status: {s.Status}");

Console.WriteLine($"Subscriptions: [{string.Join(", ", entries)} ]");

#endregion list-persistent-subscriptions
}

Expand All @@ -290,9 +316,14 @@ static async Task RestartPersistentSubscriptionSubsystem(EventStorePersistentSub
await client.RestartSubsystemAsync(userCredentials: userCredentials);

Console.WriteLine("Persistent subscription subsystem restarted");

#endregion restart-persistent-subscription-subsystem
}

static Task HandleEvent(ResolvedEvent evnt) => Task.CompletedTask;

class UnrecoverableException : Exception { }
// ensures that samples exit in a timely manner on CI
static CancellationToken GetCT() => new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token;

class UnrecoverableException : Exception {
}
Loading

0 comments on commit 5bd189a

Please sign in to comment.